Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/21 03:52:40 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

abbccdda commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r598134855



##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##########
@@ -880,6 +931,10 @@ public void verify() {
                 PersistentState state = nodeEntry.getValue();
                 ElectionState electionState = state.store.readElectionState();
 
+                if (electionState == null) {

Review comment:
       This could be merged with the next condition:
   `electionState != null && electionState.epoch >= epoch && electionState.hasLeader()`
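A minimal sketch of the merged check, assuming a simplified, hypothetical stand-in for the `ElectionState` read from the store (only `epoch` and `hasLeader()` are modeled; the `leaderIdOpt` field and the `LeaderCheck` helper are illustrative names, not the raft module's API). Because `&&` short-circuits, the null check guards the later dereferences, so the separate `if (electionState == null)` branch is not needed.

```java
import java.util.OptionalInt;

// Hypothetical, simplified stand-in for the ElectionState read from the store;
// only the fields needed for this check are modeled.
final class ElectionState {
    final int epoch;
    final OptionalInt leaderIdOpt;

    ElectionState(int epoch, OptionalInt leaderIdOpt) {
        this.epoch = epoch;
        this.leaderIdOpt = leaderIdOpt;
    }

    boolean hasLeader() {
        return leaderIdOpt.isPresent();
    }
}

// Hypothetical helper showing the single merged condition.
final class LeaderCheck {
    // Short-circuiting && ensures electionState is non-null before its fields are read,
    // so a missing election state simply yields false instead of throwing.
    static boolean hasLeaderAtOrAfter(ElectionState electionState, int epoch) {
        return electionState != null
            && electionState.epoch >= epoch
            && electionState.hasLeader();
    }
}
```

With this shape, a node that has not yet persisted any election state is treated the same as one that has not observed a leader in the expected epoch.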

##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########
@@ -170,36 +183,38 @@ public boolean updateReplicaState(int replicaId,
             .collect(Collectors.toList());
     }
 
-    private List<VoterState> followersByDescendingFetchOffset() {
-        return new ArrayList<>(this.voterReplicaStates.values()).stream()
+    private List<ReplicaState> followersByDescendingFetchOffset() {
+        return new ArrayList<>(this.voterStates.values()).stream()
             .sorted()
             .collect(Collectors.toList());
     }
 
     private boolean updateEndOffset(ReplicaState state,
                                     LogOffsetMetadata endOffsetMetadata) {
         state.endOffset.ifPresent(currentEndOffset -> {
-            if (currentEndOffset.offset > endOffsetMetadata.offset)
-                throw new IllegalArgumentException("Non-monotonic update to end offset for nodeId " + state.nodeId);
+            if (currentEndOffset.offset > endOffsetMetadata.offset) {
+                if (state.nodeId == localId) {
+                    throw new IllegalStateException("Detected non-monotonic update of local " +
+                        "end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset);
+                } else {
+                    log.warn("Detected non-monotonic update of fetch offset from nodeId {}: {} -> {}",

Review comment:
       I wonder whether the current approach is too loose. Maybe this is already handled, but should we tell the failed replica to clean up or truncate via the FetchResponse?
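To make the loosened validation in the hunk above concrete, here is a minimal sketch that reduces per-replica state to a map from node id to end offset (a hypothetical simplification of `ReplicaState`; `EndOffsetTracker` is an illustrative name). It keeps the split shown in the diff: a backwards move of the leader's own end offset throws `IllegalStateException`, while a backwards move for a remote replica only logs a warning. Because the diff is cut off after the `log.warn(` call, the sketch assumes that the lower remote offset is still recorded afterwards.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.logging.Logger;

// Hypothetical sketch of the loosened end-offset validation; not the actual LeaderState code.
final class EndOffsetTracker {
    private static final Logger LOG = Logger.getLogger(EndOffsetTracker.class.getName());

    private final int localId;
    private final Map<Integer, Long> endOffsets = new HashMap<>();

    EndOffsetTracker(int localId) {
        this.localId = localId;
    }

    void updateEndOffset(int nodeId, long newOffset) {
        Long current = endOffsets.get(nodeId);
        if (current != null && current > newOffset) {
            if (nodeId == localId) {
                // The leader's own log should never move backwards, so this is treated as a bug
                // and the stored offset is left unchanged.
                throw new IllegalStateException("Detected non-monotonic update of local end offset: "
                    + current + " -> " + newOffset);
            }
            // A remote replica may fetch from an earlier offset (for instance, after its log
            // was truncated or lost), so the leader only warns.
            LOG.warning("Detected non-monotonic update of fetch offset from nodeId "
                + nodeId + ": " + current + " -> " + newOffset);
        }
        // Assumption: the new (possibly lower) offset is recorded for remote replicas.
        endOffsets.put(nodeId, newOffset);
    }

    OptionalLong endOffset(int nodeId) {
        Long offset = endOffsets.get(nodeId);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }
}
```

Note that this sketch only warns; whether the leader should also signal the replica to clean up or truncate in the FetchResponse is exactly the open question raised in the comment above.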

##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########
@@ -33,22 +36,25 @@
  * they acknowledge the leader.
  */
 public class LeaderState implements EpochState {
+    static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;
+
     private final int localId;
     private final int epoch;
     private final long epochStartOffset;
 
     private Optional<LogOffsetMetadata> highWatermark;
-    private final Map<Integer, VoterState> voterReplicaStates = new HashMap<>();
-    private final Map<Integer, ReplicaState> observerReplicaStates = new HashMap<>();
+    private final Map<Integer, ReplicaState> voterStates = new HashMap<>();
+    private final Map<Integer, ReplicaState> observerStates = new HashMap<>();
     private final Set<Integer> grantingVoters = new HashSet<>();
-    private static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;
+    private final Logger log;
 
     protected LeaderState(
         int localId,
         int epoch,
         long epochStartOffset,
         Set<Integer> voters,
-        Set<Integer> grantingVoters
+        Set<Integer> grantingVoters,
+        LogContext logContext

Review comment:
       Just for my own education: when is it preferable to pass in the caller's log context rather than creating a new one?
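The diff threads a `LogContext` into the `LeaderState` constructor instead of having the class build its own. As a rough illustration of what that buys, here is a stdlib-only sketch in which `PrefixedLogContext` is a hypothetical stand-in for Kafka's `LogContext` (it only prepends a fixed prefix and collects the lines), and `LeaderComponent` is a hypothetical class that receives the context from its owner. The prefix string used in the usage note is likewise made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka's LogContext: prepends a fixed prefix to every message
// and collects the resulting lines so they can be inspected.
final class PrefixedLogContext {
    private final String prefix;
    final List<String> lines = new ArrayList<>();

    PrefixedLogContext(String prefix) {
        this.prefix = prefix;
    }

    void warn(String message) {
        lines.add(prefix + message);
    }
}

// Hypothetical component that logs through the context handed to it by its owner,
// so it never needs to know which node or client it belongs to.
final class LeaderComponent {
    private final PrefixedLogContext log;

    LeaderComponent(PrefixedLogContext logContext) {
        this.log = logContext;
    }

    void onNonMonotonicFetch(int nodeId, long from, long to) {
        log.warn("Detected non-monotonic update of fetch offset from nodeId "
            + nodeId + ": " + from + " -> " + to);
    }
}
```

For example, if the owner creates `new PrefixedLogContext("[RaftManager nodeId=1] ")` and passes it down, the warning from `LeaderComponent` comes out with the same node identity as the owner's own log lines. Creating a fresh context inside the component would require duplicating that identifying information.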

##########
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##########
@@ -184,26 +220,40 @@ private LeaderState setUpLeaderAndFollowers(int follower1,
     @Test
     public void testGetObserverStatesWithObserver() {
         int observerId = 10;
-        long endOffset = 10L;
+        long epochStartOffset = 10L;
 
-        LeaderState state = new LeaderState(localId, epoch, endOffset, mkSet(localId), Collections.emptySet());
+        LeaderState state = newLeaderState(mkSet(localId), epochStartOffset);
         long timestamp = 20L;
-        assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(endOffset)));
+        assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(epochStartOffset)));
 
-        assertEquals(Collections.singletonMap(observerId, endOffset), state.getObserverStates(timestamp));
+        assertEquals(Collections.singletonMap(observerId, epochStartOffset), state.getObserverStates(timestamp));
     }
 
     @Test
     public void testNoOpForNegativeRemoteNodeId() {
         int observerId = -1;
-        long endOffset = 10L;
+        long epochStartOffset = 10L;

Review comment:
       So this offset was misnamed before?
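The tests above exercise observer bookkeeping: an observer's fetch is recorded with a timestamp, `getObserverStates(timestamp)` returns end offsets keyed by replica id, and a negative remote node id is expected to be a no-op. Here is a minimal sketch of that behavior. It assumes that `OBSERVER_SESSION_TIMEOUT_MS` (300,000 ms, as declared in the `LeaderState` hunk) is used to drop observers that have not fetched within one session timeout. `ObserverTracker` and `updateObserverState` are hypothetical names, and the sketch omits the high-watermark result that `updateReplicaState` returns.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of observer tracking; not the actual LeaderState code.
final class ObserverTracker {
    static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;

    private static final class ObserverState {
        long endOffset;
        long lastFetchTimestamp;
    }

    private final Map<Integer, ObserverState> observerStates = new HashMap<>();

    // Records an observer fetch; negative replica ids are ignored (the no-op case under test).
    void updateObserverState(int replicaId, long currentTimeMs, long endOffset) {
        if (replicaId < 0) {
            return;
        }
        ObserverState state = observerStates.computeIfAbsent(replicaId, id -> new ObserverState());
        state.endOffset = endOffset;
        state.lastFetchTimestamp = currentTimeMs;
    }

    // Assumption: observers whose last fetch is at least one session timeout old are dropped
    // before the remaining end offsets are returned.
    Map<Integer, Long> getObserverStates(long currentTimeMs) {
        observerStates.values().removeIf(
            s -> currentTimeMs - s.lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS);
        Map<Integer, Long> result = new HashMap<>();
        observerStates.forEach((id, s) -> result.put(id, s.endOffset));
        return result;
    }
}
```

Mirroring `testGetObserverStatesWithObserver`, recording observer 10 at time 20 with offset 10 yields `{10=10}` from `getObserverStates(20)`. Under the expiry assumption, the same call made one full session timeout later returns an empty map.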




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org