Posted to commits@kafka.apache.org by di...@apache.org on 2023/09/07 14:38:28 UTC

[kafka] branch 3.6 updated: KAFKA-15351: Update log-start-offset after leader election for topics enabled with remote storage (#14340)

This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 5d7840e1b26 KAFKA-15351: Update log-start-offset after leader election for topics enabled with remote storage (#14340)
5d7840e1b26 is described below

commit 5d7840e1b26d1d3cfcb444fd4f14aec35a68e40a
Author: Kamal Chandraprakash <kc...@uber.com>
AuthorDate: Thu Sep 7 20:02:16 2023 +0530

    KAFKA-15351: Update log-start-offset after leader election for topics enabled with remote storage (#14340)
    
    On leadership failover, the new leader's start offset may be older than the start offset of the old leader. This works fine in the local storage scenario because the new leader still contains the data associated with the stale start offset. But in the case of remote storage, although the new leader has a stale offset, the data associated with it has already been deleted from remote storage by the old leader. Hence, we end up in a situation where the leader advertises a start offset but has no data associated with it.
    
    This commit fixes the situation by ensuring that, on every leadership failover, for topics with remote storage, the leader updates its log-start-offset to the base offset of the first segment in the current leader chain that is present in remote storage (if any).
    
    Reviewers: Satish Duggana <sa...@apache.org>, Luke Chen <sh...@gmail.com>, Christo Lolov <lo...@amazon.com>, Divij Vaidya <di...@amazon.com>
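
    A rough, self-contained illustration of the lookup described above (not part of the patch): plain Java collections stand in for the leader epoch cache and the RemoteLogMetadataManager, and the class name, variable names, and offsets are illustrative only (the offsets mirror the testFindLogStartOffset case in the diff below). The sketch walks the leader epoch chain from the earliest epoch, takes the base offset of the first remote segment it finds, and falls back to the local log-start-offset when remote storage holds nothing:

        import java.util.*;

        public class LogStartOffsetLookupSketch {
            public static void main(String[] args) {
                // Leader epoch chain of the new leader: epoch -> start offset of that epoch.
                NavigableMap<Integer, Long> leaderEpochChain = new TreeMap<>();
                leaderEpochChain.put(0, 0L);
                leaderEpochChain.put(1, 250L);
                leaderEpochChain.put(2, 550L);

                // Remote segment base offsets known per leader epoch. The old leader already
                // deleted everything below offset 500 from remote storage.
                Map<Integer, List<Long>> remoteSegmentBaseOffsetsByEpoch = new HashMap<>();
                remoteSegmentBaseOffsetsByEpoch.put(1, Arrays.asList(500L, 540L));

                long localLogStartOffset = 550L; // fallback when remote storage has no segments

                // Walk the epoch chain from the earliest epoch; the first remote segment found
                // yields the new leader's log-start-offset.
                long logStartOffset = localLogStartOffset;
                for (int epoch : leaderEpochChain.keySet()) {
                    List<Long> segments = remoteSegmentBaseOffsetsByEpoch
                            .getOrDefault(epoch, Collections.emptyList());
                    if (!segments.isEmpty()) {
                        logStartOffset = segments.get(0);
                        break;
                    }
                }
                System.out.println("log-start-offset after leader election: " + logStartOffset); // prints 500
            }
        }
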
---
 .../java/kafka/log/remote/RemoteLogManager.java    |  44 +++++++-
 .../kafka/log/remote/RemoteLogManagerTest.java     | 113 ++++++++++++++++++++-
 .../integration/BaseReassignReplicaTest.java       |  14 +--
 3 files changed, 158 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index f4b20014b49..9b70cfbe568 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -584,8 +584,10 @@ public class RemoteLogManager implements Closeable {
             return leaderEpoch >= 0;
         }
 
-        // The copiedOffsetOption is OptionalLong.empty() initially for a new leader RLMTask, and needs to be fetched inside the task's run() method.
+        // The copied offset and the log-start-offset are initially empty for a new leader RLMTask, and need to be
+        // fetched inside the task's run() method.
         private volatile OptionalLong copiedOffsetOption = OptionalLong.empty();
+        private volatile boolean isLogStartOffsetUpdatedOnBecomingLeader = false;
 
         public void convertToLeader(int leaderEpochVal) {
             if (leaderEpochVal < 0) {
@@ -594,22 +596,33 @@ public class RemoteLogManager implements Closeable {
             if (this.leaderEpoch != leaderEpochVal) {
                 leaderEpoch = leaderEpochVal;
             }
-            // Reset readOffset, so that it is set in next run of RLMTask
+            // Reset the copied and log-start offsets, so that they are set in the next run of RLMTask
             copiedOffsetOption = OptionalLong.empty();
+            isLogStartOffsetUpdatedOnBecomingLeader = false;
         }
 
         public void convertToFollower() {
             leaderEpoch = -1;
         }
 
-        private void maybeUpdateReadOffset(UnifiedLog log) throws RemoteStorageException {
+        private void maybeUpdateLogStartOffsetOnBecomingLeader(UnifiedLog log) throws RemoteStorageException {
+            if (!isLogStartOffsetUpdatedOnBecomingLeader) {
+                long logStartOffset = findLogStartOffset(topicIdPartition, log);
+                updateRemoteLogStartOffset.accept(topicIdPartition.topicPartition(), logStartOffset);
+                isLogStartOffsetUpdatedOnBecomingLeader = true;
+                logger.info("Found the logStartOffset: {} for partition: {} after becoming leader, leaderEpoch: {}",
+                        logStartOffset, topicIdPartition, leaderEpoch);
+            }
+        }
+
+        private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageException {
             if (!copiedOffsetOption.isPresent()) {
                 // This is found by traversing from the latest leader epoch from leader epoch history and find the highest offset
                 // of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the
                 // previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader
                 // epoch cache then it starts copying the segments from the earliest epoch entry's offset.
                 copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition, log));
-                logger.info("Found the highest copied remote offset: {} for partition: {} after becoming leader, " +
+                logger.info("Found the highest copiedRemoteOffset: {} for partition: {} after becoming leader, " +
                                 "leaderEpoch: {}", copiedOffsetOption, topicIdPartition, leaderEpoch);
             }
         }
@@ -645,7 +658,8 @@ public class RemoteLogManager implements Closeable {
                 return;
 
             try {
-                maybeUpdateReadOffset(log);
+                maybeUpdateLogStartOffsetOnBecomingLeader(log);
+                maybeUpdateCopiedOffset(log);
                 long copiedOffset = copiedOffsetOption.getAsLong();
 
                 // LSO indicates the offset below are ready to be consumed (high-watermark or committed)
@@ -1369,6 +1383,26 @@ public class RemoteLogManager implements Closeable {
         return offset.orElse(-1L);
     }
 
+    long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
+        Optional<Long> logStartOffset = Optional.empty();
+        Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
+        if (maybeLeaderEpochFileCache.isDefined()) {
+            LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
+            OptionalInt earliestEpochOpt = cache.earliestEntry()
+                    .map(epochEntry -> OptionalInt.of(epochEntry.epoch))
+                    .orElseGet(OptionalInt::empty);
+            while (!logStartOffset.isPresent() && earliestEpochOpt.isPresent()) {
+                Iterator<RemoteLogSegmentMetadata> iterator =
+                        remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, earliestEpochOpt.getAsInt());
+                if (iterator.hasNext()) {
+                    logStartOffset = Optional.of(iterator.next().startOffset());
+                }
+                earliestEpochOpt = cache.nextEpoch(earliestEpochOpt.getAsInt());
+            }
+        }
+        return logStartOffset.orElseGet(log::localLogStartOffset);
+    }
+
     /**
      * Submit a remote log read task.
      * This method returns immediately. The read operation is executed in a thread pool.
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 682b58b9725..92a6c63537c 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -93,6 +93,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -198,6 +199,10 @@ public class RemoteLogManagerTest {
             public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                 return remoteLogMetadataManager;
             }
+            @Override
+            long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
+                return 0L;
+            }
         };
     }
 
@@ -315,8 +320,6 @@ public class RemoteLogManagerTest {
         }
     }
 
-
-
     @Test
     void testStartup() {
         remoteLogManager.startup();
@@ -1399,6 +1402,112 @@ public class RemoteLogManagerTest {
         verify(remoteLogMetadataManager, times(16)).updateRemoteLogSegmentMetadata(any());
     }
 
+    /**
+     * This test asserts that the newly elected leader for a partition is able to find the log-start-offset.
+     * Note that the case tested here is that the previous leader deleted the log segments up to offset 500, and the
+     * log-start-offset didn't propagate to the replicas before the leader election.
+     */
+    @Test
+    public void testFindLogStartOffset() throws RemoteStorageException, IOException {
+        List<EpochEntry> epochEntries = new ArrayList<>();
+        epochEntries.add(new EpochEntry(0, 0L));
+        epochEntries.add(new EpochEntry(1, 250L));
+        epochEntries.add(new EpochEntry(2, 550L));
+        checkpoint.write(epochEntries);
+
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        long timestamp = time.milliseconds();
+        int segmentSize = 1024;
+        List<RemoteLogSegmentMetadata> segmentMetadataList = Arrays.asList(
+                new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+                        500, 539, timestamp, brokerId, timestamp, segmentSize, truncateAndGetLeaderEpochs(epochEntries, 500L, 539L)),
+                new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+                        540, 700, timestamp, brokerId, timestamp, segmentSize, truncateAndGetLeaderEpochs(epochEntries, 540L, 700L))
+                );
+        when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt()))
+                .thenAnswer(invocation -> {
+                    int epoch = invocation.getArgument(1);
+                    if (epoch == 1)
+                        return segmentMetadataList.iterator();
+                    else
+                        return Collections.emptyIterator();
+                });
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> { },
+                brokerTopicStats) {
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+        }) {
+            assertEquals(500L, remoteLogManager.findLogStartOffset(leaderTopicIdPartition, mockLog));
+        }
+    }
+
+    @Test
+    public void testFindLogStartOffsetFallbackToLocalLogStartOffsetWhenRemoteIsEmpty() throws RemoteStorageException, IOException {
+        List<EpochEntry> epochEntries = new ArrayList<>();
+        epochEntries.add(new EpochEntry(1, 250L));
+        epochEntries.add(new EpochEntry(2, 550L));
+        checkpoint.write(epochEntries);
+
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(mockLog.localLogStartOffset()).thenReturn(250L);
+        when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt()))
+                .thenReturn(Collections.emptyIterator());
+
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> { },
+                brokerTopicStats) {
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+        }) {
+            assertEquals(250L, remoteLogManager.findLogStartOffset(leaderTopicIdPartition, mockLog));
+        }
+    }
+
+    @Test
+    public void testLogStartOffsetUpdatedOnStartup() throws RemoteStorageException, IOException, InterruptedException {
+        List<EpochEntry> epochEntries = new ArrayList<>();
+        epochEntries.add(new EpochEntry(1, 250L));
+        epochEntries.add(new EpochEntry(2, 550L));
+        checkpoint.write(epochEntries);
+
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        RemoteLogSegmentMetadata metadata = mock(RemoteLogSegmentMetadata.class);
+        when(metadata.startOffset()).thenReturn(600L);
+        when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt()))
+                .thenAnswer(invocation -> {
+                    int epoch = invocation.getArgument(1);
+                    if (epoch == 2)
+                        return Collections.singletonList(metadata).iterator();
+                    else
+                        return Collections.emptyIterator();
+                });
+
+        AtomicLong logStartOffset = new AtomicLong(0);
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) ->  logStartOffset.set(offset),
+                brokerTopicStats) {
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+        }) {
+            RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+            task.convertToLeader(4);
+            task.copyLogSegmentsToRemote(mockLog);
+            assertEquals(600L, logStartOffset.get());
+        }
+    }
+
     private List<RemoteLogSegmentMetadata> listRemoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
                                                                         int segmentCount,
                                                                         int recordsPerSegment,
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java
index afac6513036..d0ccae89aab 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java
@@ -55,8 +55,8 @@ public abstract class BaseReassignReplicaTest extends TieredStorageTestHarness {
         final String topicA = "topicA";
         final String topicB = "topicB";
         final Integer p0 = 0;
-        final Integer partitionCount = 5;
-        final Integer replicationFactor = 2;
+        final Integer partitionCount = 1;
+        final Integer replicationFactor = 1;
         final Integer maxBatchCountPerSegment = 1;
         final Map<Integer, List<Integer>> replicaAssignment = null;
         final boolean enableRemoteLogStorage = true;
@@ -66,13 +66,15 @@ public abstract class BaseReassignReplicaTest extends TieredStorageTestHarness {
         }
 
         builder
-                // create topicA with 5 partitions, 2 RF and ensure that the user-topic-partitions are mapped to
-                // metadata partitions
-                .createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                // create topicA with 50 partitions and 2 RF. Using 50 partitions ensures that the user-partitions
+                // are mapped to all the __remote_log_metadata partitions. This is required to ensure that the
+                // TBRLMM is able to handle the assignment of the newly created replica to one of the already
+                // assigned metadata partitions.
+                .createTopic(topicA, 50, 2, maxBatchCountPerSegment,
                         replicaAssignment, enableRemoteLogStorage)
                 .expectUserTopicMappedToMetadataPartitions(topicA, metadataPartitions)
                 // create topicB with 1 partition and 1 RF
-                .createTopic(topicB, 1, 1, maxBatchCountPerSegment,
+                .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
                         mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
                 // send records to partition 0
                 .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))