Posted to jira@kafka.apache.org by "Hangleton (via GitHub)" <gi...@apache.org> on 2023/04/14 17:01:39 UTC

[GitHub] [kafka] Hangleton commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

Hangleton commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1167074461


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumes that each segment has a size > 0.
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size after deletion will be " +
+                            "${remainingBreachedSize + log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // This handles two cases:
+            // 1) When partitions are offline and a new replica with an empty disk is elected leader, the
+            //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+            // 2) Removing segments that are no longer referenced by the leader epoch cache.
+            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                        x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these epochs/offsets are earlier than that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long retentionMs)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending within an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to retention time " +
+                            "{}ms breach based on the largest record timestamp in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionMs = log.config().retentionMs;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this topic partition without computing in each
+            // iteration. But the logic can become a little complex and needs to cover different scenarios to avoid
+            // any leaks. We can have a follow-up to improve it by maintaining these values through both copying and deletion.
+            final Set<Integer> epochsSet = new HashSet<>();
+            long totalSizeEarlierToLocalLogStartOffset = 0L;
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next();
+                epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+
+                if (segmentMetadata.endOffset() < log.localLogStartOffset()) {
+                    totalSizeEarlierToLocalLogStartOffset += segmentMetadata.segmentSizeInBytes();
+                }
+            }
+
+            // This is the total size of the segments in the local log whose base-offset > local-log-start-offset,
+            // plus the size of the segments in remote storage whose end-offset < local-log-start-offset.
+            long totalSize = log.validLocalLogSegmentsSize() + totalSizeEarlierToLocalLogStartOffset;
+
+            // All the leader epochs that exist in remote storage, in sorted order.
+            List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet);
+            Collections.sort(remoteLeaderEpochs);
+
+            long remainingBreachedSize = totalSize - log.config().retentionSize;
+
+            RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(remainingBreachedSize);
+
+            boolean checkTimestampRetention = retentionMs > -1;
+            long cleanupTs = time.milliseconds() - retentionMs;
+            boolean checkSizeRetention = log.config().retentionSize > -1;
+            LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get();
+            Iterator<EpochEntry> epochEntryIterator = leaderEpochCache.epochEntries().iterator();
+            boolean isSegmentDeleted = true;
+            while (isSegmentDeleted && epochEntryIterator.hasNext()) {
+                EpochEntry epochEntry = epochEntryIterator.next();
+                Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epochEntry.epoch);
+                while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                    RemoteLogSegmentMetadata metadata = segmentsIterator.next();
+                    isSegmentDeleted =
+                            remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata, checkTimestampRetention, cleanupTs, retentionMs) ||
+                                    remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata, checkSizeRetention) ||
+                                    remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, log.logStartOffset());
+                }
+            }
+
+            // Remove the remote log segments whose leader epochs are all lower than the earliest epoch known
+            // to the leader. This removes the unreferenced segments from remote storage.
+            if (leaderEpochCache.earliestEntry().isPresent()) {

Review Comment:
   Hmm, I don't think it is enough to evict the data for epochs lower than the current leader's smallest epoch. With unclean leader election, it is possible to have divergence between a log prefix and suffix shared by two replicas.
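
   To make the failure mode concrete, here is a toy example (the class name, epochs, and offsets below are invented for illustration and are not taken from the PR). After an unclean election, the new leader's lineage may not contain an epoch that a remote segment was uploaded under, even though that epoch is not lower than the leader's smallest epoch, so an epoch-only comparison mis-classifies the segment:

       import java.util.NavigableMap;
       import java.util.TreeMap;

       // Minimal sketch: the leader epoch cache modeled as epoch -> start offset.
       public class EpochDivergenceSketch {
           public static void main(String[] args) {
               // Current leader's lineage after an unclean election: epoch 1
               // starts at offset 0, epoch 3 starts at offset 80. Epoch 2
               // never existed on this replica.
               NavigableMap<Integer, Long> lineage = new TreeMap<>();
               lineage.put(1, 0L);
               lineage.put(3, 80L);

               // A remote segment uploaded by the previous leader under epoch 2,
               // covering offsets 50..120.
               int segmentEpoch = 2;

               // The epoch-only predicate keeps this segment because 2 is not
               // lower than the smallest epoch (1) ...
               boolean deletable = segmentEpoch < lineage.firstKey();
               System.out.println("epoch-only check would delete: " + deletable);

               // ... yet epoch 2 is absent from the lineage, so the portion of
               // the segment past offset 80 diverges from what epoch 3 wrote.
               System.out.println("epoch present in lineage: " + lineage.containsKey(segmentEpoch));
           }
       }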



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);

Review Comment:
   Hmm, it seems we are iterating over all the remote segment metadata every time the expiration task is executed. This could become costly if the RLMM implementation does not cache that metadata; that could be an explicit implementation constraint for plugin providers. Maybe we could also add a small layer of memoization here to avoid traversing the log metadata every time.
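
   As a rough sketch of the kind of memoization meant here (every name in this snippet — RemoteLogSegmentStatsCache, Stats, statsFor, invalidate — is invented for illustration; nothing like it exists in the PR or in the RLMM API), the per-partition aggregates computed from one traversal could be cached and invalidated whenever the copy or delete path changes the remote log:

       import java.util.concurrent.ConcurrentHashMap;
       import java.util.concurrent.ConcurrentMap;
       import java.util.function.Supplier;

       // Hypothetical memoization layer in front of RLMM#listRemoteLogSegments;
       // in RemoteLogManager the key K would be TopicIdPartition.
       final class RemoteLogSegmentStatsCache<K> {

           // Aggregates derived from one full traversal of the segment metadata.
           static final class Stats {
               final long sizeBeforeLocalLogStartOffset;
               Stats(long sizeBeforeLocalLogStartOffset) {
                   this.sizeBeforeLocalLogStartOffset = sizeBeforeLocalLogStartOffset;
               }
           }

           private final ConcurrentMap<K, Stats> cache = new ConcurrentHashMap<>();

           // Traverse the segment metadata (the expensive part) only on a cache miss.
           Stats statsFor(K partition, Supplier<Stats> recompute) {
               return cache.computeIfAbsent(partition, p -> recompute.get());
           }

           // Called from the copy and delete paths so the next expiration run
           // recomputes the aggregates.
           void invalidate(K partition) {
               cache.remove(partition);
           }
       }

   The hard part, as the in-code comment about the single iteration already notes, would be making the invalidation complete across copying, deletion, and leadership changes so the cached sizes never go stale.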



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org