You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/05 19:07:34 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #10468: Kafka 12373:Improve KafkaRaftClient handling of graceful shutdown

jsancio commented on a change in pull request #10468:
URL: https://github.com/apache/kafka/pull/10468#discussion_r607271981



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1673,6 +1673,69 @@ public void testLeaderGracefulShutdownTimeout() throws Exception {
         assertFutureThrows(shutdownFuture, TimeoutException.class);
     }
 
+    @Test
+    public void testLeaderGracefulShutdownOnClose() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int lingerMs = 50;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withAppendLingerMs(lingerMs)
+            .build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+        assertEquals(1L, context.log.endOffset().offset);
+
+        int epoch = context.currentEpoch();
+        assertEquals(1L, context.client.scheduleAppend(epoch, singletonList("a")));
+
+        context.client.poll();
+        assertEquals(OptionalLong.of(lingerMs), context.messageQueue.lastPollTimeoutMs());
+
+        context.time.sleep(20);
+
+        // client closed now.
+        context.client.close();
+
+        // Flag for accepting appends should be toggled to false.
+        assertFalse(context.client.canAcceptAppends());
+
+        // acceptAppends flag set to false so no writes should be accepted by the Leader now.
+        assertNull(context.client.scheduleAppend(epoch, singletonList("b")));
+
+        // The leader should trigger a flush for whatever batches are present in the BatchAccumulator
+        assertEquals(2L, context.log.endOffset().offset);
+
+        // Now shutdown
+
+        // We should still be running until we have had a chance to send EndQuorumEpoch
+        assertTrue(context.client.isShuttingDown());
+        assertTrue(context.client.isRunning());
+
+        // Send EndQuorumEpoch request to the other voter
+        context.pollUntilRequest();
+        assertTrue(context.client.isShuttingDown());
+        assertTrue(context.client.isRunning());
+        context.assertSentEndQuorumEpochRequest(1, otherNodeId);
+
+        // We should still be able to handle vote requests during graceful shutdown
+        // in order to help the new leader get elected
+        context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch, 1L));
+        context.client.poll();
+        context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true);
+

Review comment:
       Okay, thanks! I have limited time at the moment. I'll try to look at it this week.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org