You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sa...@apache.org on 2023/07/24 14:06:35 UTC

[kafka] branch trunk updated: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache (#14004)

This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 84691b11f64 KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache (#14004)
84691b11f64 is described below

commit 84691b11f64e85a7f3f6fdbafd0f8fb2f8dd630c
Author: Kamal Chandraprakash <kc...@uber.com>
AuthorDate: Mon Jul 24 19:36:25 2023 +0530

    KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache (#14004)
    
    KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache
    
    Reviewers: Satish Duggana <sa...@apache.org>, Viktor Nikitash <ni...@pdffiller.com>, Jorge Esteban Quilcate Otoya <qu...@gmail.com>, Abhijeet Kumar <ab...@gmail.com>
---
 .../storage/RemoteLogLeaderEpochState.java         |  24 ++-
 .../storage/RemoteLogLeaderEpochStateTest.java     | 206 +++++++++++++++++++++
 2 files changed, 224 insertions(+), 6 deletions(-)

diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
index d5787dd7985..b607da06fec 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
@@ -100,17 +100,29 @@ class RemoteLogLeaderEpochState {
 
     void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId,
                                                    Long leaderEpochEndOffset) {
+        // If there are duplicate segments uploaded due to leader-election, then mark them as unreferenced.
+        // Duplicate segments can be uploaded when the previous leader had tier-lags and the next leader uploads the
+        // segment for the same leader-epoch which is a super-set of previously uploaded segments.
+        // (eg)
+        // case-1: Duplicate segment
+        //      L0 uploaded segment S0 with offsets 0-100 and L1 uploaded segment S1 with offsets 0-200.
+        //      We will mark the segment S0 as duplicate and add it to unreferencedSegmentIds.
+        // case-2: Overlapping segments
+        //      L0 uploaded segment S0 with offsets 10-90 and L1 uploaded segment S1 with offsets 5-100, S2 with offsets 101-200,
+        //      and so on. When the consumer requests the segment with offset 95, it should get the segment S1 and not S0.
+        Map.Entry<Long, RemoteLogSegmentId> lastEntry = offsetToId.lastEntry();
+        while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= leaderEpochEndOffset) {
+            offsetToId.remove(lastEntry.getKey());
+            unreferencedSegmentIds.add(lastEntry.getValue());
+            lastEntry = offsetToId.lastEntry();
+        }
+
         // Add the segment epochs mapping as the segment is copied successfully.
-        RemoteLogSegmentId oldEntry = offsetToId.put(startOffset, remoteLogSegmentId);
+        offsetToId.put(startOffset, remoteLogSegmentId);
 
         // Remove the metadata from unreferenced entries as it is successfully copied and added to the offset mapping.
         unreferencedSegmentIds.remove(remoteLogSegmentId);
 
-        // Add the old entry to unreferenced entries as the mapping is removed for the old entry.
-        if (oldEntry != null) {
-            unreferencedSegmentIds.add(oldEntry);
-        }
-
         // Update the highest offset entry for this leader epoch as we added a new mapping.
         if (highestLogOffset == null || leaderEpochEndOffset > highestLogOffset) {
             highestLogOffset = leaderEpochEndOffset;
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochStateTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochStateTest.java
new file mode 100644
index 00000000000..820596499e7
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochStateTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class RemoteLogLeaderEpochStateTest {
+
+    TopicPartition tp = new TopicPartition("topic", 0);
+    TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
+    RemoteLogLeaderEpochState epochState = new RemoteLogLeaderEpochState();
+
+    @Test
+    void testListAllRemoteLogSegmentsOnEmpty() throws RemoteResourceNotFoundException {
+        assertFalse(epochState.listAllRemoteLogSegments(Collections.emptyMap()).hasNext());
+    }
+
+    @Test
+    void testListAllRemoteLogSegmentsShouldThrowErrorForUnknownSegmentId() {
+        RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        epochState.handleSegmentWithCopySegmentFinishedState(0L, segmentId1, 10L);
+        assertThrows(RemoteResourceNotFoundException.class,
+            () -> epochState.listAllRemoteLogSegments(Collections.singletonMap(segmentId2, null)));
+    }
+
+    @Test
+    void testListAllRemoteLogSegmentsShouldReturnSortedSegments() throws RemoteResourceNotFoundException {
+        Map<RemoteLogSegmentId, RemoteLogSegmentMetadata> segmentIdToMetadataMap = new HashMap<>();
+
+        // copy started but never finished so marked as unreferenced
+        RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        epochState.handleSegmentWithCopySegmentStartedState(segmentId1);
+        segmentIdToMetadataMap.put(segmentId1, createRemoteLogSegmentMetadata(segmentId1, 0L));
+
+        // copy finished successfully
+        RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        epochState.handleSegmentWithCopySegmentFinishedState(5L, segmentId2, 10L);
+        segmentIdToMetadataMap.put(segmentId2, createRemoteLogSegmentMetadata(segmentId2, 5L));
+
+        // copy finished successfully, but overwritten by the next segment upload so marked as unreferenced.
+        RemoteLogSegmentId segmentId3 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        epochState.handleSegmentWithCopySegmentFinishedState(11L, segmentId3, 100L);
+        segmentIdToMetadataMap.put(segmentId3, createRemoteLogSegmentMetadata(segmentId3, 11L));
+
+        // copy finished successfully
+        RemoteLogSegmentId segmentId4 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        epochState.handleSegmentWithCopySegmentFinishedState(9L, segmentId4, 150L);
+        segmentIdToMetadataMap.put(segmentId4, createRemoteLogSegmentMetadata(segmentId4, 9L));
+
+        // segments should be sorted by start-offset
+        List<RemoteLogSegmentId> expectedList = Arrays.asList(segmentId1, segmentId2, segmentId4, segmentId3);
+        List<RemoteLogSegmentId> actualList = new ArrayList<>();
+        epochState.listAllRemoteLogSegments(segmentIdToMetadataMap)
+                .forEachRemaining(metadata -> actualList.add(metadata.remoteLogSegmentId()));
+        assertEquals(expectedList, actualList);
+    }
+
+    @Test
+    void handleSegmentWithCopySegmentStartedState() {
+        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        epochState.handleSegmentWithCopySegmentStartedState(segmentId);
+        assertEquals(1, epochState.unreferencedSegmentIds().size());
+        assertTrue(epochState.unreferencedSegmentIds().contains(segmentId));
+    }
+
+    @Test
+    void handleSegmentWithCopySegmentFinishedState() {
+        RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId1, 100L);
+        epochState.handleSegmentWithCopySegmentFinishedState(101L, segmentId2, 200L);
+
+        assertEquals(2, epochState.referencedSegmentIds().size());
+        assertEquals(segmentId1, epochState.floorEntry(90L));
+        assertEquals(segmentId2, epochState.floorEntry(150L));
+        assertTrue(epochState.unreferencedSegmentIds().isEmpty());
+        assertEquals(200L, epochState.highestLogOffset());
+    }
+
+    @Test
+    void handleSegmentWithCopySegmentFinishedStateForOverlappingSegments() {
+        RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId1, 100L);
+        epochState.handleSegmentWithCopySegmentFinishedState(5L, segmentId2, 150L);
+
+        assertEquals(1, epochState.referencedSegmentIds().size());
+        assertEquals(segmentId2, epochState.floorEntry(11L));
+        assertEquals(1, epochState.unreferencedSegmentIds().size());
+        assertTrue(epochState.unreferencedSegmentIds().contains(segmentId1));
+        assertEquals(150L, epochState.highestLogOffset());
+    }
+
+    @Test
+    void handleSegmentWithCopySegmentFinishedStateForMultipleOverlappingSegments() {
+        RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        RemoteLogSegmentId segmentId3 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        RemoteLogSegmentId segmentId4 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+
+        epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId1, 100L);
+        epochState.handleSegmentWithCopySegmentFinishedState(5L, segmentId2, 150L);
+        epochState.handleSegmentWithCopySegmentFinishedState(148L, segmentId3, 155L);
+        epochState.handleSegmentWithCopySegmentFinishedState(4L, segmentId4, 200L);
+
+        assertEquals(1, epochState.referencedSegmentIds().size());
+        assertEquals(segmentId4, epochState.floorEntry(11L));
+        assertEquals(3, epochState.unreferencedSegmentIds().size());
+        assertTrue(epochState.unreferencedSegmentIds().containsAll(Arrays.asList(segmentId1, segmentId2, segmentId3)));
+        assertEquals(200L, epochState.highestLogOffset());
+    }
+
+    @Test
+    void handleSegmentWithCopySegmentFinishedStateForDuplicateSegments() {
+        RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId1, 100L);
+        epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId2, 100L);
+
+        assertEquals(segmentId2, epochState.floorEntry(11L));
+        assertEquals(1, epochState.unreferencedSegmentIds().size());
+        assertTrue(epochState.unreferencedSegmentIds().contains(segmentId1));
+        assertEquals(100L, epochState.highestLogOffset());
+    }
+
+    @Test
+    void handleSegmentWithCopySegmentFinishedStateForSegmentsWithSameStartOffset() {
+        RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId1, 100L);
+        epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId2, 150L);
+
+        assertEquals(segmentId2, epochState.floorEntry(11L));
+        assertEquals(segmentId2, epochState.floorEntry(111L));
+        assertEquals(1, epochState.unreferencedSegmentIds().size());
+        assertEquals(150L, epochState.highestLogOffset());
+    }
+
+    @Test
+    void handleSegmentWithDeleteSegmentStartedState() {
+        RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId1, 100L);
+        epochState.handleSegmentWithCopySegmentFinishedState(101L, segmentId2, 200L);
+        assertEquals(2, epochState.referencedSegmentIds().size());
+
+        epochState.handleSegmentWithDeleteSegmentStartedState(10L, segmentId1);
+        epochState.handleSegmentWithDeleteSegmentStartedState(101L, segmentId2);
+        assertTrue(epochState.referencedSegmentIds().isEmpty());
+        assertEquals(2, epochState.unreferencedSegmentIds().size());
+        assertTrue(epochState.unreferencedSegmentIds().containsAll(Arrays.asList(segmentId1, segmentId2)));
+    }
+
+    @Test
+    void handleSegmentWithDeleteSegmentFinishedState() {
+        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+        epochState.handleSegmentWithCopySegmentStartedState(segmentId);
+        assertEquals(1, epochState.unreferencedSegmentIds().size());
+
+        epochState.handleSegmentWithDeleteSegmentFinishedState(segmentId);
+        assertTrue(epochState.unreferencedSegmentIds().isEmpty());
+    }
+
+    private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
+                                                                    long startOffset) {
+        RemoteLogSegmentMetadata metadata = mock(RemoteLogSegmentMetadata.class);
+        when(metadata.remoteLogSegmentId()).thenReturn(remoteLogSegmentId);
+        when(metadata.startOffset()).thenReturn(startOffset);
+        return metadata;
+    }
+}
\ No newline at end of file