You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/21 21:56:52 UTC

[GitHub] [kafka] jsancio opened a new pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

jsancio opened a new pull request #9476:
URL: https://github.com/apache/kafka/pull/9476


   There is a lot of functionality in KafkaRaftClientTest that is useful for writing other tests. Refactor that functionality into another class that can be reused in other tests.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510448694



##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##########
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchRequestData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.VoteRequestData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.ControlRecordUtils;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
+import org.apache.kafka.common.requests.DescribeQuorumResponse;
+import org.apache.kafka.common.requests.VoteResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.mockito.Mockito;
+import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+final class RaftClientTestContext {
+    private static final int FETCH_MAX_WAIT_MS = 0;
+
+    static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0);
+    static final int LOCAL_ID = 0;
+
+    static final int ELECTION_BACKOFF_MAX_MS = 100;
+    static final int ELECTION_TIMEOUT_MS = 10000;
+    // fetch timeout is usually larger than election timeout
+    static final int FETCH_TIMEOUT_MS = 50000;
+    static final int REQUEST_TIMEOUT_MS = 5000;
+    static final int RETRY_BACKOFF_MS = 50;
+
+    private final QuorumStateStore quorumStateStore;
+    private final Random random;
+
+    final KafkaRaftClient client;
+    final Metrics metrics;
+    final MockLog log;
+    final MockNetworkChannel channel;
+    final MockTime time;
+    final Set<Integer> voters;
+
+    public static final class Builder {
+        private final QuorumStateStore quorumStateStore = new MockQuorumStateStore();
+        private final Random random = Mockito.spy(new Random(1));
+        private final MockLog log = new MockLog(METADATA_PARTITION);
+        private final Set<Integer> voters;
+
+        Builder(Set<Integer> voters) {
+            this.voters = voters;
+        }
+
+        Builder withElectedLeader(int epoch, int leaderId) throws IOException {
+            quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));
+            return this;
+        }
+
+        Builder withUnknownLeader(int epoch) throws IOException {
+            quorumStateStore.writeElectionState(ElectionState.withUnknownLeader(epoch, voters));
+            return this;
+        }
+
+        Builder withVotedCandidate(int epoch, int votedId) throws IOException {
+            quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(epoch, votedId, voters));
+            return this;
+        }
+
+        Builder updateRandom(Consumer<Random> consumer) {
+            consumer.accept(random);
+            return this;
+        }
+
+        Builder updateLog(Consumer<MockLog> consumer) {
+            consumer.accept(log);
+            return this;
+        }
+
+        RaftClientTestContext build() throws IOException {
+            MockTime time = new MockTime();
+            Metrics metrics = new Metrics(time);
+            MockNetworkChannel channel = new MockNetworkChannel();
+            LogContext logContext = new LogContext();
+            QuorumState quorum = new QuorumState(LOCAL_ID, voters, ELECTION_TIMEOUT_MS, FETCH_TIMEOUT_MS,
+                    quorumStateStore, time, logContext, random);
+
+            Map<Integer, InetSocketAddress> voterAddresses = voters.stream().collect(Collectors.toMap(
+                        Function.identity(),
+                        RaftClientTestContext::mockAddress
+                        ));

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510387310



##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##########
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchRequestData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.VoteRequestData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.ControlRecordUtils;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
+import org.apache.kafka.common.requests.DescribeQuorumResponse;
+import org.apache.kafka.common.requests.VoteResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.mockito.Mockito;
+import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+final class RaftClientTestContext {
+    private static final int FETCH_MAX_WAIT_MS = 0;
+
+    static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0);
+    static final int LOCAL_ID = 0;

Review comment:
       I'm somewhat inclined to add the local id to the builder rather than making it a constant. It makes the builder a bit more self-contained.
   
   On a similar note, it would be nice to push the other static config values into the builder as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r509760723



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() throws Exception {
     public void testFollowerGracefulShutdown() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {

Review comment:
       I think nearly every call to `updateQuorumStateStore` is just writing an initial state. Seems like we can introduce a more direct option to the builder.
   
   By the way, one of the annoyances is needing to provide `voters` through the initial state and through `build` below. Since we always need `voters`, maybe we can provide it in the builder constructor. That would allow us to add helpers to construct the state. For example, we could turn this into:
   
   ```java
   new RaftClientTestContext.Builder(voters)
     .initializeAsFollower(epoch, otherNodeId)
     .build()
   ```
   
   Similarly, we could probably do state assertions in the test context as well and avoid the need to always pass through `voters` (e.g. we could have `context.assertFollower(epoch, leaderId)` instead of the cumbersome `assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), context.quorumStateStore.readElectionState())`).

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() throws Exception {
     public void testFollowerGracefulShutdown() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {

Review comment:
       I think just about every call to `updateQuorumStateStore` is just writing an initial state. Seems like we can introduce a more direct option to the builder.
   
   By the way, one of the annoyances is needing to provide `voters` through the initial state and through `build` below. Since we always need `voters`, maybe we can provide it in the builder constructor. That would allow us to add helpers to construct the state. For example, we could turn this into:
   
   ```java
   new RaftClientTestContext.Builder(voters)
     .initializeAsFollower(epoch, otherNodeId)
     .build()
   ```

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() throws Exception {
     public void testFollowerGracefulShutdown() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
+                });
+            })
+            .build(voters);
 
-        client.poll();
+        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), context.quorumStateStore.readElectionState());
+
+        context.client.poll();
 
         int shutdownTimeoutMs = 5000;
-        CompletableFuture<Void> shutdownFuture = client.shutdown(shutdownTimeoutMs);
-        assertTrue(client.isRunning());
+        CompletableFuture<Void> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
+        assertTrue(context.client.isRunning());
         assertFalse(shutdownFuture.isDone());
 
-        client.poll();
-        assertFalse(client.isRunning());
+        context.client.poll();
+        assertFalse(context.client.isRunning());
         assertTrue(shutdownFuture.isDone());
         assertNull(shutdownFuture.get());
     }
 
     @Test
     public void testGracefulShutdownSingleMemberQuorum() throws IOException {
-        KafkaRaftClient client = buildClient(Collections.singleton(localId));
+        RaftClientTestContext context = RaftClientTestContext.build(Collections.singleton(LOCAL_ID));
+
         assertEquals(ElectionState.withElectedLeader(
-            1, localId, Collections.singleton(localId)), quorumStateStore.readElectionState());
-        client.poll();
-        assertEquals(0, channel.drainSendQueue().size());
+            1, LOCAL_ID, Collections.singleton(LOCAL_ID)), context.quorumStateStore.readElectionState());
+        context.client.poll();
+        assertEquals(0, context.channel.drainSendQueue().size());
         int shutdownTimeoutMs = 5000;
-        client.shutdown(shutdownTimeoutMs);
-        assertTrue(client.isRunning());
-        client.poll();
-        assertFalse(client.isRunning());
+        context.client.shutdown(shutdownTimeoutMs);
+        assertTrue(context.client.isRunning());
+        context.client.poll();
+        assertFalse(context.client.isRunning());
     }
 
     @Test
     public void testFollowerReplication() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState());
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
+                });
+            })
+            .build(voters);
+
+        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), context.quorumStateStore.readElectionState());
 
-        pollUntilSend(client);
+        context.pollUntilSend();
 
-        int fetchQuorumCorrelationId = assertSentFetchRequest(epoch, 0L, 0);
+        int fetchQuorumCorrelationId = context.assertSentFetchRequest(epoch, 0L, 0);
         Records records = MemoryRecords.withRecords(0L, CompressionType.NONE,
             3, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes()));
         FetchResponseData response = fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE);
-        deliverResponse(fetchQuorumCorrelationId, otherNodeId, response);
+        context.deliverResponse(fetchQuorumCorrelationId, otherNodeId, response);
 
-        client.poll();
-        assertEquals(2L, log.endOffset().offset);
-        assertEquals(2L, log.lastFlushedOffset());
+        context.client.poll();
+        assertEquals(2L, context.log.endOffset().offset);
+        assertEquals(2L, context.log.lastFlushedOffset());
     }
 
     @Test
     public void testEmptyRecordSetInFetchResponse() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
+                });
+            })
+            .build(voters);
+
+        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), context.quorumStateStore.readElectionState());

Review comment:
       nit: we have assertions like this in many test cases. With a more direct API to update quorum state, we can move these assertions into that API.

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1536,67 +1522,70 @@ public void testObserverLeaderRediscoveryAfterRequestTimeout() throws Exception
         int otherNodeId = 2;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
-        KafkaRaftClient client = buildClient(voters);
-        discoverLeaderAsObserver(client, voters, leaderId, epoch);
 
-        pollUntilSend(client);
-        RaftRequest.Outbound fetchRequest1 = assertSentFetchRequest();
+        RaftClientTestContext context = RaftClientTestContext.build(voters);
+
+        context.discoverLeaderAsObserver(voters, leaderId, epoch);
+
+        context.pollUntilSend();
+        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
         assertEquals(leaderId, fetchRequest1.destinationId());
-        assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+        RaftClientTestContext.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
 
-        time.sleep(requestTimeoutMs);
-        pollUntilSend(client);
+        context.time.sleep(REQUEST_TIMEOUT_MS);
+        context.pollUntilSend();
 
         // We should retry the Fetch against the other voter since the original
         // voter connection will be backing off.
-        RaftRequest.Outbound fetchRequest2 = assertSentFetchRequest();
+        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
         assertNotEquals(leaderId, fetchRequest2.destinationId());
         assertTrue(voters.contains(fetchRequest2.destinationId()));
-        assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+        RaftClientTestContext.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
 
-        deliverResponse(fetchRequest2.correlationId, fetchRequest2.destinationId(),
+        context.deliverResponse(fetchRequest2.correlationId, fetchRequest2.destinationId(),
             fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
-        client.poll();
+        context.client.poll();
 
-        assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), quorumStateStore.readElectionState());
+        assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), context.quorumStateStore.readElectionState());
     }
 
     @Test
     public void testLeaderGracefulShutdown() throws Exception {
         int otherNodeId = 1;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
         int epoch = 1;
-        KafkaRaftClient client = initializeAsLeader(voters, epoch);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+
+        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(voters, epoch);
 
         // Now shutdown
         int shutdownTimeoutMs = 5000;
-        CompletableFuture<Void> shutdownFuture = client.shutdown(shutdownTimeoutMs);
+        CompletableFuture<Void> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
 
         // We should still be running until we have had a chance to send EndQuorumEpoch
-        assertTrue(client.isShuttingDown());
-        assertTrue(client.isRunning());
+        assertTrue(context.client.isShuttingDown());

Review comment:
       nit: it is a tad vexing to see all the `context` prefixes. I guess another option might be to define `RaftClientTestContext` as an abstract class so that the test method can define the test behavior within the scope of a subclass.
   
   For example:
   ```java
   new RaftClientTestContext(builder) {
     void run() {
       assertTrue(client.isShuttingDown());
       ...
     }
   }
   ```
   Not required, just an alternative to consider.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510421964



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -90,470 +76,480 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KafkaRaftClientTest {
-    private static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0);
-
-    private final int localId = 0;
-    private final int electionTimeoutMs = 10000;
-    private final int electionBackoffMaxMs = 100;
-    private final int fetchTimeoutMs = 50000;   // fetch timeout is usually larger than election timeout
-    private final int retryBackoffMs = 50;
-    private final int requestTimeoutMs = 5000;
-    private final int fetchMaxWaitMs = 0;
-
-    private final MockTime time = new MockTime();
-    private final MockLog log = new MockLog(METADATA_PARTITION);
-    private final MockNetworkChannel channel = new MockNetworkChannel();
-    private final Random random = Mockito.spy(new Random(1));
-    private final QuorumStateStore quorumStateStore = new MockQuorumStateStore();
-
-    @AfterEach
-    public void cleanUp() throws IOException {
-        quorumStateStore.clear();
-    }
-
-    private InetSocketAddress mockAddress(int id) {
-        return new InetSocketAddress("localhost", 9990 + id);
-    }
-
-    private KafkaRaftClient buildClient(Set<Integer> voters) throws IOException {
-        return buildClient(voters, new Metrics(time));
-    }
-
-    private KafkaRaftClient buildClient(Set<Integer> voters, Metrics metrics) throws IOException {
-        LogContext logContext = new LogContext();
-        QuorumState quorum = new QuorumState(localId, voters, electionTimeoutMs, fetchTimeoutMs,
-            quorumStateStore, time, logContext, random);
-
-        Map<Integer, InetSocketAddress> voterAddresses = voters.stream().collect(Collectors.toMap(
-            Function.identity(),
-            this::mockAddress
-        ));
-
-        KafkaRaftClient client = new KafkaRaftClient(channel, log, quorum, time, metrics,
-            new MockFuturePurgatory<>(time), new MockFuturePurgatory<>(time), voterAddresses,
-            electionBackoffMaxMs, retryBackoffMs, requestTimeoutMs, fetchMaxWaitMs, logContext, random);
-
-        client.initialize();
-
-        return client;
-    }
-
     @Test
     public void testInitializeSingleMemberQuorum() throws IOException {
-        buildClient(Collections.singleton(localId));
-        assertEquals(ElectionState.withElectedLeader(1, localId, Collections.singleton(localId)),
-            quorumStateStore.readElectionState());
+        RaftClientTestContext context = RaftClientTestContext.build(Collections.singleton(LOCAL_ID));
+        assertEquals(
+            ElectionState.withElectedLeader(1, LOCAL_ID, Collections.singleton(LOCAL_ID)),
+            context.quorumStateStore.readElectionState()
+        );
     }
 
     @Test
     public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() throws Exception {
         // Start off as leader. We should still bump the epoch after initialization
 
         int initialEpoch = 2;
-        Set<Integer> voters = Collections.singleton(localId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(initialEpoch, localId, voters));
-
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(1L, log.endOffset().offset);
-        assertEquals(initialEpoch + 1, log.lastFetchedEpoch());
-        assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch + 1),
-            client.currentLeaderAndEpoch());
-        assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, localId, voters),
-            quorumStateStore.readElectionState());
+        Set<Integer> voters = Collections.singleton(LOCAL_ID);
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(
+                        ElectionState.withElectedLeader(initialEpoch, LOCAL_ID, voters)
+                    );
+                });
+            })
+            .build(voters);
+
+        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch());
+        assertEquals(new LeaderAndEpoch(OptionalInt.of(LOCAL_ID), initialEpoch + 1),
+            context.client.currentLeaderAndEpoch());
+        assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, LOCAL_ID, voters),
+            context.quorumStateStore.readElectionState());
     }
 
     @Test
     public void testInitializeAsLeaderFromStateStore() throws Exception {
-        Set<Integer> voters = Utils.mkSet(localId, 1);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, 1);
         int epoch = 2;
 
-        Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(0L, log.endOffset().offset);
-        assertEquals(ElectionState.withUnknownLeader(epoch, voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateRandom(random -> {
+                Mockito.doReturn(0).when(random).nextInt(RaftClientTestContext.ELECTION_TIMEOUT_MS);
+            })
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, LOCAL_ID, voters));
+                });
+            })
+            .build(voters);
+
 
-        time.sleep(electionTimeoutMs);
-        pollUntilSend(client);
-        assertSentVoteRequest(epoch + 1, 0, 0L);
+        assertEquals(0L, context.log.endOffset().offset);
+        assertEquals(ElectionState.withUnknownLeader(epoch, voters), context.quorumStateStore.readElectionState());
+
+        context.time.sleep(RaftClientTestContext.ELECTION_TIMEOUT_MS);
+        context.pollUntilSend();
+        context.assertSentVoteRequest(epoch + 1, 0, 0L);
     }
 
     @Test
     public void testInitializeAsCandidateFromStateStore() throws Exception {
         // Need 3 node to require a 2-node majority
-        Set<Integer> voters = Utils.mkSet(localId, 1, 2);
-        quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(2, localId, voters));
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, 1, 2);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(2, LOCAL_ID, voters));
+                });
+            })
+            .build(voters);
 
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(0L, log.endOffset().offset);
+        assertEquals(0L, context.log.endOffset().offset);
 
         // Send out vote requests.
-        client.poll();
+        context.client.poll();
 
-        List<RaftRequest.Outbound> voteRequests = collectVoteRequests(2, 0, 0);
+        List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(2, 0, 0);
         assertEquals(2, voteRequests.size());
     }
 
     @Test
     public void testInitializeAsCandidateAndBecomeLeader() throws Exception {
         final int otherNodeId = 1;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        KafkaRaftClient client = buildClient(voters);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+        RaftClientTestContext context = RaftClientTestContext.build(voters);
 
-        assertEquals(ElectionState.withUnknownLeader(0, voters), quorumStateStore.readElectionState());
-        time.sleep(2 * electionTimeoutMs);
+        assertEquals(ElectionState.withUnknownLeader(0, voters), context.quorumStateStore.readElectionState());
+        context.time.sleep(2 * RaftClientTestContext.ELECTION_TIMEOUT_MS);
 
-        pollUntilSend(client);
-        assertEquals(ElectionState.withVotedCandidate(1, localId, voters), quorumStateStore.readElectionState());
+        context.pollUntilSend();
+        assertEquals(ElectionState.withVotedCandidate(1, LOCAL_ID, voters), context.quorumStateStore.readElectionState());
 
-        int correlationId = assertSentVoteRequest(1, 0, 0L);
-        deliverResponse(correlationId, otherNodeId, voteResponse(true, Optional.empty(), 1));
+        int correlationId = context.assertSentVoteRequest(1, 0, 0L);
+        context.deliverResponse(correlationId, otherNodeId, RaftClientTestContext.voteResponse(true, Optional.empty(), 1));
 
         // Become leader after receiving the vote
-        client.poll();
-        assertEquals(ElectionState.withElectedLeader(1, localId, voters), quorumStateStore.readElectionState());
-        long electionTimestamp = time.milliseconds();
+        context.client.poll();
+        assertEquals(ElectionState.withElectedLeader(1, LOCAL_ID, voters), context.quorumStateStore.readElectionState());
+        long electionTimestamp = context.time.milliseconds();
 
         // Leader change record appended
-        assertEquals(1L, log.endOffset().offset);
-        assertEquals(1L, log.lastFlushedOffset());
+        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(1L, context.log.lastFlushedOffset());
 
         // Send BeginQuorumEpoch to voters
-        client.poll();
-        assertSentBeginQuorumEpochRequest(1);
+        context.client.poll();
+        context.assertSentBeginQuorumEpochRequest(1);
 
-        Records records = log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
 
         Record record = batch.iterator().next();
         assertEquals(electionTimestamp, record.timestamp());
-        verifyLeaderChangeMessage(localId, Collections.singletonList(otherNodeId),
-            record.key(), record.value());
+        RaftClientTestContext.verifyLeaderChangeMessage(LOCAL_ID, Collections.singletonList(otherNodeId), record.key(), record.value());

Review comment:
       Done. https://issues.apache.org/jira/browse/KAFKA-10634




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r509840834



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -90,470 +76,480 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KafkaRaftClientTest {
-    private static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0);
-
-    private final int localId = 0;
-    private final int electionTimeoutMs = 10000;
-    private final int electionBackoffMaxMs = 100;
-    private final int fetchTimeoutMs = 50000;   // fetch timeout is usually larger than election timeout
-    private final int retryBackoffMs = 50;
-    private final int requestTimeoutMs = 5000;
-    private final int fetchMaxWaitMs = 0;
-
-    private final MockTime time = new MockTime();
-    private final MockLog log = new MockLog(METADATA_PARTITION);
-    private final MockNetworkChannel channel = new MockNetworkChannel();
-    private final Random random = Mockito.spy(new Random(1));
-    private final QuorumStateStore quorumStateStore = new MockQuorumStateStore();
-
-    @AfterEach
-    public void cleanUp() throws IOException {
-        quorumStateStore.clear();
-    }
-
-    private InetSocketAddress mockAddress(int id) {
-        return new InetSocketAddress("localhost", 9990 + id);
-    }
-
-    private KafkaRaftClient buildClient(Set<Integer> voters) throws IOException {
-        return buildClient(voters, new Metrics(time));
-    }
-
-    private KafkaRaftClient buildClient(Set<Integer> voters, Metrics metrics) throws IOException {
-        LogContext logContext = new LogContext();
-        QuorumState quorum = new QuorumState(localId, voters, electionTimeoutMs, fetchTimeoutMs,
-            quorumStateStore, time, logContext, random);
-
-        Map<Integer, InetSocketAddress> voterAddresses = voters.stream().collect(Collectors.toMap(
-            Function.identity(),
-            this::mockAddress
-        ));
-
-        KafkaRaftClient client = new KafkaRaftClient(channel, log, quorum, time, metrics,
-            new MockFuturePurgatory<>(time), new MockFuturePurgatory<>(time), voterAddresses,
-            electionBackoffMaxMs, retryBackoffMs, requestTimeoutMs, fetchMaxWaitMs, logContext, random);
-
-        client.initialize();
-
-        return client;
-    }
-
     @Test
     public void testInitializeSingleMemberQuorum() throws IOException {
-        buildClient(Collections.singleton(localId));
-        assertEquals(ElectionState.withElectedLeader(1, localId, Collections.singleton(localId)),
-            quorumStateStore.readElectionState());
+        RaftClientTestContext context = RaftClientTestContext.build(Collections.singleton(LOCAL_ID));
+        assertEquals(
+            ElectionState.withElectedLeader(1, LOCAL_ID, Collections.singleton(LOCAL_ID)),
+            context.quorumStateStore.readElectionState()
+        );
     }
 
     @Test
     public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() throws Exception {
         // Start off as leader. We should still bump the epoch after initialization
 
         int initialEpoch = 2;
-        Set<Integer> voters = Collections.singleton(localId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(initialEpoch, localId, voters));
-
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(1L, log.endOffset().offset);
-        assertEquals(initialEpoch + 1, log.lastFetchedEpoch());
-        assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch + 1),
-            client.currentLeaderAndEpoch());
-        assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, localId, voters),
-            quorumStateStore.readElectionState());
+        Set<Integer> voters = Collections.singleton(LOCAL_ID);
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(
+                        ElectionState.withElectedLeader(initialEpoch, LOCAL_ID, voters)
+                    );
+                });
+            })
+            .build(voters);
+
+        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch());
+        assertEquals(new LeaderAndEpoch(OptionalInt.of(LOCAL_ID), initialEpoch + 1),
+            context.client.currentLeaderAndEpoch());
+        assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, LOCAL_ID, voters),
+            context.quorumStateStore.readElectionState());
     }
 
     @Test
     public void testInitializeAsLeaderFromStateStore() throws Exception {
-        Set<Integer> voters = Utils.mkSet(localId, 1);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, 1);
         int epoch = 2;
 
-        Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(0L, log.endOffset().offset);
-        assertEquals(ElectionState.withUnknownLeader(epoch, voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateRandom(random -> {
+                Mockito.doReturn(0).when(random).nextInt(RaftClientTestContext.ELECTION_TIMEOUT_MS);
+            })
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, LOCAL_ID, voters));
+                });
+            })
+            .build(voters);
+
 
-        time.sleep(electionTimeoutMs);
-        pollUntilSend(client);
-        assertSentVoteRequest(epoch + 1, 0, 0L);
+        assertEquals(0L, context.log.endOffset().offset);
+        assertEquals(ElectionState.withUnknownLeader(epoch, voters), context.quorumStateStore.readElectionState());
+
+        context.time.sleep(RaftClientTestContext.ELECTION_TIMEOUT_MS);
+        context.pollUntilSend();
+        context.assertSentVoteRequest(epoch + 1, 0, 0L);
     }
 
     @Test
     public void testInitializeAsCandidateFromStateStore() throws Exception {
         // Need 3 node to require a 2-node majority
-        Set<Integer> voters = Utils.mkSet(localId, 1, 2);
-        quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(2, localId, voters));
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, 1, 2);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(2, LOCAL_ID, voters));
+                });
+            })
+            .build(voters);
 
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(0L, log.endOffset().offset);
+        assertEquals(0L, context.log.endOffset().offset);
 
         // Send out vote requests.
-        client.poll();
+        context.client.poll();
 
-        List<RaftRequest.Outbound> voteRequests = collectVoteRequests(2, 0, 0);
+        List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(2, 0, 0);
         assertEquals(2, voteRequests.size());
     }
 
     @Test
     public void testInitializeAsCandidateAndBecomeLeader() throws Exception {
         final int otherNodeId = 1;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        KafkaRaftClient client = buildClient(voters);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+        RaftClientTestContext context = RaftClientTestContext.build(voters);
 
-        assertEquals(ElectionState.withUnknownLeader(0, voters), quorumStateStore.readElectionState());
-        time.sleep(2 * electionTimeoutMs);
+        assertEquals(ElectionState.withUnknownLeader(0, voters), context.quorumStateStore.readElectionState());
+        context.time.sleep(2 * RaftClientTestContext.ELECTION_TIMEOUT_MS);
 
-        pollUntilSend(client);
-        assertEquals(ElectionState.withVotedCandidate(1, localId, voters), quorumStateStore.readElectionState());
+        context.pollUntilSend();
+        assertEquals(ElectionState.withVotedCandidate(1, LOCAL_ID, voters), context.quorumStateStore.readElectionState());
 
-        int correlationId = assertSentVoteRequest(1, 0, 0L);
-        deliverResponse(correlationId, otherNodeId, voteResponse(true, Optional.empty(), 1));
+        int correlationId = context.assertSentVoteRequest(1, 0, 0L);
+        context.deliverResponse(correlationId, otherNodeId, RaftClientTestContext.voteResponse(true, Optional.empty(), 1));
 
         // Become leader after receiving the vote
-        client.poll();
-        assertEquals(ElectionState.withElectedLeader(1, localId, voters), quorumStateStore.readElectionState());
-        long electionTimestamp = time.milliseconds();
+        context.client.poll();
+        assertEquals(ElectionState.withElectedLeader(1, LOCAL_ID, voters), context.quorumStateStore.readElectionState());
+        long electionTimestamp = context.time.milliseconds();
 
         // Leader change record appended
-        assertEquals(1L, log.endOffset().offset);
-        assertEquals(1L, log.lastFlushedOffset());
+        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(1L, context.log.lastFlushedOffset());
 
         // Send BeginQuorumEpoch to voters
-        client.poll();
-        assertSentBeginQuorumEpochRequest(1);
+        context.client.poll();
+        context.assertSentBeginQuorumEpochRequest(1);
 
-        Records records = log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
 
         Record record = batch.iterator().next();
         assertEquals(electionTimestamp, record.timestamp());
-        verifyLeaderChangeMessage(localId, Collections.singletonList(otherNodeId),
-            record.key(), record.value());
+        RaftClientTestContext.verifyLeaderChangeMessage(LOCAL_ID, Collections.singletonList(otherNodeId), record.key(), record.value());

Review comment:
       This is what the LeaderChangeMessage schema says about the `VotedIds` field:
   ```
         {"name": "VotedIds", "type": "[]int32", "versions": "0+",
          "about": "The IDs of the voters who voted for the current leader"},
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r509840265



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -90,470 +76,480 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KafkaRaftClientTest {
-    private static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0);
-
-    private final int localId = 0;
-    private final int electionTimeoutMs = 10000;
-    private final int electionBackoffMaxMs = 100;
-    private final int fetchTimeoutMs = 50000;   // fetch timeout is usually larger than election timeout
-    private final int retryBackoffMs = 50;
-    private final int requestTimeoutMs = 5000;
-    private final int fetchMaxWaitMs = 0;
-
-    private final MockTime time = new MockTime();
-    private final MockLog log = new MockLog(METADATA_PARTITION);
-    private final MockNetworkChannel channel = new MockNetworkChannel();
-    private final Random random = Mockito.spy(new Random(1));
-    private final QuorumStateStore quorumStateStore = new MockQuorumStateStore();
-
-    @AfterEach
-    public void cleanUp() throws IOException {
-        quorumStateStore.clear();
-    }
-
-    private InetSocketAddress mockAddress(int id) {
-        return new InetSocketAddress("localhost", 9990 + id);
-    }
-
-    private KafkaRaftClient buildClient(Set<Integer> voters) throws IOException {
-        return buildClient(voters, new Metrics(time));
-    }
-
-    private KafkaRaftClient buildClient(Set<Integer> voters, Metrics metrics) throws IOException {
-        LogContext logContext = new LogContext();
-        QuorumState quorum = new QuorumState(localId, voters, electionTimeoutMs, fetchTimeoutMs,
-            quorumStateStore, time, logContext, random);
-
-        Map<Integer, InetSocketAddress> voterAddresses = voters.stream().collect(Collectors.toMap(
-            Function.identity(),
-            this::mockAddress
-        ));
-
-        KafkaRaftClient client = new KafkaRaftClient(channel, log, quorum, time, metrics,
-            new MockFuturePurgatory<>(time), new MockFuturePurgatory<>(time), voterAddresses,
-            electionBackoffMaxMs, retryBackoffMs, requestTimeoutMs, fetchMaxWaitMs, logContext, random);
-
-        client.initialize();
-
-        return client;
-    }
-
     @Test
     public void testInitializeSingleMemberQuorum() throws IOException {
-        buildClient(Collections.singleton(localId));
-        assertEquals(ElectionState.withElectedLeader(1, localId, Collections.singleton(localId)),
-            quorumStateStore.readElectionState());
+        RaftClientTestContext context = RaftClientTestContext.build(Collections.singleton(LOCAL_ID));
+        assertEquals(
+            ElectionState.withElectedLeader(1, LOCAL_ID, Collections.singleton(LOCAL_ID)),
+            context.quorumStateStore.readElectionState()
+        );
     }
 
     @Test
     public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() throws Exception {
         // Start off as leader. We should still bump the epoch after initialization
 
         int initialEpoch = 2;
-        Set<Integer> voters = Collections.singleton(localId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(initialEpoch, localId, voters));
-
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(1L, log.endOffset().offset);
-        assertEquals(initialEpoch + 1, log.lastFetchedEpoch());
-        assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch + 1),
-            client.currentLeaderAndEpoch());
-        assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, localId, voters),
-            quorumStateStore.readElectionState());
+        Set<Integer> voters = Collections.singleton(LOCAL_ID);
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(
+                        ElectionState.withElectedLeader(initialEpoch, LOCAL_ID, voters)
+                    );
+                });
+            })
+            .build(voters);
+
+        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch());
+        assertEquals(new LeaderAndEpoch(OptionalInt.of(LOCAL_ID), initialEpoch + 1),
+            context.client.currentLeaderAndEpoch());
+        assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, LOCAL_ID, voters),
+            context.quorumStateStore.readElectionState());
     }
 
     @Test
     public void testInitializeAsLeaderFromStateStore() throws Exception {
-        Set<Integer> voters = Utils.mkSet(localId, 1);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, 1);
         int epoch = 2;
 
-        Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(0L, log.endOffset().offset);
-        assertEquals(ElectionState.withUnknownLeader(epoch, voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateRandom(random -> {
+                Mockito.doReturn(0).when(random).nextInt(RaftClientTestContext.ELECTION_TIMEOUT_MS);
+            })
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, LOCAL_ID, voters));
+                });
+            })
+            .build(voters);
+
 
-        time.sleep(electionTimeoutMs);
-        pollUntilSend(client);
-        assertSentVoteRequest(epoch + 1, 0, 0L);
+        assertEquals(0L, context.log.endOffset().offset);
+        assertEquals(ElectionState.withUnknownLeader(epoch, voters), context.quorumStateStore.readElectionState());
+
+        context.time.sleep(RaftClientTestContext.ELECTION_TIMEOUT_MS);
+        context.pollUntilSend();
+        context.assertSentVoteRequest(epoch + 1, 0, 0L);
     }
 
     @Test
     public void testInitializeAsCandidateFromStateStore() throws Exception {
         // Need 3 node to require a 2-node majority
-        Set<Integer> voters = Utils.mkSet(localId, 1, 2);
-        quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(2, localId, voters));
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, 1, 2);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(2, LOCAL_ID, voters));
+                });
+            })
+            .build(voters);
 
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(0L, log.endOffset().offset);
+        assertEquals(0L, context.log.endOffset().offset);
 
         // Send out vote requests.
-        client.poll();
+        context.client.poll();
 
-        List<RaftRequest.Outbound> voteRequests = collectVoteRequests(2, 0, 0);
+        List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(2, 0, 0);
         assertEquals(2, voteRequests.size());
     }
 
     @Test
     public void testInitializeAsCandidateAndBecomeLeader() throws Exception {
         final int otherNodeId = 1;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        KafkaRaftClient client = buildClient(voters);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+        RaftClientTestContext context = RaftClientTestContext.build(voters);
 
-        assertEquals(ElectionState.withUnknownLeader(0, voters), quorumStateStore.readElectionState());
-        time.sleep(2 * electionTimeoutMs);
+        assertEquals(ElectionState.withUnknownLeader(0, voters), context.quorumStateStore.readElectionState());
+        context.time.sleep(2 * RaftClientTestContext.ELECTION_TIMEOUT_MS);
 
-        pollUntilSend(client);
-        assertEquals(ElectionState.withVotedCandidate(1, localId, voters), quorumStateStore.readElectionState());
+        context.pollUntilSend();
+        assertEquals(ElectionState.withVotedCandidate(1, LOCAL_ID, voters), context.quorumStateStore.readElectionState());
 
-        int correlationId = assertSentVoteRequest(1, 0, 0L);
-        deliverResponse(correlationId, otherNodeId, voteResponse(true, Optional.empty(), 1));
+        int correlationId = context.assertSentVoteRequest(1, 0, 0L);
+        context.deliverResponse(correlationId, otherNodeId, RaftClientTestContext.voteResponse(true, Optional.empty(), 1));
 
         // Become leader after receiving the vote
-        client.poll();
-        assertEquals(ElectionState.withElectedLeader(1, localId, voters), quorumStateStore.readElectionState());
-        long electionTimestamp = time.milliseconds();
+        context.client.poll();
+        assertEquals(ElectionState.withElectedLeader(1, LOCAL_ID, voters), context.quorumStateStore.readElectionState());
+        long electionTimestamp = context.time.milliseconds();
 
         // Leader change record appended
-        assertEquals(1L, log.endOffset().offset);
-        assertEquals(1L, log.lastFlushedOffset());
+        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(1L, context.log.lastFlushedOffset());
 
         // Send BeginQuorumEpoch to voters
-        client.poll();
-        assertSentBeginQuorumEpochRequest(1);
+        context.client.poll();
+        context.assertSentBeginQuorumEpochRequest(1);
 
-        Records records = log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
 
         Record record = batch.iterator().next();
         assertEquals(electionTimestamp, record.timestamp());
-        verifyLeaderChangeMessage(localId, Collections.singletonList(otherNodeId),
-            record.key(), record.value());
+        RaftClientTestContext.verifyLeaderChangeMessage(LOCAL_ID, Collections.singletonList(otherNodeId), record.key(), record.value());

Review comment:
       @hachikuji is this a bug? Shouldn't the leader (LOCAL_ID) always be included in the list of voters (the second argument to this function)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r509857027



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() throws Exception {
     public void testFollowerGracefulShutdown() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {

Review comment:
       Thanks for the suggestion. I implemented this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#issuecomment-715440476


   Thanks @hachikuji for reviewing and merging the PR.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510449508



##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##########
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchRequestData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.VoteRequestData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.ControlRecordUtils;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
+import org.apache.kafka.common.requests.DescribeQuorumResponse;
+import org.apache.kafka.common.requests.VoteResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.mockito.Mockito;
+import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+final class RaftClientTestContext {
+    private static final int FETCH_MAX_WAIT_MS = 0;
+
+    static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0);
+    static final int LOCAL_ID = 0;

Review comment:
       Okay. I made it possible to easily add support for this in the future without breaking the existing tests.
   
   We can make this changeable in the `Builder` as we need it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r509787211



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1536,67 +1522,70 @@ public void testObserverLeaderRediscoveryAfterRequestTimeout() throws Exception
         int otherNodeId = 2;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
-        KafkaRaftClient client = buildClient(voters);
-        discoverLeaderAsObserver(client, voters, leaderId, epoch);
 
-        pollUntilSend(client);
-        RaftRequest.Outbound fetchRequest1 = assertSentFetchRequest();
+        RaftClientTestContext context = RaftClientTestContext.build(voters);
+
+        context.discoverLeaderAsObserver(voters, leaderId, epoch);
+
+        context.pollUntilSend();
+        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
         assertEquals(leaderId, fetchRequest1.destinationId());
-        assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+        RaftClientTestContext.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
 
-        time.sleep(requestTimeoutMs);
-        pollUntilSend(client);
+        context.time.sleep(REQUEST_TIMEOUT_MS);
+        context.pollUntilSend();
 
         // We should retry the Fetch against the other voter since the original
         // voter connection will be backing off.
-        RaftRequest.Outbound fetchRequest2 = assertSentFetchRequest();
+        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
         assertNotEquals(leaderId, fetchRequest2.destinationId());
         assertTrue(voters.contains(fetchRequest2.destinationId()));
-        assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+        RaftClientTestContext.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
 
-        deliverResponse(fetchRequest2.correlationId, fetchRequest2.destinationId(),
+        context.deliverResponse(fetchRequest2.correlationId, fetchRequest2.destinationId(),
             fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
-        client.poll();
+        context.client.poll();
 
-        assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), quorumStateStore.readElectionState());
+        assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), context.quorumStateStore.readElectionState());
     }
 
     @Test
     public void testLeaderGracefulShutdown() throws Exception {
         int otherNodeId = 1;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
         int epoch = 1;
-        KafkaRaftClient client = initializeAsLeader(voters, epoch);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+
+        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(voters, epoch);
 
         // Now shutdown
         int shutdownTimeoutMs = 5000;
-        CompletableFuture<Void> shutdownFuture = client.shutdown(shutdownTimeoutMs);
+        CompletableFuture<Void> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
 
         // We should still be running until we have had a chance to send EndQuorumEpoch
-        assertTrue(client.isShuttingDown());
-        assertTrue(client.isRunning());
+        assertTrue(context.client.isShuttingDown());

Review comment:
       Yeah. Let me play around with this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510387310



##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##########
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchRequestData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.VoteRequestData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.ControlRecordUtils;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
+import org.apache.kafka.common.requests.DescribeQuorumResponse;
+import org.apache.kafka.common.requests.VoteResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.mockito.Mockito;
+import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+final class RaftClientTestContext {
+    private static final int FETCH_MAX_WAIT_MS = 0;
+
+    static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0);
+    static final int LOCAL_ID = 0;

Review comment:
       I'm somewhat inclined to add the local id to the builder rather than making it constant. It makes the builder a bit more self-contained. 
   
   On a similar note, it would be nice to push these state config values into the builder as well.

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##########
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchRequestData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.VoteRequestData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.ControlRecordUtils;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
+import org.apache.kafka.common.requests.DescribeQuorumResponse;
+import org.apache.kafka.common.requests.VoteResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.mockito.Mockito;
+import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+final class RaftClientTestContext {
+    private static final int FETCH_MAX_WAIT_MS = 0;
+
+    static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0);
+    static final int LOCAL_ID = 0;
+
+    static final int ELECTION_BACKOFF_MAX_MS = 100;
+    static final int ELECTION_TIMEOUT_MS = 10000;
+    // fetch timeout is usually larger than election timeout
+    static final int FETCH_TIMEOUT_MS = 50000;
+    static final int REQUEST_TIMEOUT_MS = 5000;
+    static final int RETRY_BACKOFF_MS = 50;
+
+    private final QuorumStateStore quorumStateStore;
+    private final Random random;
+
+    final KafkaRaftClient client;
+    final Metrics metrics;
+    final MockLog log;
+    final MockNetworkChannel channel;
+    final MockTime time;
+    final Set<Integer> voters;
+
+    public static final class Builder {
+        private final QuorumStateStore quorumStateStore = new MockQuorumStateStore();
+        private final Random random = Mockito.spy(new Random(1));
+        private final MockLog log = new MockLog(METADATA_PARTITION);
+        private final Set<Integer> voters;
+
+        Builder(Set<Integer> voters) {
+            this.voters = voters;
+        }
+
+        Builder withElectedLeader(int epoch, int leaderId) throws IOException {
+            quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters));
+            return this;
+        }
+
+        Builder withUnknownLeader(int epoch) throws IOException {
+            quorumStateStore.writeElectionState(ElectionState.withUnknownLeader(epoch, voters));
+            return this;
+        }
+
+        Builder withVotedCandidate(int epoch, int votedId) throws IOException {
+            quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(epoch, votedId, voters));
+            return this;
+        }
+
+        Builder updateRandom(Consumer<Random> consumer) {
+            consumer.accept(random);
+            return this;
+        }
+
+        Builder updateLog(Consumer<MockLog> consumer) {
+            consumer.accept(log);
+            return this;
+        }
+
+        RaftClientTestContext build() throws IOException {
+            MockTime time = new MockTime();
+            Metrics metrics = new Metrics(time);
+            MockNetworkChannel channel = new MockNetworkChannel();
+            LogContext logContext = new LogContext();
+            QuorumState quorum = new QuorumState(LOCAL_ID, voters, ELECTION_TIMEOUT_MS, FETCH_TIMEOUT_MS,
+                    quorumStateStore, time, logContext, random);
+
+            Map<Integer, InetSocketAddress> voterAddresses = voters.stream().collect(Collectors.toMap(
+                        Function.identity(),
+                        RaftClientTestContext::mockAddress
+                        ));

Review comment:
       nit: this indentation looks kind of funky

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -90,470 +76,480 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KafkaRaftClientTest {
-    private static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0);
-
-    private final int localId = 0;
-    private final int electionTimeoutMs = 10000;
-    private final int electionBackoffMaxMs = 100;
-    private final int fetchTimeoutMs = 50000;   // fetch timeout is usually larger than election timeout
-    private final int retryBackoffMs = 50;
-    private final int requestTimeoutMs = 5000;
-    private final int fetchMaxWaitMs = 0;
-
-    private final MockTime time = new MockTime();
-    private final MockLog log = new MockLog(METADATA_PARTITION);
-    private final MockNetworkChannel channel = new MockNetworkChannel();
-    private final Random random = Mockito.spy(new Random(1));
-    private final QuorumStateStore quorumStateStore = new MockQuorumStateStore();
-
-    @AfterEach
-    public void cleanUp() throws IOException {
-        quorumStateStore.clear();
-    }
-
-    private InetSocketAddress mockAddress(int id) {
-        return new InetSocketAddress("localhost", 9990 + id);
-    }
-
-    private KafkaRaftClient buildClient(Set<Integer> voters) throws IOException {
-        return buildClient(voters, new Metrics(time));
-    }
-
-    private KafkaRaftClient buildClient(Set<Integer> voters, Metrics metrics) throws IOException {
-        LogContext logContext = new LogContext();
-        QuorumState quorum = new QuorumState(localId, voters, electionTimeoutMs, fetchTimeoutMs,
-            quorumStateStore, time, logContext, random);
-
-        Map<Integer, InetSocketAddress> voterAddresses = voters.stream().collect(Collectors.toMap(
-            Function.identity(),
-            this::mockAddress
-        ));
-
-        KafkaRaftClient client = new KafkaRaftClient(channel, log, quorum, time, metrics,
-            new MockFuturePurgatory<>(time), new MockFuturePurgatory<>(time), voterAddresses,
-            electionBackoffMaxMs, retryBackoffMs, requestTimeoutMs, fetchMaxWaitMs, logContext, random);
-
-        client.initialize();
-
-        return client;
-    }
-
     @Test
     public void testInitializeSingleMemberQuorum() throws IOException {
-        buildClient(Collections.singleton(localId));
-        assertEquals(ElectionState.withElectedLeader(1, localId, Collections.singleton(localId)),
-            quorumStateStore.readElectionState());
+        RaftClientTestContext context = RaftClientTestContext.build(Collections.singleton(LOCAL_ID));
+        assertEquals(
+            ElectionState.withElectedLeader(1, LOCAL_ID, Collections.singleton(LOCAL_ID)),
+            context.quorumStateStore.readElectionState()
+        );
     }
 
     @Test
     public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() throws Exception {
         // Start off as leader. We should still bump the epoch after initialization
 
         int initialEpoch = 2;
-        Set<Integer> voters = Collections.singleton(localId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(initialEpoch, localId, voters));
-
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(1L, log.endOffset().offset);
-        assertEquals(initialEpoch + 1, log.lastFetchedEpoch());
-        assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch + 1),
-            client.currentLeaderAndEpoch());
-        assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, localId, voters),
-            quorumStateStore.readElectionState());
+        Set<Integer> voters = Collections.singleton(LOCAL_ID);
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(
+                        ElectionState.withElectedLeader(initialEpoch, LOCAL_ID, voters)
+                    );
+                });
+            })
+            .build(voters);
+
+        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch());
+        assertEquals(new LeaderAndEpoch(OptionalInt.of(LOCAL_ID), initialEpoch + 1),
+            context.client.currentLeaderAndEpoch());
+        assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, LOCAL_ID, voters),
+            context.quorumStateStore.readElectionState());
     }
 
     @Test
     public void testInitializeAsLeaderFromStateStore() throws Exception {
-        Set<Integer> voters = Utils.mkSet(localId, 1);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, 1);
         int epoch = 2;
 
-        Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(0L, log.endOffset().offset);
-        assertEquals(ElectionState.withUnknownLeader(epoch, voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateRandom(random -> {
+                Mockito.doReturn(0).when(random).nextInt(RaftClientTestContext.ELECTION_TIMEOUT_MS);
+            })
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, LOCAL_ID, voters));
+                });
+            })
+            .build(voters);
+
 
-        time.sleep(electionTimeoutMs);
-        pollUntilSend(client);
-        assertSentVoteRequest(epoch + 1, 0, 0L);
+        assertEquals(0L, context.log.endOffset().offset);
+        assertEquals(ElectionState.withUnknownLeader(epoch, voters), context.quorumStateStore.readElectionState());
+
+        context.time.sleep(RaftClientTestContext.ELECTION_TIMEOUT_MS);
+        context.pollUntilSend();
+        context.assertSentVoteRequest(epoch + 1, 0, 0L);
     }
 
     @Test
     public void testInitializeAsCandidateFromStateStore() throws Exception {
         // Need 3 node to require a 2-node majority
-        Set<Integer> voters = Utils.mkSet(localId, 1, 2);
-        quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(2, localId, voters));
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, 1, 2);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(2, LOCAL_ID, voters));
+                });
+            })
+            .build(voters);
 
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(0L, log.endOffset().offset);
+        assertEquals(0L, context.log.endOffset().offset);
 
         // Send out vote requests.
-        client.poll();
+        context.client.poll();
 
-        List<RaftRequest.Outbound> voteRequests = collectVoteRequests(2, 0, 0);
+        List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(2, 0, 0);
         assertEquals(2, voteRequests.size());
     }
 
     @Test
     public void testInitializeAsCandidateAndBecomeLeader() throws Exception {
         final int otherNodeId = 1;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        KafkaRaftClient client = buildClient(voters);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+        RaftClientTestContext context = RaftClientTestContext.build(voters);
 
-        assertEquals(ElectionState.withUnknownLeader(0, voters), quorumStateStore.readElectionState());
-        time.sleep(2 * electionTimeoutMs);
+        assertEquals(ElectionState.withUnknownLeader(0, voters), context.quorumStateStore.readElectionState());
+        context.time.sleep(2 * RaftClientTestContext.ELECTION_TIMEOUT_MS);
 
-        pollUntilSend(client);
-        assertEquals(ElectionState.withVotedCandidate(1, localId, voters), quorumStateStore.readElectionState());
+        context.pollUntilSend();
+        assertEquals(ElectionState.withVotedCandidate(1, LOCAL_ID, voters), context.quorumStateStore.readElectionState());
 
-        int correlationId = assertSentVoteRequest(1, 0, 0L);
-        deliverResponse(correlationId, otherNodeId, voteResponse(true, Optional.empty(), 1));
+        int correlationId = context.assertSentVoteRequest(1, 0, 0L);
+        context.deliverResponse(correlationId, otherNodeId, RaftClientTestContext.voteResponse(true, Optional.empty(), 1));
 
         // Become leader after receiving the vote
-        client.poll();
-        assertEquals(ElectionState.withElectedLeader(1, localId, voters), quorumStateStore.readElectionState());
-        long electionTimestamp = time.milliseconds();
+        context.client.poll();
+        assertEquals(ElectionState.withElectedLeader(1, LOCAL_ID, voters), context.quorumStateStore.readElectionState());
+        long electionTimestamp = context.time.milliseconds();
 
         // Leader change record appended
-        assertEquals(1L, log.endOffset().offset);
-        assertEquals(1L, log.lastFlushedOffset());
+        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(1L, context.log.lastFlushedOffset());
 
         // Send BeginQuorumEpoch to voters
-        client.poll();
-        assertSentBeginQuorumEpochRequest(1);
+        context.client.poll();
+        context.assertSentBeginQuorumEpochRequest(1);
 
-        Records records = log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
 
         Record record = batch.iterator().next();
         assertEquals(electionTimestamp, record.timestamp());
-        verifyLeaderChangeMessage(localId, Collections.singletonList(otherNodeId),
-            record.key(), record.value());
+        RaftClientTestContext.verifyLeaderChangeMessage(LOCAL_ID, Collections.singletonList(otherNodeId), record.key(), record.value());

Review comment:
       Yeah, that's fair. It looks like the code currently just includes all followers. I guess we need to carry over the voters into the `LeaderState` if we want to implement the description above. Let's open a separate sub-task for https://issues.apache.org/jira/browse/KAFKA-9876 and decide what we want to do there.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #9476:
URL: https://github.com/apache/kafka/pull/9476


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510488902



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -278,13 +265,13 @@ public void testEndQuorumStartsNewElectionAfterBackoffIfReceivedFromVotedCandida
             .withVotedCandidate(epoch, otherNodeId)

Review comment:
       The pattern I had in mind was a little different. I was thinking something like this:
   
   ```java
           int localId = 0;
           int otherNodeId = 1;
           int epoch = 2;
           Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
   
           RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
             .withVotedCandidate(epoch, otherNodeId)
             .build()
   ```
   
   Then we don't have the awkwardness of the partial reliance on the static `LOCAL_ID`. I like this better because the ids have to be explicitly declared in each test case, which makes it easier to follow.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510498433



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -278,13 +265,13 @@ public void testEndQuorumStartsNewElectionAfterBackoffIfReceivedFromVotedCandida
             .withVotedCandidate(epoch, otherNodeId)

Review comment:
       Makes sense. I implemented this suggestion.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r509857105



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() throws Exception {
     public void testFollowerGracefulShutdown() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
+                });
+            })
+            .build(voters);
 
-        client.poll();
+        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), context.quorumStateStore.readElectionState());
+
+        context.client.poll();
 
         int shutdownTimeoutMs = 5000;
-        CompletableFuture<Void> shutdownFuture = client.shutdown(shutdownTimeoutMs);
-        assertTrue(client.isRunning());
+        CompletableFuture<Void> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
+        assertTrue(context.client.isRunning());
         assertFalse(shutdownFuture.isDone());
 
-        client.poll();
-        assertFalse(client.isRunning());
+        context.client.poll();
+        assertFalse(context.client.isRunning());
         assertTrue(shutdownFuture.isDone());
         assertNull(shutdownFuture.get());
     }
 
     @Test
     public void testGracefulShutdownSingleMemberQuorum() throws IOException {
-        KafkaRaftClient client = buildClient(Collections.singleton(localId));
+        RaftClientTestContext context = RaftClientTestContext.build(Collections.singleton(LOCAL_ID));
+
         assertEquals(ElectionState.withElectedLeader(
-            1, localId, Collections.singleton(localId)), quorumStateStore.readElectionState());
-        client.poll();
-        assertEquals(0, channel.drainSendQueue().size());
+            1, LOCAL_ID, Collections.singleton(LOCAL_ID)), context.quorumStateStore.readElectionState());
+        context.client.poll();
+        assertEquals(0, context.channel.drainSendQueue().size());
         int shutdownTimeoutMs = 5000;
-        client.shutdown(shutdownTimeoutMs);
-        assertTrue(client.isRunning());
-        client.poll();
-        assertFalse(client.isRunning());
+        context.client.shutdown(shutdownTimeoutMs);
+        assertTrue(context.client.isRunning());
+        context.client.poll();
+        assertFalse(context.client.isRunning());
     }
 
     @Test
     public void testFollowerReplication() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState());
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
+                });
+            })
+            .build(voters);
+
+        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), context.quorumStateStore.readElectionState());
 
-        pollUntilSend(client);
+        context.pollUntilSend();
 
-        int fetchQuorumCorrelationId = assertSentFetchRequest(epoch, 0L, 0);
+        int fetchQuorumCorrelationId = context.assertSentFetchRequest(epoch, 0L, 0);
         Records records = MemoryRecords.withRecords(0L, CompressionType.NONE,
             3, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes()));
         FetchResponseData response = fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE);
-        deliverResponse(fetchQuorumCorrelationId, otherNodeId, response);
+        context.deliverResponse(fetchQuorumCorrelationId, otherNodeId, response);
 
-        client.poll();
-        assertEquals(2L, log.endOffset().offset);
-        assertEquals(2L, log.lastFlushedOffset());
+        context.client.poll();
+        assertEquals(2L, context.log.endOffset().offset);
+        assertEquals(2L, context.log.lastFlushedOffset());
     }
 
     @Test
     public void testEmptyRecordSetInFetchResponse() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters));
+                });
+            })
+            .build(voters);
+
+        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), context.quorumStateStore.readElectionState());

Review comment:
       Thanks for the suggestion. I implemented this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#issuecomment-714846028


   The build failure appears to be unrelated. 
   ```
   
   [2020-10-23T01:08:22.296Z] [ERROR] Failed to execute goal org.apache.maven.plugins:maven-archetype-plugin:3.2.0:generate (default-cli) on project standalone-pom: A Maven project already exists in the directory /home/jenkins/workspace/Kafka_kafka-pr_PR-9476/streams/quickstart/test-streams-archetype/streams.examples -> [Help 1]
   [2020-10-23T01:08:22.296Z] [ERROR] 
   [2020-10-23T01:08:22.296Z] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
   [2020-10-23T01:08:22.297Z] [ERROR] Re-run Maven using the -X switch to enable full debug logging.
   [2020-10-23T01:08:22.297Z] [ERROR] 
   [2020-10-23T01:08:22.297Z] [ERROR] For more information about the errors and possible solutions, please read the following articles:
   [2020-10-23T01:08:22.298Z] [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
   ```
   
   I will merge to trunk.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510331249



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1536,67 +1522,70 @@ public void testObserverLeaderRediscoveryAfterRequestTimeout() throws Exception
         int otherNodeId = 2;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
-        KafkaRaftClient client = buildClient(voters);
-        discoverLeaderAsObserver(client, voters, leaderId, epoch);
 
-        pollUntilSend(client);
-        RaftRequest.Outbound fetchRequest1 = assertSentFetchRequest();
+        RaftClientTestContext context = RaftClientTestContext.build(voters);
+
+        context.discoverLeaderAsObserver(voters, leaderId, epoch);
+
+        context.pollUntilSend();
+        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
         assertEquals(leaderId, fetchRequest1.destinationId());
-        assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+        RaftClientTestContext.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
 
-        time.sleep(requestTimeoutMs);
-        pollUntilSend(client);
+        context.time.sleep(REQUEST_TIMEOUT_MS);
+        context.pollUntilSend();
 
         // We should retry the Fetch against the other voter since the original
         // voter connection will be backing off.
-        RaftRequest.Outbound fetchRequest2 = assertSentFetchRequest();
+        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
         assertNotEquals(leaderId, fetchRequest2.destinationId());
         assertTrue(voters.contains(fetchRequest2.destinationId()));
-        assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+        RaftClientTestContext.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
 
-        deliverResponse(fetchRequest2.correlationId, fetchRequest2.destinationId(),
+        context.deliverResponse(fetchRequest2.correlationId, fetchRequest2.destinationId(),
             fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
-        client.poll();
+        context.client.poll();
 
-        assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), quorumStateStore.readElectionState());
+        assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), context.quorumStateStore.readElectionState());
     }
 
     @Test
     public void testLeaderGracefulShutdown() throws Exception {
         int otherNodeId = 1;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
         int epoch = 1;
-        KafkaRaftClient client = initializeAsLeader(voters, epoch);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+
+        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(voters, epoch);
 
         // Now shutdown
         int shutdownTimeoutMs = 5000;
-        CompletableFuture<Void> shutdownFuture = client.shutdown(shutdownTimeoutMs);
+        CompletableFuture<Void> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
 
         // We should still be running until we have had a chance to send EndQuorumEpoch
-        assertTrue(client.isShuttingDown());
-        assertTrue(client.isRunning());
+        assertTrue(context.client.isShuttingDown());

Review comment:
       I thought about this last night and hacked together some solutions. I wasn't very pleased with the results. Let's explore this improvement in a future PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org