You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "junrao (via GitHub)" <gi...@apache.org> on 2023/05/24 00:22:06 UTC

[GitHub] [kafka] junrao commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

junrao commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1203191600


##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -522,7 +522,12 @@ class BrokerServer(
       }
 
       Some(new RemoteLogManager(remoteLogManagerConfig, config.brokerId, config.logDirs.head, time,
-        (tp: TopicPartition) => logManager.getLog(tp).asJava));
+        (tp: TopicPartition) => logManager.getLog(tp).asJava,
+        (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
+          logManager.getLog(tp).foreach(log => {
+            log.updateLogStartOffsetFromRemoteTier(remoteLogStartOffset)
+          })
+        }));

Review Comment:
   Unnecessary semicolon. Ditto in KafkaServer.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -618,6 +629,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size after deletion will be " +
+                            "${remainingBreachedSize + log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                        x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long retentionMs)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to retention time " +
+                            "{}ms breach based on the largest record timestamp in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionMs = log.config().retentionMs;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this topic partition without computing in each
+            // iteration. But the logic can become little complex and need to cover different scenarios to avoid any
+            // leaks. We can have a followup to improve it by maintaining these values through both copying and deletion.
+            final Set<Integer> epochsSet = new HashSet<>();
+            long totalSizeEarlierToLocalLogStartOffset = 0L;
+            while (segmentMetadataIter.hasNext()) {

Review Comment:
   Should we exit the loop the first time a remote segment offset passes localLogStartOffset?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2128,7 +2182,7 @@ object LogMetricNames {
 
 case class RetentionMsBreach(log: UnifiedLog) extends SegmentDeletionReason {
   override def logReason(toDelete: List[LogSegment]): Unit = {
-    val retentionMs = log.config.retentionMs
+    val retentionMs = log.config.remoteLogConfig.localRetentionMs

Review Comment:
   The retention time depends on whether remote storage is enabled, right? Ditto at line 2284.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -152,13 +156,15 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
                             int brokerId,
                             String logDir,
                             Time time,
-                            Function<TopicPartition, Optional<UnifiedLog>> fetchLog) {
+                            Function<TopicPartition, Optional<UnifiedLog>> fetchLog,
+                            BiConsumer<TopicPartition, Long> updateRemoteLogStartOffset) {

Review Comment:
    Missing Javadoc for the new param.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -618,6 +629,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size after deletion will be " +
+                            "${remainingBreachedSize + log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the

Review Comment:
   Hmm, I'm not sure I follow the comment.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -618,6 +629,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size after deletion will be " +
+                            "${remainingBreachedSize + log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                        x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long retentionMs)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to retention time " +
+                            "{}ms breach based on the largest record timestamp in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionMs = log.config().retentionMs;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this topic partition without computing in each
+            // iteration. But the logic can become little complex and need to cover different scenarios to avoid any
+            // leaks. We can have a followup to improve it by maintaining these values through both copying and deletion.
+            final Set<Integer> epochsSet = new HashSet<>();
+            long totalSizeEarlierToLocalLogStartOffset = 0L;
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next();
+                epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+
+                if (segmentMetadata.endOffset() < log.localLogStartOffset()) {
+                    totalSizeEarlierToLocalLogStartOffset += segmentMetadata.segmentSizeInBytes();
+                }
+            }
+
+            // This is the total size of segments in local log that have their base-offset > local-log-start-offset
+            // and size of the segments in remote storage which have their end-offset < local-log-start-offset.
+            long totalSize = log.validLocalLogSegmentsSize() + totalSizeEarlierToLocalLogStartOffset;
+
+            // All the leader epochs in sorted order that exists in remote storage
+            List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet);
+            Collections.sort(remoteLeaderEpochs);
+
+            long remainingBreachedSize = totalSize - log.config().retentionSize;
+
+            RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(remainingBreachedSize);
+
+            boolean checkTimestampRetention = retentionMs > -1;
+            long cleanupTs = time.milliseconds() - retentionMs;
+            boolean checkSizeRetention = log.config().retentionSize > -1;
+            LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get();
+            Iterator<EpochEntry> epochEntryIterator = leaderEpochCache.epochEntries().iterator();
+            boolean isSegmentDeleted = true;
+            while (isSegmentDeleted && epochEntryIterator.hasNext()) {
+                EpochEntry epochEntry = epochEntryIterator.next();
+                Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epochEntry.epoch);

Review Comment:
   Are we iterating the same remote segment multiple times since a segment could have multiple epochs?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -154,16 +154,41 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   locally {
     initializePartitionMetadata()
     updateLogStartOffset(logStartOffset)
+    updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+    if (!remoteLogEnabled())
+      logStartOffset = localLogStartOffset()
     maybeIncrementFirstUnstableOffset()
     initializeTopicId()
 
     logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset)
+
+    info(s"Completed load of log with ${localLog.segments.numberOfSegments} segments, local log start offset ${localLogStartOffset()} and " +
+      s"log end offset $logEndOffset")
   }
 
   def setLogOffsetsListener(listener: LogOffsetsListener): Unit = {
     logOffsetsListener = listener
   }
 
+  private def updateLocalLogStartOffset(offset: Long): Unit = {
+    _localLogStartOffset = offset
+
+    if (highWatermark < offset) {
+      updateHighWatermark(offset)
+    }
+
+    if (this.recoveryPoint < offset) {
+      localLog.updateRecoveryPoint(offset)
+    }
+  }
+
+  def updateLogStartOffsetFromRemoteTier(remoteLogStartOffset: Long): Unit = {
+    if (!remoteLogEnabled()) {
+      warn("Ignoring the call as the remote log storage is disabled")

Review Comment:
    Should we return here so that the call is actually ignored?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -618,6 +629,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size after deletion will be " +
+                            "${remainingBreachedSize + log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                        x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long retentionMs)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to retention time " +
+                            "{}ms breach based on the largest record timestamp in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionMs = log.config().retentionMs;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this topic partition without computing in each
+            // iteration. But the logic can become little complex and need to cover different scenarios to avoid any
+            // leaks. We can have a followup to improve it by maintaining these values through both copying and deletion.
+            final Set<Integer> epochsSet = new HashSet<>();
+            long totalSizeEarlierToLocalLogStartOffset = 0L;
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next();
+                epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+
+                if (segmentMetadata.endOffset() < log.localLogStartOffset()) {
+                    totalSizeEarlierToLocalLogStartOffset += segmentMetadata.segmentSizeInBytes();
+                }
+            }
+
+            // This is the total size of segments in local log that have their base-offset > local-log-start-offset
+            // and size of the segments in remote storage which have their end-offset < local-log-start-offset.
+            long totalSize = log.validLocalLogSegmentsSize() + totalSizeEarlierToLocalLogStartOffset;

Review Comment:
   What about segments that overlap between remote storage and the local log? Do we double-count them?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -154,16 +154,41 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   locally {
     initializePartitionMetadata()
     updateLogStartOffset(logStartOffset)
+    updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+    if (!remoteLogEnabled())
+      logStartOffset = localLogStartOffset()

Review Comment:
   Hmm, logStartOffset could be larger than the base offset of the first local segment. So, it seems that we can't just switch to the base offset of the first local segment if remote log is not enabled.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -618,6 +629,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +

Review Comment:
    Logging uses $ notation, which is Scala string interpolation and is not expanded in Java; use {} placeholders instead.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -942,13 +974,19 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
         localLog.checkIfMemoryMappedBufferClosed()
         if (newLogStartOffset > logStartOffset) {
-          updatedLogStartOffset = true
-          updateLogStartOffset(newLogStartOffset)
-          _localLogStartOffset = newLogStartOffset
-          info(s"Incremented log start offset to $newLogStartOffset due to $reason")
-          leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
-          producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
-          maybeIncrementFirstUnstableOffset()
+          _localLogStartOffset = math.max(newLogStartOffset, localLogStartOffset())
+          // it should always get updated  if tiered-storage is not enabled.
+          if (!onlyLocalLogStartOffsetUpdate || !remoteLogEnabled()) {
+            updatedLogStartOffset = true
+            updateLogStartOffset(newLogStartOffset)
+            _localLogStartOffset = newLogStartOffset

Review Comment:
   Yes, agreed. Not sure why we need to update _localLogStartOffset here again.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1383,20 +1421,30 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
   }
 
+  private[log] def localRetentionMs: Long = {
+    if (config.remoteLogConfig.remoteStorageEnable) config.remoteLogConfig.localRetentionMs else config.retentionMs
+  }
+
   private def deleteRetentionMsBreachedSegments(): Int = {
-    if (config.retentionMs < 0) return 0
+    val retentionMs = localRetentionMs

Review Comment:
   Hmm, local retention needs to be bounded by the last tiered offset, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org