You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/04/08 18:09:54 UTC

[kafka] branch 2.8 updated: KAFKA-12619; Raft leader should expose hw only after committing LeaderChange (#10481)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new e399a80  KAFKA-12619; Raft leader should expose hw only after committing LeaderChange (#10481)
e399a80 is described below

commit e399a802c9f961bca36766a6367645290127b32b
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Apr 8 10:42:30 2021 -0700

    KAFKA-12619; Raft leader should expose hw only after committing LeaderChange (#10481)
    
    KIP-595 describes an extra condition on commitment here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch. In order to ensure that a newly elected leader's committed entries cannot get lost, it must commit one record from its own epoch. This guarantees that its latest entry is larger (in terms of epoch/offset) than any previously written record, which ensures that any future leader must also have replicated that entry, along with all records from previous epochs that the current leader has committed.
    
    Although we had this check implemented, it was off by one. We only ensured that replication reached the epoch start offset, which does not cover the `LeaderChange` record appended at that offset. This patch fixes the check so that the high watermark is exposed only once it has advanced past the epoch start offset, and it clarifies the purpose of the check. The rest of the patch is just fixing up test cases.
    
    Reviewers: dengziming <sw...@163.com>, Guozhang Wang <wa...@gmail.com>
---
 .../java/org/apache/kafka/raft/LeaderState.java    | 13 ++--
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 77 +++++++++++++++-------
 .../org/apache/kafka/raft/LeaderStateTest.java     | 60 ++++++++---------
 .../apache/kafka/raft/RaftClientTestContext.java   | 31 +++------
 4 files changed, 99 insertions(+), 82 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 0f0f728..7224294 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -106,13 +106,18 @@ public class LeaderState implements EpochState {
         Optional<LogOffsetMetadata> highWatermarkUpdateOpt = followersByDescendingFetchOffset.get(indexOfHw).endOffset;
 
         if (highWatermarkUpdateOpt.isPresent()) {
-            // When a leader is first elected, it cannot know the high watermark of the previous
-            // leader. In order to avoid exposing a non-monotonically increasing value, we have
-            // to wait for followers to catch up to the start of the leader's epoch.
+
+            // The KRaft protocol requires an extra condition on commitment after a leader
+            // election. The leader must commit one record from its own epoch before it is
+            // allowed to expose records from any previous epoch. This guarantees that its
+            // log will contain the largest record (in terms of epoch/offset) in any log
+            // which ensures that any future leader will have replicated this record as well
+            // as all records from previous epochs that the current leader has committed.
+
             LogOffsetMetadata highWatermarkUpdateMetadata = highWatermarkUpdateOpt.get();
             long highWatermarkUpdate = highWatermarkUpdateMetadata.offset;
 
-            if (highWatermarkUpdate >= epochStartOffset) {
+            if (highWatermarkUpdate > epochStartOffset) {
                 if (highWatermark.isPresent()) {
                     LogOffsetMetadata currentHighWatermarkMetadata = highWatermark.get();
                     if (highWatermarkUpdate > currentHighWatermarkMetadata.offset
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 1c716f4..4aeed0b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -187,7 +187,8 @@ public class KafkaRaftClientTest {
             .build();
 
         context.pollUntilRequest();
-        List<RaftRequest.Outbound> requests = context.collectEndQuorumRequests(epoch, Utils.mkSet(voter1, voter2));
+        List<RaftRequest.Outbound> requests = context.collectEndQuorumRequests(
+            epoch, Utils.mkSet(voter1, voter2), Optional.empty());
         assertEquals(2, requests.size());
 
         // Respond to one of the requests so that we can verify that no additional
@@ -203,7 +204,8 @@ public class KafkaRaftClientTest {
         int nonRespondedId = requests.get(1).destinationId();
         context.time.sleep(6000);
         context.pollUntilRequest();
-        List<RaftRequest.Outbound> retries = context.collectEndQuorumRequests(epoch, Utils.mkSet(nonRespondedId));
+        List<RaftRequest.Outbound> retries = context.collectEndQuorumRequests(
+            epoch, Utils.mkSet(nonRespondedId), Optional.empty());
         assertEquals(1, retries.size());
     }
 
@@ -831,13 +833,14 @@ public class KafkaRaftClientTest {
         // First poll has no high watermark advance
         context.client.poll();
         assertEquals(OptionalLong.empty(), context.client.highWatermark());
+        assertEquals(1L, context.log.endOffset().offset);
 
         // Let follower send a fetch to initialize the high watermark,
         // note the offset 0 would be a control message for becoming the leader
-        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0L, epoch, 500));
+        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 1L, epoch, 0));
         context.pollUntilResponse();
         context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
-        assertEquals(OptionalLong.of(0L), context.client.highWatermark());
+        assertEquals(OptionalLong.of(1L), context.client.highWatermark());
 
         List<String> records = Arrays.asList("a", "b", "c");
         long offset = context.client.scheduleAppend(epoch, records);
@@ -1483,7 +1486,18 @@ public class KafkaRaftClientTest {
 
         RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
 
-        context.buildFollowerSet(epoch, closeFollower, laggingFollower);
+        // The lagging follower fetches first
+        context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L, epoch, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(1L, epoch);
+
+        // Append some records, so that the close follower will be able to advance further.
+        context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar"));
+        context.client.poll();
+
+        context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, epoch, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(3L, epoch);
 
         // Now shutdown
         context.client.shutdown(context.electionTimeoutMs() * 2);
@@ -1495,10 +1509,11 @@ public class KafkaRaftClientTest {
         context.pollUntilRequest();
         assertTrue(context.client.isRunning());
 
-        List<RaftRequest.Outbound> endQuorumRequests = context.collectEndQuorumRequests(
-            1, Utils.mkSet(closeFollower, laggingFollower));
-
-        assertEquals(2, endQuorumRequests.size());
+        context.collectEndQuorumRequests(
+            epoch,
+            Utils.mkSet(closeFollower, laggingFollower),
+            Optional.of(Arrays.asList(closeFollower, laggingFollower))
+        );
     }
 
     @Test
@@ -1511,22 +1526,27 @@ public class KafkaRaftClientTest {
 
         RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
 
-        context.buildFollowerSet(epoch, closeFollower, laggingFollower);
+        context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L, epoch, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(1L, epoch);
+
+        context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar"));
+        context.client.poll();
+
+        context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, epoch, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(3L, epoch);
 
         // Create observer
         int observerId = 3;
         context.deliverRequest(context.fetchRequest(epoch, observerId, 0L, 0, 0));
-
         context.pollUntilResponse();
-
-        long highWatermark = 1L;
-        context.assertSentFetchPartitionResponse(highWatermark, epoch);
+        context.assertSentFetchPartitionResponse(3L, epoch);
 
         context.deliverRequest(DescribeQuorumRequest.singletonRequest(context.metadataPartition));
-
         context.pollUntilResponse();
 
-        context.assertSentDescribeQuorumResponse(localId, epoch, highWatermark,
+        context.assertSentDescribeQuorumResponse(localId, epoch, 3L,
             Arrays.asList(
                 new ReplicaState()
                     .setReplicaId(localId)
@@ -1535,10 +1555,10 @@ public class KafkaRaftClientTest {
                     .setLogEndOffset(3L),
                 new ReplicaState()
                     .setReplicaId(laggingFollower)
-                    .setLogEndOffset(0L),
+                    .setLogEndOffset(1L),
                 new ReplicaState()
                     .setReplicaId(closeFollower)
-                    .setLogEndOffset(1L)),
+                    .setLogEndOffset(3)),
             singletonList(
                 new ReplicaState()
                     .setReplicaId(observerId)
@@ -2003,6 +2023,10 @@ public class KafkaRaftClientTest {
         context.becomeLeader();
         context.client.poll();
 
+        // After becoming leader, we expect the `LeaderChange` record to be appended
+        // in addition to the initial 9 records in the log.
+        assertEquals(10L, context.log.endOffset().offset);
+
         // The high watermark is not known to the leader until the followers
         // begin fetching, so we should not have fired the `handleClaim` callback.
         assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
@@ -2018,8 +2042,8 @@ public class KafkaRaftClientTest {
 
         // Now catch up to the start of the leader epoch so that the high
         // watermark advances and we can start sending committed data to the
-        // listener.
-        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 9L, 2, 500));
+        // listener. Note that the `LeaderChange` control record is filtered.
+        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 500));
         context.client.poll();
         assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
         assertEquals(3, context.listener.numCommittedBatches());
@@ -2054,11 +2078,12 @@ public class KafkaRaftClientTest {
 
         context.becomeLeader();
         context.client.poll();
+        assertEquals(10L, context.log.endOffset().offset);
 
         // Let the initial listener catch up
-        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 9L, 2, 500));
+        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 0));
         context.client.poll();
-        assertEquals(OptionalLong.of(9L), context.client.highWatermark());
+        assertEquals(OptionalLong.of(10L), context.client.highWatermark());
         context.client.poll();
         assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
 
@@ -2184,16 +2209,18 @@ public class KafkaRaftClientTest {
 
         // Start off as the leader and receive a fetch to initialize the high watermark
         context.becomeLeader();
-        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 9L, epoch, 500));
+        assertEquals(10L, context.log.endOffset().offset);
+
+        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 0));
         context.pollUntilResponse();
-        assertEquals(OptionalLong.of(9L), context.client.highWatermark());
+        assertEquals(OptionalLong.of(10L), context.client.highWatermark());
         context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
 
         // Now we receive a vote request which transitions us to the 'unattached' state
         context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch, 9L));
         context.pollUntilResponse();
         context.assertUnknownLeader(epoch + 1);
-        assertEquals(OptionalLong.of(9L), context.client.highWatermark());
+        assertEquals(OptionalLong.of(10L), context.client.highWatermark());
 
         // Timeout the election and become candidate
         int candidateEpoch = epoch + 2;
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index 8ec91a7..2a97698 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -58,27 +58,32 @@ public class LeaderStateTest {
     public void testUpdateHighWatermarkQuorumSizeOne() {
         LeaderState state = new LeaderState(localId, epoch, 15L, Collections.singleton(localId), Collections.emptySet());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertTrue(state.updateLocalState(0, new LogOffsetMetadata(15L)));
-        assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
+        assertFalse(state.updateLocalState(0, new LogOffsetMetadata(15L)));
+        assertEquals(Collections.emptySet(), state.nonAcknowledgingVoters());
+        assertEquals(Optional.empty(), state.highWatermark());
+        assertTrue(state.updateLocalState(0, new LogOffsetMetadata(16L)));
+        assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark());
+        assertTrue(state.updateLocalState(0, new LogOffsetMetadata(20)));
+        assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark());
     }
 
     @Test
     public void testNonMonotonicEndOffsetUpdate() {
         LeaderState state = new LeaderState(localId, epoch, 15L, Collections.singleton(localId), Collections.emptySet());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertTrue(state.updateLocalState(0, new LogOffsetMetadata(15L)));
-        assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
+        assertTrue(state.updateLocalState(0, new LogOffsetMetadata(16L)));
+        assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark());
         assertThrows(IllegalArgumentException.class,
-            () -> state.updateLocalState(0, new LogOffsetMetadata(14L)));
+            () -> state.updateLocalState(0, new LogOffsetMetadata(15L)));
     }
 
     @Test
     public void testIdempotentEndOffsetUpdate() {
         LeaderState state = new LeaderState(localId, epoch, 15L, Collections.singleton(localId), Collections.emptySet());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertTrue(state.updateLocalState(0, new LogOffsetMetadata(15L)));
-        assertFalse(state.updateLocalState(0, new LogOffsetMetadata(15L)));
-        assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
+        assertTrue(state.updateLocalState(0, new LogOffsetMetadata(16L)));
+        assertFalse(state.updateLocalState(0, new LogOffsetMetadata(16L)));
+        assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark());
     }
 
     @Test
@@ -86,11 +91,11 @@ public class LeaderStateTest {
         LeaderState state = new LeaderState(localId, epoch, 15L, Collections.singleton(localId), Collections.emptySet());
         assertEquals(Optional.empty(), state.highWatermark());
 
-        LogOffsetMetadata initialHw = new LogOffsetMetadata(15L, Optional.of(new MockOffsetMetadata("foo")));
+        LogOffsetMetadata initialHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("bar")));
         assertTrue(state.updateLocalState(0, initialHw));
         assertEquals(Optional.of(initialHw), state.highWatermark());
 
-        LogOffsetMetadata updateHw = new LogOffsetMetadata(15L, Optional.of(new MockOffsetMetadata("bar")));
+        LogOffsetMetadata updateHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("baz")));
         assertTrue(state.updateLocalState(0, updateHw));
         assertEquals(Optional.of(updateHw), state.highWatermark());
     }
@@ -99,25 +104,16 @@ public class LeaderStateTest {
     public void testUpdateHighWatermarkQuorumSizeTwo() {
         int otherNodeId = 1;
         LeaderState state = new LeaderState(localId, epoch, 10L, mkSet(localId, otherNodeId), Collections.emptySet());
-        assertFalse(state.updateLocalState(0, new LogOffsetMetadata(15L)));
-        assertEquals(Optional.empty(), state.highWatermark());
-        assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L)));
-        assertEquals(Collections.emptySet(), state.nonAcknowledgingVoters());
-        assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark());
-        assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(15L)));
-        assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
-    }
-
-    @Test
-    public void testHighWatermarkUnknownUntilStartOfLeaderEpoch() {
-        int otherNodeId = 1;
-        LeaderState state = new LeaderState(localId, epoch, 15L, mkSet(localId, otherNodeId), Collections.emptySet());
-        assertFalse(state.updateLocalState(0, new LogOffsetMetadata(20L)));
+        assertFalse(state.updateLocalState(0, new LogOffsetMetadata(13L)));
+        assertEquals(Collections.singleton(otherNodeId), state.nonAcknowledgingVoters());
         assertEquals(Optional.empty(), state.highWatermark());
         assertFalse(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L)));
+        assertEquals(Collections.emptySet(), state.nonAcknowledgingVoters());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(15L)));
-        assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
+        assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(11L)));
+        assertEquals(Optional.of(new LogOffsetMetadata(11L)), state.highWatermark());
+        assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(13L)));
+        assertEquals(Optional.of(new LogOffsetMetadata(13L)), state.highWatermark());
     }
 
     @Test
@@ -126,17 +122,21 @@ public class LeaderStateTest {
         int node2 = 2;
         LeaderState state = new LeaderState(localId, epoch, 10L, mkSet(localId, node1, node2), Collections.emptySet());
         assertFalse(state.updateLocalState(0, new LogOffsetMetadata(15L)));
+        assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L)));
+        assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L)));
         assertEquals(Collections.singleton(node2), state.nonAcknowledgingVoters());
-        assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark());
+        assertEquals(Optional.empty(), state.highWatermark());
+        assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L)));
+        assertEquals(Collections.emptySet(), state.nonAcknowledgingVoters());
+        assertEquals(Optional.empty(), state.highWatermark());
         assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
         assertFalse(state.updateLocalState(0, new LogOffsetMetadata(20L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
-        assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(20L)));
+        assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L)));
         assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark());
-        assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L)));
+        assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(20L)));
         assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark());
     }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 3683a8e..3f19913 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -63,7 +63,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -561,7 +560,7 @@ public final class RaftClientTestContext {
 
     int assertSentEndQuorumEpochRequest(int epoch, int destinationId) {
         List<RaftRequest.Outbound> endQuorumRequests = collectEndQuorumRequests(
-            epoch, Collections.singleton(destinationId));
+            epoch, Collections.singleton(destinationId), Optional.empty());
         assertEquals(1, endQuorumRequests.size());
         return endQuorumRequests.get(0).correlationId();
     }
@@ -683,28 +682,11 @@ public final class RaftClientTestContext {
         return FetchSnapshotResponse.forTopicPartition(response, topicPartition);
     }
 
-    void buildFollowerSet(
+    List<RaftRequest.Outbound> collectEndQuorumRequests(
         int epoch,
-        int closeFollower,
-        int laggingFollower
-    ) throws Exception {
-        // The lagging follower fetches first
-        deliverRequest(fetchRequest(1, laggingFollower, 0L, 0, 0));
-
-        pollUntilResponse();
-        assertSentFetchPartitionResponse(0L, epoch);
-
-        // Append some records, so that the close follower will be able to advance further.
-        client.scheduleAppend(epoch, Arrays.asList("foo", "bar"));
-        client.poll();
-
-        deliverRequest(fetchRequest(epoch, closeFollower, 1L, epoch, 0));
-
-        pollUntilResponse();
-        assertSentFetchPartitionResponse(1L, epoch);
-    }
-
-    List<RaftRequest.Outbound> collectEndQuorumRequests(int epoch, Set<Integer> destinationIdSet) {
+        Set<Integer> destinationIdSet,
+        Optional<List<Integer>> preferredSuccessorsOpt
+    ) {
         List<RaftRequest.Outbound> endQuorumRequests = new ArrayList<>();
         Set<Integer> collectedDestinationIdSet = new HashSet<>();
         for (RaftMessage raftMessage : channel.drainSendQueue()) {
@@ -716,6 +698,9 @@ public final class RaftClientTestContext {
 
                 assertEquals(epoch, partitionRequest.leaderEpoch());
                 assertEquals(localIdOrThrow(), partitionRequest.leaderId());
+                preferredSuccessorsOpt.ifPresent(preferredSuccessors -> {
+                    assertEquals(preferredSuccessors, partitionRequest.preferredSuccessors());
+                });
 
                 RaftRequest.Outbound outboundRequest = (RaftRequest.Outbound) raftMessage;
                 collectedDestinationIdSet.add(outboundRequest.destinationId());