You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/21 22:44:47 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

hachikuji commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r509760723



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() throws Exception {
     public void testFollowerGracefulShutdown() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {

Review comment:
       I think nearly every call to `updateQuorumStateStore` is just writing an initial state. Seems like we can introduce a more direct option to the builder.
   
   By the way, one of the annoyances is needing to provide `voters` through the initial state and through `build` below. Since we always need `voters`, maybe we can provide it in the builder constructor. That would allow us to add helpers to construct the state. For example, we could turn this into:
   
   ```java
   new RaftClientTestContext.Builder(voters)
     .initializeAsFollower(epoch, otherNodeId)
     .build()
   ```
   
   Similarly, we could probably do state assertions in the test context as well and save the need to always pass through `voters` (e.g. we could have `context.assertFollower(epoch, leaderId)` instead of the cumbersome `assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), context.quorumStateStore.readElectionState())`).

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() throws Exception {
     public void testFollowerGracefulShutdown() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {

Review comment:
       I think just about every call to `updateQuorumStateStore` is just writing an initial state. Seems like we can introduce a more direct option to the builder.
   
   By the way, one of the annoyances is needing to provide `voters` through the initial state and through `build` below. Since we always need `voters`, maybe we can provide it in the builder constructor. That would allow us to add helpers to construct the state. For example, we could turn this into:
   
   ```java
   new RaftClientTestContext.Builder(voters)
     .initializeAsFollower(epoch, otherNodeId)
     .build()
   ```

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() throws Exception {
     public void testFollowerGracefulShutdown() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
+                });
+            })
+            .build(voters);
 
-        client.poll();
+        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), context.quorumStateStore.readElectionState());
+
+        context.client.poll();
 
         int shutdownTimeoutMs = 5000;
-        CompletableFuture<Void> shutdownFuture = client.shutdown(shutdownTimeoutMs);
-        assertTrue(client.isRunning());
+        CompletableFuture<Void> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
+        assertTrue(context.client.isRunning());
         assertFalse(shutdownFuture.isDone());
 
-        client.poll();
-        assertFalse(client.isRunning());
+        context.client.poll();
+        assertFalse(context.client.isRunning());
         assertTrue(shutdownFuture.isDone());
         assertNull(shutdownFuture.get());
     }
 
     @Test
     public void testGracefulShutdownSingleMemberQuorum() throws IOException {
-        KafkaRaftClient client = buildClient(Collections.singleton(localId));
+        RaftClientTestContext context = RaftClientTestContext.build(Collections.singleton(LOCAL_ID));
+
         assertEquals(ElectionState.withElectedLeader(
-            1, localId, Collections.singleton(localId)), quorumStateStore.readElectionState());
-        client.poll();
-        assertEquals(0, channel.drainSendQueue().size());
+            1, LOCAL_ID, Collections.singleton(LOCAL_ID)), context.quorumStateStore.readElectionState());
+        context.client.poll();
+        assertEquals(0, context.channel.drainSendQueue().size());
         int shutdownTimeoutMs = 5000;
-        client.shutdown(shutdownTimeoutMs);
-        assertTrue(client.isRunning());
-        client.poll();
-        assertFalse(client.isRunning());
+        context.client.shutdown(shutdownTimeoutMs);
+        assertTrue(context.client.isRunning());
+        context.client.poll();
+        assertFalse(context.client.isRunning());
     }
 
     @Test
     public void testFollowerReplication() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState());
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
+                });
+            })
+            .build(voters);
+
+        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), context.quorumStateStore.readElectionState());
 
-        pollUntilSend(client);
+        context.pollUntilSend();
 
-        int fetchQuorumCorrelationId = assertSentFetchRequest(epoch, 0L, 0);
+        int fetchQuorumCorrelationId = context.assertSentFetchRequest(epoch, 0L, 0);
         Records records = MemoryRecords.withRecords(0L, CompressionType.NONE,
             3, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes()));
         FetchResponseData response = fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE);
-        deliverResponse(fetchQuorumCorrelationId, otherNodeId, response);
+        context.deliverResponse(fetchQuorumCorrelationId, otherNodeId, response);
 
-        client.poll();
-        assertEquals(2L, log.endOffset().offset);
-        assertEquals(2L, log.lastFlushedOffset());
+        context.client.poll();
+        assertEquals(2L, context.log.endOffset().offset);
+        assertEquals(2L, context.log.lastFlushedOffset());
     }
 
     @Test
     public void testEmptyRecordSetInFetchResponse() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
+                });
+            })
+            .build(voters);
+
+        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), context.quorumStateStore.readElectionState());

Review comment:
       nit: we have assertions like this in many test cases. With a more direct api to update quorum state, we can move these assertions into that api.

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1536,67 +1522,70 @@ public void testObserverLeaderRediscoveryAfterRequestTimeout() throws Exception
         int otherNodeId = 2;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
-        KafkaRaftClient client = buildClient(voters);
-        discoverLeaderAsObserver(client, voters, leaderId, epoch);
 
-        pollUntilSend(client);
-        RaftRequest.Outbound fetchRequest1 = assertSentFetchRequest();
+        RaftClientTestContext context = RaftClientTestContext.build(voters);
+
+        context.discoverLeaderAsObserver(voters, leaderId, epoch);
+
+        context.pollUntilSend();
+        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
         assertEquals(leaderId, fetchRequest1.destinationId());
-        assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+        RaftClientTestContext.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
 
-        time.sleep(requestTimeoutMs);
-        pollUntilSend(client);
+        context.time.sleep(REQUEST_TIMEOUT_MS);
+        context.pollUntilSend();
 
         // We should retry the Fetch against the other voter since the original
         // voter connection will be backing off.
-        RaftRequest.Outbound fetchRequest2 = assertSentFetchRequest();
+        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
         assertNotEquals(leaderId, fetchRequest2.destinationId());
         assertTrue(voters.contains(fetchRequest2.destinationId()));
-        assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+        RaftClientTestContext.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
 
-        deliverResponse(fetchRequest2.correlationId, fetchRequest2.destinationId(),
+        context.deliverResponse(fetchRequest2.correlationId, fetchRequest2.destinationId(),
             fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
-        client.poll();
+        context.client.poll();
 
-        assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), quorumStateStore.readElectionState());
+        assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), context.quorumStateStore.readElectionState());
     }
 
     @Test
     public void testLeaderGracefulShutdown() throws Exception {
         int otherNodeId = 1;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
         int epoch = 1;
-        KafkaRaftClient client = initializeAsLeader(voters, epoch);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+
+        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(voters, epoch);
 
         // Now shutdown
         int shutdownTimeoutMs = 5000;
-        CompletableFuture<Void> shutdownFuture = client.shutdown(shutdownTimeoutMs);
+        CompletableFuture<Void> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
 
         // We should still be running until we have had a chance to send EndQuorumEpoch
-        assertTrue(client.isShuttingDown());
-        assertTrue(client.isRunning());
+        assertTrue(context.client.isShuttingDown());

Review comment:
       nit: it is a tad vexing to see all the `context` prefixes. I guess another option might be to define `RaftClientTestContext` as an abstract class so that the test method can define the test behavior within the scope of a subclass.
   
   For example:
   ```java
   new RaftClientTestContext(builder) {
     void run() {
       assertTrue(client.isShuttingDown());
       ...
     }
   }
   ```
   Not required, just an alternative to consider.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org