You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@kafka.apache.org by sa...@apache.org on 2023/09/12 04:42:41 UTC

[kafka] branch trunk updated: KAFKA-15352: Update log-start-offset before initiating deletion of remote segments (#14349)

This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7483991a012 KAFKA-15352: Update log-start-offset before initiating deletion of remote segments (#14349)
7483991a012 is described below

commit 7483991a0122c1c9493464efbcda1c8a93c74fd6
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Tue Sep 12 05:42:30 2023 +0100

    KAFKA-15352: Update log-start-offset before initiating deletion of remote segments (#14349)
    
    With this change, the current leader updates the log-start-offset before the segments are deleted from remote storage. This provides a best-effort mechanism for followers to receive the log-start-offset from the leader, so that they can update their own log-start-offset before they become the leader.
    
    Reviewers: Kamal Chandraprakash <ka...@gmail.com>, Divij Vaidya <di...@amazon.com>, Luke Chen <sh...@gmail.com>, Satish Duggana <sa...@apache.org>
---
 .../java/kafka/log/remote/RemoteLogManager.java    |  94 ++++++----
 .../kafka/log/remote/RemoteLogManagerTest.java     | 197 ++++++++++++++++++++-
 2 files changed, 251 insertions(+), 40 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 4a35abf6a11..d8f2144b3e3 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -832,70 +832,64 @@ public class RemoteLogManager implements Closeable {
                 remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
             }
 
-            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) {
                 if (!retentionSizeData.isPresent()) {
                     return false;
                 }
 
-                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, ignored -> {
-                    // Assumption that segments contain size >= 0
-                    if (remainingBreachedSize > 0) {
-                        long remainingBytes = remainingBreachedSize - metadata.segmentSizeInBytes();
-                        if (remainingBytes >= 0) {
-                            remainingBreachedSize = remainingBytes;
-                            return true;
-                        }
+                boolean shouldDeleteSegment = false;
+
+                // Assumption that segments contain size >= 0
+                if (remainingBreachedSize > 0) {
+                    long remainingBytes = remainingBreachedSize - metadata.segmentSizeInBytes();
+                    if (remainingBytes >= 0) {
+                        remainingBreachedSize = remainingBytes;
+                        shouldDeleteSegment = true;
                     }
+                }
 
-                    return false;
-                });
-                if (isSegmentDeleted) {
+                if (shouldDeleteSegment) {
                     logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
-                    logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                    logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
                             metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
                 }
-                return isSegmentDeleted;
+                return shouldDeleteSegment;
             }
 
-            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
-                    throws RemoteStorageException, ExecutionException, InterruptedException {
+            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) {
                 if (!retentionTimeData.isPresent()) {
                     return false;
                 }
 
-                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
-                        ignored -> metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
-                if (isSegmentDeleted) {
+                boolean shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;
+                if (shouldDeleteSegment) {
                     remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
                     // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
                     // are ascending with in an epoch.
                     logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
-                    logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+                    logger.info("About to delete remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
                             metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
                 }
-                return isSegmentDeleted;
+                return shouldDeleteSegment;
             }
 
             private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata,
                                                                  long logStartOffset,
-                                                                 NavigableMap<Integer, Long> leaderEpochEntries)
-                    throws RemoteStorageException, ExecutionException, InterruptedException {
-                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, ignored -> {
-                    if (!leaderEpochEntries.isEmpty()) {
-                        // Note that `logStartOffset` and `leaderEpochEntries.firstEntry().getValue()` should be same
-                        Integer firstEpoch = leaderEpochEntries.firstKey();
-                        return metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch <= firstEpoch)
-                                && metadata.endOffset() < logStartOffset;
-                    }
-                    return false;
-                });
-                if (isSegmentDeleted) {
-                    logger.info("Deleted remote log segment {} due to log-start-offset {} breach. " +
+                                                                 NavigableMap<Integer, Long> leaderEpochEntries) {
+                boolean shouldDeleteSegment = false;
+                if (!leaderEpochEntries.isEmpty()) {
+                    // Note that `logStartOffset` and `leaderEpochEntries.firstEntry().getValue()` should be same
+                    Integer firstEpoch = leaderEpochEntries.firstKey();
+                    shouldDeleteSegment = metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch <= firstEpoch)
+                            && metadata.endOffset() < logStartOffset;
+                }
+                if (shouldDeleteSegment) {
+                    logger.info("About to delete remote log segment {} due to log-start-offset {} breach. " +
                             "Current earliest-epoch-entry: {}, segment-end-offset: {} and segment-epochs: {}",
                             metadata.remoteLogSegmentId(), logStartOffset, leaderEpochEntries.firstEntry(),
                             metadata.endOffset(), metadata.segmentLeaderEpochs());
                 }
-                return isSegmentDeleted;
+                return shouldDeleteSegment;
             }
 
             // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as
@@ -989,6 +983,7 @@ public class RemoteLogManager implements Closeable {
             RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
             Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
             boolean canProcess = true;
+            List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>();
             while (canProcess && epochIterator.hasNext()) {
                 Integer epoch = epochIterator.next();
                 Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
@@ -1004,19 +999,22 @@ public class RemoteLogManager implements Closeable {
                     // remote log segments won't be removed. The `isRemoteSegmentWithinLeaderEpoch` validates whether
                     // the epochs present in the segment lies in the checkpoint file. It will always return false
                     // since the checkpoint file was already truncated.
-                    boolean isSegmentDeleted = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
+                    boolean shouldDeleteSegment = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
                             metadata, logStartOffset, epochWithOffsets);
                     boolean isValidSegment = false;
-                    if (!isSegmentDeleted) {
+                    if (!shouldDeleteSegment) {
                         // check whether the segment contains the required epoch range with in the current leader epoch lineage.
                         isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
                         if (isValidSegment) {
-                            isSegmentDeleted =
+                            shouldDeleteSegment =
                                     remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
                                             remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
                         }
                     }
-                    canProcess = isSegmentDeleted || !isValidSegment;
+                    if (shouldDeleteSegment) {
+                        segmentsToDelete.add(metadata);
+                    }
+                    canProcess = shouldDeleteSegment || !isValidSegment;
                 }
             }
 
@@ -1045,6 +1043,24 @@ public class RemoteLogManager implements Closeable {
 
             // Update log start offset with the computed value after retention cleanup is done
             remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
+
+            // At this point in time we have updated the log start offsets, but not initiated a deletion.
+            // Either a follower has picked up the changes to the log start offset, or they have not.
+            // If the follower HAS picked up the changes, and they become the leader this replica won't successfully complete
+            // the deletion.
+            // However, the new leader will correctly pick up all breaching segments as log start offset breaching ones
+            // and delete them accordingly.
+            // If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process
+            // again and delete them with the original deletion reason i.e. size, time or log start offset breach.
+            List<String> undeletedSegments = new ArrayList<>();
+            for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
+                if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) {
+                    undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
+                }
+            }
+            if (!undeletedSegments.isEmpty()) {
+                logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));
+            }
         }
 
         private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) {
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 92a6c63537c..bb66994b273 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -59,6 +59,7 @@ import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
 import org.apache.kafka.storage.internals.log.EpochEntry;
 import org.apache.kafka.storage.internals.log.LazyIndex;
+import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.OffsetIndex;
 import org.apache.kafka.storage.internals.log.ProducerStateManager;
 import org.apache.kafka.storage.internals.log.TimeIndex;
@@ -1508,16 +1509,210 @@ public class RemoteLogManagerTest {
         }
     }
 
+    @Test
+    public void testDeleteRetentionSizeBreachingSegments() throws RemoteStorageException, IOException {
+        AtomicLong logStartOffset = new AtomicLong(0);
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) ->  logStartOffset.set(offset),
+                brokerTopicStats) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+        }) {
+            RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+            task.convertToLeader(0);
+
+            when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+            when(mockLog.logEndOffset()).thenReturn(200L);
+
+            List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+
+            List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
+
+            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                    .thenReturn(remoteLogSegmentMetadatas.iterator());
+            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                    .thenReturn(remoteLogSegmentMetadatas.iterator())
+                    .thenReturn(remoteLogSegmentMetadatas.iterator())
+                    .thenReturn(remoteLogSegmentMetadatas.iterator());
+
+            checkpoint.write(epochEntries);
+            LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+            when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+            Map<String, Long> logProps = new HashMap<>();
+            logProps.put("retention.bytes", 0L);
+            logProps.put("retention.ms", -1L);
+            LogConfig mockLogConfig = new LogConfig(logProps);
+            when(mockLog.config()).thenReturn(mockLogConfig);
+
+            when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                    .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
+
+            task.run();
+
+            assertEquals(200L, logStartOffset.get());
+            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
+            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
+        }
+    }
+
+    @Test
+    public void testDeleteRetentionMsBreachingSegments() throws RemoteStorageException, IOException {
+        AtomicLong logStartOffset = new AtomicLong(0);
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) ->  logStartOffset.set(offset),
+                brokerTopicStats) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+        }) {
+            RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+            task.convertToLeader(0);
+
+            when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+            when(mockLog.logEndOffset()).thenReturn(200L);
+
+            List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+
+            List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
+
+            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                    .thenReturn(remoteLogSegmentMetadatas.iterator());
+            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                    .thenReturn(remoteLogSegmentMetadatas.iterator())
+                    .thenReturn(remoteLogSegmentMetadatas.iterator());
+
+            checkpoint.write(epochEntries);
+            LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+            when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+            Map<String, Long> logProps = new HashMap<>();
+            logProps.put("retention.bytes", -1L);
+            logProps.put("retention.ms", 0L);
+            LogConfig mockLogConfig = new LogConfig(logProps);
+            when(mockLog.config()).thenReturn(mockLogConfig);
+
+            when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                    .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
+
+            task.run();
+
+            assertEquals(200L, logStartOffset.get());
+            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
+            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
+        }
+    }
+
+    @Test
+    public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException, IOException {
+        AtomicLong logStartOffset = new AtomicLong(0);
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) ->  logStartOffset.set(offset),
+                brokerTopicStats) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+        }) {
+            RemoteLogManager.RLMTask leaderTask = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+            leaderTask.convertToLeader(0);
+
+            when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+            when(mockLog.logEndOffset()).thenReturn(200L);
+
+            List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+
+            List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
+
+            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                    .thenReturn(remoteLogSegmentMetadatas.iterator());
+            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                    .thenReturn(remoteLogSegmentMetadatas.iterator())
+                    .thenReturn(remoteLogSegmentMetadatas.iterator());
+
+            checkpoint.write(epochEntries);
+            LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+            when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+            Map<String, Long> logProps = new HashMap<>();
+            logProps.put("retention.bytes", -1L);
+            logProps.put("retention.ms", 0L);
+            LogConfig mockLogConfig = new LogConfig(logProps);
+            when(mockLog.config()).thenReturn(mockLogConfig);
+
+            when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                    .thenAnswer(answer -> {
+                        // cancel the task so that we don't delete the second segment
+                        leaderTask.cancel();
+                        return CompletableFuture.runAsync(() -> {
+                        });
+                    });
+
+            leaderTask.run();
+
+            assertEquals(200L, logStartOffset.get());
+            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
+            verify(remoteStorageManager, never()).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
+
+            // test that the 2nd log segment will be deleted by the new leader
+            RemoteLogManager.RLMTask newLeaderTask = remoteLogManager.new RLMTask(followerTopicIdPartition, 128);
+            newLeaderTask.convertToLeader(1);
+
+            Iterator<RemoteLogSegmentMetadata> firstIterator = remoteLogSegmentMetadatas.iterator();
+            firstIterator.next();
+            Iterator<RemoteLogSegmentMetadata> secondIterator = remoteLogSegmentMetadatas.iterator();
+            secondIterator.next();
+            Iterator<RemoteLogSegmentMetadata> thirdIterator = remoteLogSegmentMetadatas.iterator();
+            thirdIterator.next();
+
+            when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition))
+                    .thenReturn(firstIterator);
+            when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition, 0))
+                    .thenReturn(secondIterator)
+                    .thenReturn(thirdIterator);
+
+            when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                    .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
+
+            newLeaderTask.run();
+
+            assertEquals(200L, logStartOffset.get());
+            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
+            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
+        }
+    }
+
     private List<RemoteLogSegmentMetadata> listRemoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
                                                                         int segmentCount,
                                                                         int recordsPerSegment,
                                                                         int segmentSize) {
+        return listRemoteLogSegmentMetadata(topicIdPartition, segmentCount, recordsPerSegment, segmentSize, Collections.emptyList());
+    }
+
+    private List<RemoteLogSegmentMetadata> listRemoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
+                                                                        int segmentCount,
+                                                                        int recordsPerSegment,
+                                                                        int segmentSize,
+                                                                        List<EpochEntry> epochEntries) {
         List<RemoteLogSegmentMetadata> segmentMetadataList = new ArrayList<>();
         for (int idx = 0; idx < segmentCount; idx++) {
             long timestamp = time.milliseconds();
             long startOffset = (long) idx * recordsPerSegment;
             long endOffset = startOffset + recordsPerSegment - 1;
-            Map<Integer, Long> segmentLeaderEpochs = truncateAndGetLeaderEpochs(totalEpochEntries, startOffset, endOffset);
+            List<EpochEntry> localTotalEpochEntries = epochEntries.isEmpty() ? totalEpochEntries : epochEntries;
+            Map<Integer, Long> segmentLeaderEpochs = truncateAndGetLeaderEpochs(localTotalEpochEntries, startOffset, endOffset);
             segmentMetadataList.add(new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition,
                     Uuid.randomUuid()), startOffset, endOffset, timestamp, brokerId, timestamp, segmentSize,
                     segmentLeaderEpochs));