You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/04/08 18:09:54 UTC
[kafka] branch 2.8 updated: KAFKA-12619;
Raft leader should expose hw only after committing LeaderChange
(#10481)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new e399a80 KAFKA-12619; Raft leader should expose hw only after committing LeaderChange (#10481)
e399a80 is described below
commit e399a802c9f961bca36766a6367645290127b32b
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Apr 8 10:42:30 2021 -0700
KAFKA-12619; Raft leader should expose hw only after committing LeaderChange (#10481)
KIP-595 describes an extra condition on commitment here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch. In order to ensure that a newly elected leader's committed entries cannot get lost, it must commit one record from its own epoch. This guarantees that its latest entry is larger (in terms of epoch/offset) than any previously written record which ensures that any future leader must a [...]
Although we had this check implemented, it was off by one. We only ensured that replication reached the epoch start offset, which does not reflect the appended `LeaderChange` record. This patch fixes the check and clarifies the point of the check. The rest of the patch is just fixing up test cases.
Reviewers: dengziming <sw...@163.com>, Guozhang Wang <wa...@gmail.com>
---
.../java/org/apache/kafka/raft/LeaderState.java | 13 ++--
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 77 +++++++++++++++-------
.../org/apache/kafka/raft/LeaderStateTest.java | 60 ++++++++---------
.../apache/kafka/raft/RaftClientTestContext.java | 31 +++------
4 files changed, 99 insertions(+), 82 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 0f0f728..7224294 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -106,13 +106,18 @@ public class LeaderState implements EpochState {
Optional<LogOffsetMetadata> highWatermarkUpdateOpt = followersByDescendingFetchOffset.get(indexOfHw).endOffset;
if (highWatermarkUpdateOpt.isPresent()) {
- // When a leader is first elected, it cannot know the high watermark of the previous
- // leader. In order to avoid exposing a non-monotonically increasing value, we have
- // to wait for followers to catch up to the start of the leader's epoch.
+
+ // The KRaft protocol requires an extra condition on commitment after a leader
+ // election. The leader must commit one record from its own epoch before it is
+ // allowed to expose records from any previous epoch. This guarantees that its
+ // log will contain the largest record (in terms of epoch/offset) in any log
+ // which ensures that any future leader will have replicated this record as well
+ // as all records from previous epochs that the current leader has committed.
+
LogOffsetMetadata highWatermarkUpdateMetadata = highWatermarkUpdateOpt.get();
long highWatermarkUpdate = highWatermarkUpdateMetadata.offset;
- if (highWatermarkUpdate >= epochStartOffset) {
+ if (highWatermarkUpdate > epochStartOffset) {
if (highWatermark.isPresent()) {
LogOffsetMetadata currentHighWatermarkMetadata = highWatermark.get();
if (highWatermarkUpdate > currentHighWatermarkMetadata.offset
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 1c716f4..4aeed0b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -187,7 +187,8 @@ public class KafkaRaftClientTest {
.build();
context.pollUntilRequest();
- List<RaftRequest.Outbound> requests = context.collectEndQuorumRequests(epoch, Utils.mkSet(voter1, voter2));
+ List<RaftRequest.Outbound> requests = context.collectEndQuorumRequests(
+ epoch, Utils.mkSet(voter1, voter2), Optional.empty());
assertEquals(2, requests.size());
// Respond to one of the requests so that we can verify that no additional
@@ -203,7 +204,8 @@ public class KafkaRaftClientTest {
int nonRespondedId = requests.get(1).destinationId();
context.time.sleep(6000);
context.pollUntilRequest();
- List<RaftRequest.Outbound> retries = context.collectEndQuorumRequests(epoch, Utils.mkSet(nonRespondedId));
+ List<RaftRequest.Outbound> retries = context.collectEndQuorumRequests(
+ epoch, Utils.mkSet(nonRespondedId), Optional.empty());
assertEquals(1, retries.size());
}
@@ -831,13 +833,14 @@ public class KafkaRaftClientTest {
// First poll has no high watermark advance
context.client.poll();
assertEquals(OptionalLong.empty(), context.client.highWatermark());
+ assertEquals(1L, context.log.endOffset().offset);
// Let follower send a fetch to initialize the high watermark,
// note the offset 0 would be a control message for becoming the leader
- context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0L, epoch, 500));
+ context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 1L, epoch, 0));
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
- assertEquals(OptionalLong.of(0L), context.client.highWatermark());
+ assertEquals(OptionalLong.of(1L), context.client.highWatermark());
List<String> records = Arrays.asList("a", "b", "c");
long offset = context.client.scheduleAppend(epoch, records);
@@ -1483,7 +1486,18 @@ public class KafkaRaftClientTest {
RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
- context.buildFollowerSet(epoch, closeFollower, laggingFollower);
+ // The lagging follower fetches first
+ context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L, epoch, 0));
+ context.pollUntilResponse();
+ context.assertSentFetchPartitionResponse(1L, epoch);
+
+ // Append some records, so that the close follower will be able to advance further.
+ context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar"));
+ context.client.poll();
+
+ context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, epoch, 0));
+ context.pollUntilResponse();
+ context.assertSentFetchPartitionResponse(3L, epoch);
// Now shutdown
context.client.shutdown(context.electionTimeoutMs() * 2);
@@ -1495,10 +1509,11 @@ public class KafkaRaftClientTest {
context.pollUntilRequest();
assertTrue(context.client.isRunning());
- List<RaftRequest.Outbound> endQuorumRequests = context.collectEndQuorumRequests(
- 1, Utils.mkSet(closeFollower, laggingFollower));
-
- assertEquals(2, endQuorumRequests.size());
+ context.collectEndQuorumRequests(
+ epoch,
+ Utils.mkSet(closeFollower, laggingFollower),
+ Optional.of(Arrays.asList(closeFollower, laggingFollower))
+ );
}
@Test
@@ -1511,22 +1526,27 @@ public class KafkaRaftClientTest {
RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
- context.buildFollowerSet(epoch, closeFollower, laggingFollower);
+ context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L, epoch, 0));
+ context.pollUntilResponse();
+ context.assertSentFetchPartitionResponse(1L, epoch);
+
+ context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar"));
+ context.client.poll();
+
+ context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, epoch, 0));
+ context.pollUntilResponse();
+ context.assertSentFetchPartitionResponse(3L, epoch);
// Create observer
int observerId = 3;
context.deliverRequest(context.fetchRequest(epoch, observerId, 0L, 0, 0));
-
context.pollUntilResponse();
-
- long highWatermark = 1L;
- context.assertSentFetchPartitionResponse(highWatermark, epoch);
+ context.assertSentFetchPartitionResponse(3L, epoch);
context.deliverRequest(DescribeQuorumRequest.singletonRequest(context.metadataPartition));
-
context.pollUntilResponse();
- context.assertSentDescribeQuorumResponse(localId, epoch, highWatermark,
+ context.assertSentDescribeQuorumResponse(localId, epoch, 3L,
Arrays.asList(
new ReplicaState()
.setReplicaId(localId)
@@ -1535,10 +1555,10 @@ public class KafkaRaftClientTest {
.setLogEndOffset(3L),
new ReplicaState()
.setReplicaId(laggingFollower)
- .setLogEndOffset(0L),
+ .setLogEndOffset(1L),
new ReplicaState()
.setReplicaId(closeFollower)
- .setLogEndOffset(1L)),
+ .setLogEndOffset(3)),
singletonList(
new ReplicaState()
.setReplicaId(observerId)
@@ -2003,6 +2023,10 @@ public class KafkaRaftClientTest {
context.becomeLeader();
context.client.poll();
+ // After becoming leader, we expect the `LeaderChange` record to be appended
+ // in addition to the initial 9 records in the log.
+ assertEquals(10L, context.log.endOffset().offset);
+
// The high watermark is not known to the leader until the followers
// begin fetching, so we should not have fired the `handleClaim` callback.
assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
@@ -2018,8 +2042,8 @@ public class KafkaRaftClientTest {
// Now catch up to the start of the leader epoch so that the high
// watermark advances and we can start sending committed data to the
- // listener.
- context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 9L, 2, 500));
+ // listener. Note that the `LeaderChange` control record is filtered.
+ context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 500));
context.client.poll();
assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
assertEquals(3, context.listener.numCommittedBatches());
@@ -2054,11 +2078,12 @@ public class KafkaRaftClientTest {
context.becomeLeader();
context.client.poll();
+ assertEquals(10L, context.log.endOffset().offset);
// Let the initial listener catch up
- context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 9L, 2, 500));
+ context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 0));
context.client.poll();
- assertEquals(OptionalLong.of(9L), context.client.highWatermark());
+ assertEquals(OptionalLong.of(10L), context.client.highWatermark());
context.client.poll();
assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
@@ -2184,16 +2209,18 @@ public class KafkaRaftClientTest {
// Start off as the leader and receive a fetch to initialize the high watermark
context.becomeLeader();
- context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 9L, epoch, 500));
+ assertEquals(10L, context.log.endOffset().offset);
+
+ context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 0));
context.pollUntilResponse();
- assertEquals(OptionalLong.of(9L), context.client.highWatermark());
+ assertEquals(OptionalLong.of(10L), context.client.highWatermark());
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
// Now we receive a vote request which transitions us to the 'unattached' state
context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch, 9L));
context.pollUntilResponse();
context.assertUnknownLeader(epoch + 1);
- assertEquals(OptionalLong.of(9L), context.client.highWatermark());
+ assertEquals(OptionalLong.of(10L), context.client.highWatermark());
// Timeout the election and become candidate
int candidateEpoch = epoch + 2;
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index 8ec91a7..2a97698 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -58,27 +58,32 @@ public class LeaderStateTest {
public void testUpdateHighWatermarkQuorumSizeOne() {
LeaderState state = new LeaderState(localId, epoch, 15L, Collections.singleton(localId), Collections.emptySet());
assertEquals(Optional.empty(), state.highWatermark());
- assertTrue(state.updateLocalState(0, new LogOffsetMetadata(15L)));
- assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
+ assertFalse(state.updateLocalState(0, new LogOffsetMetadata(15L)));
+ assertEquals(Collections.emptySet(), state.nonAcknowledgingVoters());
+ assertEquals(Optional.empty(), state.highWatermark());
+ assertTrue(state.updateLocalState(0, new LogOffsetMetadata(16L)));
+ assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark());
+ assertTrue(state.updateLocalState(0, new LogOffsetMetadata(20)));
+ assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark());
}
@Test
public void testNonMonotonicEndOffsetUpdate() {
LeaderState state = new LeaderState(localId, epoch, 15L, Collections.singleton(localId), Collections.emptySet());
assertEquals(Optional.empty(), state.highWatermark());
- assertTrue(state.updateLocalState(0, new LogOffsetMetadata(15L)));
- assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
+ assertTrue(state.updateLocalState(0, new LogOffsetMetadata(16L)));
+ assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark());
assertThrows(IllegalArgumentException.class,
- () -> state.updateLocalState(0, new LogOffsetMetadata(14L)));
+ () -> state.updateLocalState(0, new LogOffsetMetadata(15L)));
}
@Test
public void testIdempotentEndOffsetUpdate() {
LeaderState state = new LeaderState(localId, epoch, 15L, Collections.singleton(localId), Collections.emptySet());
assertEquals(Optional.empty(), state.highWatermark());
- assertTrue(state.updateLocalState(0, new LogOffsetMetadata(15L)));
- assertFalse(state.updateLocalState(0, new LogOffsetMetadata(15L)));
- assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
+ assertTrue(state.updateLocalState(0, new LogOffsetMetadata(16L)));
+ assertFalse(state.updateLocalState(0, new LogOffsetMetadata(16L)));
+ assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark());
}
@Test
@@ -86,11 +91,11 @@ public class LeaderStateTest {
LeaderState state = new LeaderState(localId, epoch, 15L, Collections.singleton(localId), Collections.emptySet());
assertEquals(Optional.empty(), state.highWatermark());
- LogOffsetMetadata initialHw = new LogOffsetMetadata(15L, Optional.of(new MockOffsetMetadata("foo")));
+ LogOffsetMetadata initialHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("bar")));
assertTrue(state.updateLocalState(0, initialHw));
assertEquals(Optional.of(initialHw), state.highWatermark());
- LogOffsetMetadata updateHw = new LogOffsetMetadata(15L, Optional.of(new MockOffsetMetadata("bar")));
+ LogOffsetMetadata updateHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("baz")));
assertTrue(state.updateLocalState(0, updateHw));
assertEquals(Optional.of(updateHw), state.highWatermark());
}
@@ -99,25 +104,16 @@ public class LeaderStateTest {
public void testUpdateHighWatermarkQuorumSizeTwo() {
int otherNodeId = 1;
LeaderState state = new LeaderState(localId, epoch, 10L, mkSet(localId, otherNodeId), Collections.emptySet());
- assertFalse(state.updateLocalState(0, new LogOffsetMetadata(15L)));
- assertEquals(Optional.empty(), state.highWatermark());
- assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L)));
- assertEquals(Collections.emptySet(), state.nonAcknowledgingVoters());
- assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark());
- assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(15L)));
- assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
- }
-
- @Test
- public void testHighWatermarkUnknownUntilStartOfLeaderEpoch() {
- int otherNodeId = 1;
- LeaderState state = new LeaderState(localId, epoch, 15L, mkSet(localId, otherNodeId), Collections.emptySet());
- assertFalse(state.updateLocalState(0, new LogOffsetMetadata(20L)));
+ assertFalse(state.updateLocalState(0, new LogOffsetMetadata(13L)));
+ assertEquals(Collections.singleton(otherNodeId), state.nonAcknowledgingVoters());
assertEquals(Optional.empty(), state.highWatermark());
assertFalse(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L)));
+ assertEquals(Collections.emptySet(), state.nonAcknowledgingVoters());
assertEquals(Optional.empty(), state.highWatermark());
- assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(15L)));
- assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
+ assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(11L)));
+ assertEquals(Optional.of(new LogOffsetMetadata(11L)), state.highWatermark());
+ assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(13L)));
+ assertEquals(Optional.of(new LogOffsetMetadata(13L)), state.highWatermark());
}
@Test
@@ -126,17 +122,21 @@ public class LeaderStateTest {
int node2 = 2;
LeaderState state = new LeaderState(localId, epoch, 10L, mkSet(localId, node1, node2), Collections.emptySet());
assertFalse(state.updateLocalState(0, new LogOffsetMetadata(15L)));
+ assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
assertEquals(Optional.empty(), state.highWatermark());
- assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L)));
+ assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L)));
assertEquals(Collections.singleton(node2), state.nonAcknowledgingVoters());
- assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark());
+ assertEquals(Optional.empty(), state.highWatermark());
+ assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L)));
+ assertEquals(Collections.emptySet(), state.nonAcknowledgingVoters());
+ assertEquals(Optional.empty(), state.highWatermark());
assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
assertFalse(state.updateLocalState(0, new LogOffsetMetadata(20L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
- assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(20L)));
+ assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L)));
assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark());
- assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L)));
+ assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(20L)));
assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark());
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 3683a8e..3f19913 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -63,7 +63,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -561,7 +560,7 @@ public final class RaftClientTestContext {
int assertSentEndQuorumEpochRequest(int epoch, int destinationId) {
List<RaftRequest.Outbound> endQuorumRequests = collectEndQuorumRequests(
- epoch, Collections.singleton(destinationId));
+ epoch, Collections.singleton(destinationId), Optional.empty());
assertEquals(1, endQuorumRequests.size());
return endQuorumRequests.get(0).correlationId();
}
@@ -683,28 +682,11 @@ public final class RaftClientTestContext {
return FetchSnapshotResponse.forTopicPartition(response, topicPartition);
}
- void buildFollowerSet(
+ List<RaftRequest.Outbound> collectEndQuorumRequests(
int epoch,
- int closeFollower,
- int laggingFollower
- ) throws Exception {
- // The lagging follower fetches first
- deliverRequest(fetchRequest(1, laggingFollower, 0L, 0, 0));
-
- pollUntilResponse();
- assertSentFetchPartitionResponse(0L, epoch);
-
- // Append some records, so that the close follower will be able to advance further.
- client.scheduleAppend(epoch, Arrays.asList("foo", "bar"));
- client.poll();
-
- deliverRequest(fetchRequest(epoch, closeFollower, 1L, epoch, 0));
-
- pollUntilResponse();
- assertSentFetchPartitionResponse(1L, epoch);
- }
-
- List<RaftRequest.Outbound> collectEndQuorumRequests(int epoch, Set<Integer> destinationIdSet) {
+ Set<Integer> destinationIdSet,
+ Optional<List<Integer>> preferredSuccessorsOpt
+ ) {
List<RaftRequest.Outbound> endQuorumRequests = new ArrayList<>();
Set<Integer> collectedDestinationIdSet = new HashSet<>();
for (RaftMessage raftMessage : channel.drainSendQueue()) {
@@ -716,6 +698,9 @@ public final class RaftClientTestContext {
assertEquals(epoch, partitionRequest.leaderEpoch());
assertEquals(localIdOrThrow(), partitionRequest.leaderId());
+ preferredSuccessorsOpt.ifPresent(preferredSuccessors -> {
+ assertEquals(preferredSuccessors, partitionRequest.preferredSuccessors());
+ });
RaftRequest.Outbound outboundRequest = (RaftRequest.Outbound) raftMessage;
collectedDestinationIdSet.add(outboundRequest.destinationId());