You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sa...@apache.org on 2023/07/24 14:06:35 UTC
[kafka] branch trunk updated: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache (#14004)
This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 84691b11f64 KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache (#14004)
84691b11f64 is described below
commit 84691b11f64e85a7f3f6fdbafd0f8fb2f8dd630c
Author: Kamal Chandraprakash <kc...@uber.com>
AuthorDate: Mon Jul 24 19:36:25 2023 +0530
KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache (#14004)
KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache
Reviewers: Satish Duggana <sa...@apache.org>, Viktor Nikitash <ni...@pdffiller.com>, Jorge Esteban Quilcate Otoya <qu...@gmail.com>, Abhijeet Kumar <ab...@gmail.com>
---
.../storage/RemoteLogLeaderEpochState.java | 24 ++-
.../storage/RemoteLogLeaderEpochStateTest.java | 206 +++++++++++++++++++++
2 files changed, 224 insertions(+), 6 deletions(-)
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
index d5787dd7985..b607da06fec 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
@@ -100,17 +100,29 @@ class RemoteLogLeaderEpochState {
void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId,
Long leaderEpochEndOffset) {
+ // If there are duplicate segments uploaded due to leader-election, then mark them as unreferenced.
+ // Duplicate segments can be uploaded when the previous leader had tier-lags and the next leader uploads the
+ // segment for the same leader-epoch which is a super-set of previously uploaded segments.
+ // (eg)
+ // case-1: Duplicate segment
+ // L0 uploaded segment S0 with offsets 0-100 and L1 uploaded segment S1 with offsets 0-200.
+ // We will mark the segment S0 as duplicate and add it to unreferencedSegmentIds.
+ // case-2: Overlapping segments
+ // L0 uploaded segment S0 with offsets 10-90 and L1 uploaded segment S1 with offsets 5-100, segment S2 with offsets 101-200,
+ // and so on. When the consumer requests the segment containing offset 95, it should get the segment S1 and not S0.
+ Map.Entry<Long, RemoteLogSegmentId> lastEntry = offsetToId.lastEntry();
+ while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= leaderEpochEndOffset) {
+ offsetToId.remove(lastEntry.getKey());
+ unreferencedSegmentIds.add(lastEntry.getValue());
+ lastEntry = offsetToId.lastEntry();
+ }
+
// Add the segment epochs mapping as the segment is copied successfully.
- RemoteLogSegmentId oldEntry = offsetToId.put(startOffset, remoteLogSegmentId);
+ offsetToId.put(startOffset, remoteLogSegmentId);
// Remove the metadata from unreferenced entries as it is successfully copied and added to the offset mapping.
unreferencedSegmentIds.remove(remoteLogSegmentId);
- // Add the old entry to unreferenced entries as the mapping is removed for the old entry.
- if (oldEntry != null) {
- unreferencedSegmentIds.add(oldEntry);
- }
-
// Update the highest offset entry for this leader epoch as we added a new mapping.
if (highestLogOffset == null || leaderEpochEndOffset > highestLogOffset) {
highestLogOffset = leaderEpochEndOffset;
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochStateTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochStateTest.java
new file mode 100644
index 00000000000..820596499e7
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochStateTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class RemoteLogLeaderEpochStateTest {
+
+ TopicPartition tp = new TopicPartition("topic", 0);
+ TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
+ RemoteLogLeaderEpochState epochState = new RemoteLogLeaderEpochState();
+
+ @Test
+ void testListAllRemoteLogSegmentsOnEmpty() throws RemoteResourceNotFoundException {
+ assertFalse(epochState.listAllRemoteLogSegments(Collections.emptyMap()).hasNext());
+ }
+
+ @Test
+ void testListAllRemoteLogSegmentsShouldThrowErrorForUnknownSegmentId() {
+ RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ epochState.handleSegmentWithCopySegmentFinishedState(0L, segmentId1, 10L);
+ assertThrows(RemoteResourceNotFoundException.class,
+ () -> epochState.listAllRemoteLogSegments(Collections.singletonMap(segmentId2, null)));
+ }
+
+ @Test
+ void testListAllRemoteLogSegmentsShouldReturnSortedSegments() throws RemoteResourceNotFoundException {
+ Map<RemoteLogSegmentId, RemoteLogSegmentMetadata> segmentIdToMetadataMap = new HashMap<>();
+
+ // copy started but never finished so marked as unreferenced
+ RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ epochState.handleSegmentWithCopySegmentStartedState(segmentId1);
+ segmentIdToMetadataMap.put(segmentId1, createRemoteLogSegmentMetadata(segmentId1, 0L));
+
+ // copy finished successfully
+ RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ epochState.handleSegmentWithCopySegmentFinishedState(5L, segmentId2, 10L);
+ segmentIdToMetadataMap.put(segmentId2, createRemoteLogSegmentMetadata(segmentId2, 5L));
+
+ // copy finished successfully, but overwritten by the next segment upload so marked as unreferenced.
+ RemoteLogSegmentId segmentId3 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ epochState.handleSegmentWithCopySegmentFinishedState(11L, segmentId3, 100L);
+ segmentIdToMetadataMap.put(segmentId3, createRemoteLogSegmentMetadata(segmentId3, 11L));
+
+ // copy finished successfully
+ RemoteLogSegmentId segmentId4 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ epochState.handleSegmentWithCopySegmentFinishedState(9L, segmentId4, 150L);
+ segmentIdToMetadataMap.put(segmentId4, createRemoteLogSegmentMetadata(segmentId4, 9L));
+
+ // segments should be sorted by start-offset
+ List<RemoteLogSegmentId> expectedList = Arrays.asList(segmentId1, segmentId2, segmentId4, segmentId3);
+ List<RemoteLogSegmentId> actualList = new ArrayList<>();
+ epochState.listAllRemoteLogSegments(segmentIdToMetadataMap)
+ .forEachRemaining(metadata -> actualList.add(metadata.remoteLogSegmentId()));
+ assertEquals(expectedList, actualList);
+ }
+
+ @Test
+ void handleSegmentWithCopySegmentStartedState() {
+ RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ epochState.handleSegmentWithCopySegmentStartedState(segmentId);
+ assertEquals(1, epochState.unreferencedSegmentIds().size());
+ assertTrue(epochState.unreferencedSegmentIds().contains(segmentId));
+ }
+
+ @Test
+ void handleSegmentWithCopySegmentFinishedState() {
+ RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId1, 100L);
+ epochState.handleSegmentWithCopySegmentFinishedState(101L, segmentId2, 200L);
+
+ assertEquals(2, epochState.referencedSegmentIds().size());
+ assertEquals(segmentId1, epochState.floorEntry(90L));
+ assertEquals(segmentId2, epochState.floorEntry(150L));
+ assertTrue(epochState.unreferencedSegmentIds().isEmpty());
+ assertEquals(200L, epochState.highestLogOffset());
+ }
+
+ @Test
+ void handleSegmentWithCopySegmentFinishedStateForOverlappingSegments() {
+ RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId1, 100L);
+ epochState.handleSegmentWithCopySegmentFinishedState(5L, segmentId2, 150L);
+
+ assertEquals(1, epochState.referencedSegmentIds().size());
+ assertEquals(segmentId2, epochState.floorEntry(11L));
+ assertEquals(1, epochState.unreferencedSegmentIds().size());
+ assertTrue(epochState.unreferencedSegmentIds().contains(segmentId1));
+ assertEquals(150L, epochState.highestLogOffset());
+ }
+
+ @Test
+ void handleSegmentWithCopySegmentFinishedStateForMultipleOverlappingSegments() {
+ RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ RemoteLogSegmentId segmentId3 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ RemoteLogSegmentId segmentId4 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+
+ epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId1, 100L);
+ epochState.handleSegmentWithCopySegmentFinishedState(5L, segmentId2, 150L);
+ epochState.handleSegmentWithCopySegmentFinishedState(148L, segmentId3, 155L);
+ epochState.handleSegmentWithCopySegmentFinishedState(4L, segmentId4, 200L);
+
+ assertEquals(1, epochState.referencedSegmentIds().size());
+ assertEquals(segmentId4, epochState.floorEntry(11L));
+ assertEquals(3, epochState.unreferencedSegmentIds().size());
+ assertTrue(epochState.unreferencedSegmentIds().containsAll(Arrays.asList(segmentId1, segmentId2, segmentId3)));
+ assertEquals(200L, epochState.highestLogOffset());
+ }
+
+ @Test
+ void handleSegmentWithCopySegmentFinishedStateForDuplicateSegments() {
+ RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId1, 100L);
+ epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId2, 100L);
+
+ assertEquals(segmentId2, epochState.floorEntry(11L));
+ assertEquals(1, epochState.unreferencedSegmentIds().size());
+ assertTrue(epochState.unreferencedSegmentIds().contains(segmentId1));
+ assertEquals(100L, epochState.highestLogOffset());
+ }
+
+ @Test
+ void handleSegmentWithCopySegmentFinishedStateForSegmentsWithSameStartOffset() {
+ RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId1, 100L);
+ epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId2, 150L);
+
+ assertEquals(segmentId2, epochState.floorEntry(11L));
+ assertEquals(segmentId2, epochState.floorEntry(111L));
+ assertEquals(1, epochState.unreferencedSegmentIds().size());
+ assertEquals(150L, epochState.highestLogOffset());
+ }
+
+ @Test
+ void handleSegmentWithDeleteSegmentStartedState() {
+ RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId1, 100L);
+ epochState.handleSegmentWithCopySegmentFinishedState(101L, segmentId2, 200L);
+ assertEquals(2, epochState.referencedSegmentIds().size());
+
+ epochState.handleSegmentWithDeleteSegmentStartedState(10L, segmentId1);
+ epochState.handleSegmentWithDeleteSegmentStartedState(101L, segmentId2);
+ assertTrue(epochState.referencedSegmentIds().isEmpty());
+ assertEquals(2, epochState.unreferencedSegmentIds().size());
+ assertTrue(epochState.unreferencedSegmentIds().containsAll(Arrays.asList(segmentId1, segmentId2)));
+ }
+
+ @Test
+ void handleSegmentWithDeleteSegmentFinishedState() {
+ RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
+ epochState.handleSegmentWithCopySegmentStartedState(segmentId);
+ assertEquals(1, epochState.unreferencedSegmentIds().size());
+
+ epochState.handleSegmentWithDeleteSegmentFinishedState(segmentId);
+ assertTrue(epochState.unreferencedSegmentIds().isEmpty());
+ }
+
+ private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
+ long startOffset) {
+ RemoteLogSegmentMetadata metadata = mock(RemoteLogSegmentMetadata.class);
+ when(metadata.remoteLogSegmentId()).thenReturn(remoteLogSegmentId);
+ when(metadata.startOffset()).thenReturn(startOffset);
+ return metadata;
+ }
+}
\ No newline at end of file