You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/12 22:14:16 UTC

[GitHub] [kafka] hachikuji opened a new pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

hachikuji opened a new pull request #10309:
URL: https://github.com/apache/kafka/pull/10309


   Currently the Raft leader raises an exception if there is a non-monotonic update to the fetch offset of a replica. In a situation where the replica had lost it disk state, this would prevent the replica from being able to recover. In this patch, we relax the validation to address this problem. It is worth pointing out that this validation could not be relied on to protect from data loss after a voter has lost committed state.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r594787145



##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##########
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) {
         }
     }
 
+    @Test
+    public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+        checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+    }
+
+    private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+        assertTrue(config.numVoters > 2,
+            "This test requires the cluster to be able to recover from one failed node");
+
+        for (int seed = 0; seed < 100; seed++) {
+            // We run this test without the `MonotonicEpoch` and `MajorityReachedHighWatermark`
+            // invariants since the loss of committed data on one node can violate them.
+
+            Cluster cluster = new Cluster(config, seed);
+            EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time);
+            scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+            scheduler.addInvariant(new SingleLeader(cluster));
+            scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+            MessageRouter router = new MessageRouter(cluster);
+
+            cluster.startAll();
+            schedulePolling(scheduler, cluster, 3, 5);
+            scheduler.schedule(router::deliverAll, 0, 2, 5);
+            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+            RaftNode node = cluster.randomRunning().orElseThrow(() ->
+                new AssertionError("Failed to find running node")
+            );
+
+            // Kill a random node and drop all of its persistent state. The Raft
+            // protocol guarantees should still ensure we lose no committed data
+            // as long as a new leader is elected before the failed node is restarted.
+            cluster.kill(node.nodeId);
+            cluster.deletePersistentState(node.nodeId);
+            scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && cluster.hasConsistentLeader());

Review comment:
       Got it. I missed that `votedIdOpt` is set to `empty` by the leader and the followers.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r596330007



##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##########
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) {
         }
     }
 
+    @Test
+    public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+        checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+    }
+
+    private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+        assertTrue(config.numVoters > 2,
+            "This test requires the cluster to be able to recover from one failed node");
+
+        for (int seed = 0; seed < 100; seed++) {
+            // We run this test without the `MonotonicEpoch` and `MajorityReachedHighWatermark`
+            // invariants since the loss of committed data on one node can violate them.
+
+            Cluster cluster = new Cluster(config, seed);
+            EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time);
+            scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+            scheduler.addInvariant(new SingleLeader(cluster));
+            scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+            MessageRouter router = new MessageRouter(cluster);
+
+            cluster.startAll();
+            schedulePolling(scheduler, cluster, 3, 5);
+            scheduler.schedule(router::deliverAll, 0, 2, 5);
+            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+            RaftNode node = cluster.randomRunning().orElseThrow(() ->
+                new AssertionError("Failed to find running node")
+            );
+
+            // Kill a random node and drop all of its persistent state. The Raft
+            // protocol guarantees should still ensure we lose no committed data
+            // as long as a new leader is elected before the failed node is restarted.
+            cluster.kill(node.nodeId);
+            cluster.deletePersistentState(node.nodeId);

Review comment:
       Good idea. Fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r594638669



##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##########
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) {
         }
     }
 
+    @Test
+    public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+        checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+    }
+
+    private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+        assertTrue(config.numVoters > 2,
+            "This test requires the cluster to be able to recover from one failed node");
+
+        for (int seed = 0; seed < 100; seed++) {
+            // We run this test without the `MonotonicEpoch` and `MajorityReachedHighWatermark`
+            // invariants since the loss of committed data on one node can violate them.
+
+            Cluster cluster = new Cluster(config, seed);
+            EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time);
+            scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+            scheduler.addInvariant(new SingleLeader(cluster));
+            scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+            MessageRouter router = new MessageRouter(cluster);
+
+            cluster.startAll();
+            schedulePolling(scheduler, cluster, 3, 5);
+            scheduler.schedule(router::deliverAll, 0, 2, 5);
+            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+            RaftNode node = cluster.randomRunning().orElseThrow(() ->
+                new AssertionError("Failed to find running node")
+            );
+
+            // Kill a random node and drop all of its persistent state. The Raft
+            // protocol guarantees should still ensure we lose no committed data
+            // as long as a new leader is elected before the failed node is restarted.
+            cluster.kill(node.nodeId);
+            cluster.deletePersistentState(node.nodeId);
+            scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && cluster.hasConsistentLeader());

Review comment:
       Yes. I meant to say `ElectionState` instead of `LeaderState`. `ElectionState` has a field called `votedIdOpt` for which `equals` checks for equality. This is not strictly required for having a "consistent" leader. I think for having a consistent leader for an epoch, only the `epoch` and `leaderIdOpt` need to match for all of the replicas.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r598980860



##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########
@@ -33,22 +36,25 @@
  * they acknowledge the leader.
  */
 public class LeaderState implements EpochState {
+    static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;
+
     private final int localId;
     private final int epoch;
     private final long epochStartOffset;
 
     private Optional<LogOffsetMetadata> highWatermark;
-    private final Map<Integer, VoterState> voterReplicaStates = new HashMap<>();
-    private final Map<Integer, ReplicaState> observerReplicaStates = new HashMap<>();
+    private final Map<Integer, ReplicaState> voterStates = new HashMap<>();
+    private final Map<Integer, ReplicaState> observerStates = new HashMap<>();
     private final Set<Integer> grantingVoters = new HashSet<>();
-    private static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;
+    private final Logger log;
 
     protected LeaderState(
         int localId,
         int epoch,
         long epochStartOffset,
         Set<Integer> voters,
-        Set<Integer> grantingVoters
+        Set<Integer> grantingVoters,
+        LogContext logContext

Review comment:
       The log context is useful because it carries with it a logging prefix which can be used to distinguish log messages. For example, in a streams application, the fact that we have multiple producers can make debugging difficult. Or in the context of integration/system/simulation testing, we often get logs from multiple nodes mixed together. With a common prefix, it is easy to grep messages for a particular instance so long as the `LogContext` is carried through to all the dependencies. Sometimes it is a little annoying to add the extra parameter, but it is worthwhile for improved debugging whenever the parent object already has a log context.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r598134855



##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##########
@@ -880,6 +931,10 @@ public void verify() {
                 PersistentState state = nodeEntry.getValue();
                 ElectionState electionState = state.store.readElectionState();
 
+                if (electionState == null) {

Review comment:
       Could be merged with next condition: 
   `electionState != null && electionState.epoch >= epoch && electionState.hasLeader()`

##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########
@@ -170,36 +183,38 @@ public boolean updateReplicaState(int replicaId,
             .collect(Collectors.toList());
     }
 
-    private List<VoterState> followersByDescendingFetchOffset() {
-        return new ArrayList<>(this.voterReplicaStates.values()).stream()
+    private List<ReplicaState> followersByDescendingFetchOffset() {
+        return new ArrayList<>(this.voterStates.values()).stream()
             .sorted()
             .collect(Collectors.toList());
     }
 
     private boolean updateEndOffset(ReplicaState state,
                                     LogOffsetMetadata endOffsetMetadata) {
         state.endOffset.ifPresent(currentEndOffset -> {
-            if (currentEndOffset.offset > endOffsetMetadata.offset)
-                throw new IllegalArgumentException("Non-monotonic update to end offset for nodeId " + state.nodeId);
+            if (currentEndOffset.offset > endOffsetMetadata.offset) {
+                if (state.nodeId == localId) {
+                    throw new IllegalStateException("Detected non-monotonic update of local " +
+                        "end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset);
+                } else {
+                    log.warn("Detected non-monotonic update of fetch offset from nodeId {}: {} -> {}",

Review comment:
       I wonder whether the current approach is too loose. Maybe this is already done, but do we want to inform failed replica to cleanup or truncate in the FetchResponse?

##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########
@@ -33,22 +36,25 @@
  * they acknowledge the leader.
  */
 public class LeaderState implements EpochState {
+    static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;
+
     private final int localId;
     private final int epoch;
     private final long epochStartOffset;
 
     private Optional<LogOffsetMetadata> highWatermark;
-    private final Map<Integer, VoterState> voterReplicaStates = new HashMap<>();
-    private final Map<Integer, ReplicaState> observerReplicaStates = new HashMap<>();
+    private final Map<Integer, ReplicaState> voterStates = new HashMap<>();
+    private final Map<Integer, ReplicaState> observerStates = new HashMap<>();
     private final Set<Integer> grantingVoters = new HashSet<>();
-    private static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;
+    private final Logger log;
 
     protected LeaderState(
         int localId,
         int epoch,
         long epochStartOffset,
         Set<Integer> voters,
-        Set<Integer> grantingVoters
+        Set<Integer> grantingVoters,
+        LogContext logContext

Review comment:
       Just for my own education, when is it preferable to use upper class log context vs creating own log context?

##########
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##########
@@ -184,26 +220,40 @@ private LeaderState setUpLeaderAndFollowers(int follower1,
     @Test
     public void testGetObserverStatesWithObserver() {
         int observerId = 10;
-        long endOffset = 10L;
+        long epochStartOffset = 10L;
 
-        LeaderState state = new LeaderState(localId, epoch, endOffset, mkSet(localId), Collections.emptySet());
+        LeaderState state = newLeaderState(mkSet(localId), epochStartOffset);
         long timestamp = 20L;
-        assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(endOffset)));
+        assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(epochStartOffset)));
 
-        assertEquals(Collections.singletonMap(observerId, endOffset), state.getObserverStates(timestamp));
+        assertEquals(Collections.singletonMap(observerId, epochStartOffset), state.getObserverStates(timestamp));
     }
 
     @Test
     public void testNoOpForNegativeRemoteNodeId() {
         int observerId = -1;
-        long endOffset = 10L;
+        long epochStartOffset = 10L;

Review comment:
       So this offset was named wrong previously?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r598983673



##########
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##########
@@ -184,26 +220,40 @@ private LeaderState setUpLeaderAndFollowers(int follower1,
     @Test
     public void testGetObserverStatesWithObserver() {
         int observerId = 10;
-        long endOffset = 10L;
+        long epochStartOffset = 10L;
 
-        LeaderState state = new LeaderState(localId, epoch, endOffset, mkSet(localId), Collections.emptySet());
+        LeaderState state = newLeaderState(mkSet(localId), epochStartOffset);
         long timestamp = 20L;
-        assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(endOffset)));
+        assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(epochStartOffset)));
 
-        assertEquals(Collections.singletonMap(observerId, endOffset), state.getObserverStates(timestamp));
+        assertEquals(Collections.singletonMap(observerId, epochStartOffset), state.getObserverStates(timestamp));
     }
 
     @Test
     public void testNoOpForNegativeRemoteNodeId() {
         int observerId = -1;
-        long endOffset = 10L;
+        long epochStartOffset = 10L;

Review comment:
       I'm not sure I'd call it wrong. The epoch start offset is initialized as the current log end offset. But I thought it was better to choose a more explicit name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r598986869



##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########
@@ -170,36 +183,38 @@ public boolean updateReplicaState(int replicaId,
             .collect(Collectors.toList());
     }
 
-    private List<VoterState> followersByDescendingFetchOffset() {
-        return new ArrayList<>(this.voterReplicaStates.values()).stream()
+    private List<ReplicaState> followersByDescendingFetchOffset() {
+        return new ArrayList<>(this.voterStates.values()).stream()
             .sorted()
             .collect(Collectors.toList());
     }
 
     private boolean updateEndOffset(ReplicaState state,
                                     LogOffsetMetadata endOffsetMetadata) {
         state.endOffset.ifPresent(currentEndOffset -> {
-            if (currentEndOffset.offset > endOffsetMetadata.offset)
-                throw new IllegalArgumentException("Non-monotonic update to end offset for nodeId " + state.nodeId);
+            if (currentEndOffset.offset > endOffsetMetadata.offset) {
+                if (state.nodeId == localId) {
+                    throw new IllegalStateException("Detected non-monotonic update of local " +
+                        "end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset);
+                } else {
+                    log.warn("Detected non-monotonic update of fetch offset from nodeId {}: {} -> {}",

Review comment:
       The situation we are trying to handle is when a follower loses its disk. Basically the damage is already done by the time we receive the Fetch and the only thing we can do is let the follower try to catch back up. The problem with the old logic is that it prevented this even in situations which would not violate guarantees. I am planning to file a follow-up jira to think of some ways to handle disk loss situations more generally. We would like to at least detect the situation and see if we can prevent it from causing too much damage.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r594599970



##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##########
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) {
         }
     }
 
+    @Test
+    public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+        checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+    }
+
+    private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+        assertTrue(config.numVoters > 2,
+            "This test requires the cluster to be able to recover from one failed node");
+
+        for (int seed = 0; seed < 100; seed++) {
+            // We run this test without the `MonotonicEpoch` and `MajorityReachedHighWatermark`
+            // invariants since the loss of committed data on one node can violate them.
+
+            Cluster cluster = new Cluster(config, seed);
+            EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time);
+            scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+            scheduler.addInvariant(new SingleLeader(cluster));
+            scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+            MessageRouter router = new MessageRouter(cluster);
+
+            cluster.startAll();
+            schedulePolling(scheduler, cluster, 3, 5);
+            scheduler.schedule(router::deliverAll, 0, 2, 5);
+            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+            RaftNode node = cluster.randomRunning().orElseThrow(() ->
+                new AssertionError("Failed to find running node")
+            );
+
+            // Kill a random node and drop all of its persistent state. The Raft
+            // protocol guarantees should still ensure we lose no committed data
+            // as long as a new leader is elected before the failed node is restarted.
+            cluster.kill(node.nodeId);
+            cluster.deletePersistentState(node.nodeId);
+            scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && cluster.hasConsistentLeader());

Review comment:
       It is checking consistent `ElectionState`, which is basically the same as verifying all `quorum-state` files match.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r594031740



##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##########
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) {
         }
     }
 
+    @Test
+    public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+        checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+    }
+
+    private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+        assertTrue(config.numVoters > 2,
+            "This test requires the cluster to be able to recover from one failed node");
+
+        for (int seed = 0; seed < 100; seed++) {
+            // We run this test without the `MonotonicEpoch` and `MajorityReachedHighWatermark`
+            // invariants since the loss of committed data on one node can violate them.
+
+            Cluster cluster = new Cluster(config, seed);
+            EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time);
+            scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+            scheduler.addInvariant(new SingleLeader(cluster));
+            scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+            MessageRouter router = new MessageRouter(cluster);
+
+            cluster.startAll();
+            schedulePolling(scheduler, cluster, 3, 5);
+            scheduler.schedule(router::deliverAll, 0, 2, 5);
+            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+            RaftNode node = cluster.randomRunning().orElseThrow(() ->
+                new AssertionError("Failed to find running node")
+            );
+
+            // Kill a random node and drop all of its persistent state. The Raft
+            // protocol guarantees should still ensure we lose no committed data
+            // as long as a new leader is elected before the failed node is restarted.
+            cluster.kill(node.nodeId);
+            cluster.deletePersistentState(node.nodeId);
+            scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && cluster.hasConsistentLeader());

Review comment:
       Hmm. I was looking at the implementation for `hasConsistentLeader`. It checks that all of the `LeaderState` match. Which means that all of the replicas need to vote for the same leader. This is not strictly required for having a consistent leader.
   
   Maybe this works in this test because the number of voters is 3 and one of the nodes was killed.

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##########
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) {
         }
     }
 
+    @Test
+    public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+        checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+    }
+
+    private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+        assertTrue(config.numVoters > 2,
+            "This test requires the cluster to be able to recover from one failed node");
+
+        for (int seed = 0; seed < 100; seed++) {
+            // We run this test without the `MonotonicEpoch` and `MajorityReachedHighWatermark`
+            // invariants since the loss of committed data on one node can violate them.
+
+            Cluster cluster = new Cluster(config, seed);
+            EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time);
+            scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+            scheduler.addInvariant(new SingleLeader(cluster));
+            scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+            MessageRouter router = new MessageRouter(cluster);
+
+            cluster.startAll();
+            schedulePolling(scheduler, cluster, 3, 5);
+            scheduler.schedule(router::deliverAll, 0, 2, 5);
+            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+            RaftNode node = cluster.randomRunning().orElseThrow(() ->
+                new AssertionError("Failed to find running node")
+            );
+
+            // Kill a random node and drop all of its persistent state. The Raft
+            // protocol guarantees should still ensure we lose no committed data
+            // as long as a new leader is elected before the failed node is restarted.
+            cluster.kill(node.nodeId);
+            cluster.deletePersistentState(node.nodeId);

Review comment:
       The implementation for `deletePersistentState` assumes that `kill` was or will be called for the change to take effect. Should we instead have tests call a method called `killAndDeletePersistentState`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r593492361



##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########
@@ -247,7 +267,7 @@ private boolean isVoter(int remoteNodeId) {
         return voterReplicaStates.containsKey(remoteNodeId);
     }
 
-    private static class ReplicaState implements Comparable<ReplicaState> {
+    private static abstract class ReplicaState implements Comparable<ReplicaState> {

Review comment:
       Do we really need to distinguish between `VoterState` and `ObserverState`? For example, the only different is `hasAcknowledgedLeader`. I would argue that we could just move this field to `ReplicateState` and say that observers will have this value always false or the value is ignored.
   
   I am leaning towards just updating the value irrespective of if it is a voter or observer. This is probably useful to have it when we implement quorum reassignment. We can document whatever semantic you decide as a comment for this type.

##########
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##########
@@ -140,6 +160,22 @@ public void testUpdateHighWatermarkQuorumSizeThree() {
         assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark());
     }
 
+    @Test
+    public void testNonMonotonicHighWatermarkUpdate() {
+        MockTime time = new MockTime();
+        int node1 = 1;
+        LeaderState state = newLeaderState(mkSet(localId, node1), 0L);
+        state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(10L));
+        state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(10L));
+        assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark());
+
+        // Follower crashes and disk is lost. It fetches an earlier offset to rebuild state.
+        state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(5L));

Review comment:
       Let's check that this calls returns `false`.
   
   Let's also add a test that calls `getVoterEndOffsets` and checks the returned map is correct.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r594725779



##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##########
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) {
         }
     }
 
+    @Test
+    public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+        checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+    }
+
+    private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+        assertTrue(config.numVoters > 2,
+            "This test requires the cluster to be able to recover from one failed node");
+
+        for (int seed = 0; seed < 100; seed++) {
+            // We run this test without the `MonotonicEpoch` and `MajorityReachedHighWatermark`
+            // invariants since the loss of committed data on one node can violate them.
+
+            Cluster cluster = new Cluster(config, seed);
+            EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time);
+            scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+            scheduler.addInvariant(new SingleLeader(cluster));
+            scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+            MessageRouter router = new MessageRouter(cluster);
+
+            cluster.startAll();
+            schedulePolling(scheduler, cluster, 3, 5);
+            scheduler.schedule(router::deliverAll, 0, 2, 5);
+            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+            RaftNode node = cluster.randomRunning().orElseThrow(() ->
+                new AssertionError("Failed to find running node")
+            );
+
+            // Kill a random node and drop all of its persistent state. The Raft
+            // protocol guarantees should still ensure we lose no committed data
+            // as long as a new leader is elected before the failed node is restarted.
+            cluster.kill(node.nodeId);
+            cluster.deletePersistentState(node.nodeId);
+            scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && cluster.hasConsistentLeader());

Review comment:
       It amounts to the same thing because `votedIdOpt` is only set when the election outcome has not been determined.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10309:
URL: https://github.com/apache/kafka/pull/10309


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r593509351



##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########
@@ -247,7 +267,7 @@ private boolean isVoter(int remoteNodeId) {
         return voterReplicaStates.containsKey(remoteNodeId);
     }
 
-    private static class ReplicaState implements Comparable<ReplicaState> {
+    private static abstract class ReplicaState implements Comparable<ReplicaState> {

Review comment:
       Good point.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org