You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/05 21:52:48 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

jsancio commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r607353807



##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########
@@ -48,13 +49,16 @@
     private final Set<Integer> grantingVoters = new HashSet<>();
     private final Logger log;
 
+    private final BatchAccumulator<T> accumulator;
+
     protected LeaderState(
         int localId,
         int epoch,
         long epochStartOffset,
         Set<Integer> voters,
         Set<Integer> grantingVoters,
-        LogContext logContext
+        LogContext logContext,
+        BatchAccumulator<T> accumulator

Review comment:
       I would keep `LogContext` as the last argument.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2252,9 +2247,12 @@ public Long scheduleAtomicAppend(int epoch, List<T> records) {
         return append(epoch, records, true);
     }
 
+    @SuppressWarnings("unchecked")
     private Long append(int epoch, List<T> records, boolean isAtomic) {
-        BatchAccumulator<T> accumulator = this.accumulator;
-        if (accumulator == null) {
+        BatchAccumulator<T> accumulator;
+        try {
+            accumulator =  (BatchAccumulator<T>) quorum.leaderStateOrThrow().accumulator();

Review comment:
       I think you should be able to remove this cast.

##########
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##########
@@ -36,30 +36,31 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class LeaderStateTest {
+public class LeaderStateTest<T> {
     private final int localId = 0;
     private final int epoch = 5;
     private final LogContext logContext = new LogContext();
 
-    private LeaderState newLeaderState(
+    private LeaderState<T> newLeaderState(
         Set<Integer> voters,
         long epochStartOffset
     ) {
-        return new LeaderState(
+        return new LeaderState<>(
             localId,
             epoch,
             epochStartOffset,
             voters,
             voters,
-            logContext
+            logContext,
+            null

Review comment:
       I would not pass a `null` and add a test checking that this field is returned correctly.

##########
File path: raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
##########
@@ -269,7 +269,7 @@ public void testCandidateToLeader() throws IOException {
         assertTrue(state.isCandidate());
         assertEquals(1, state.epoch());
 
-        state.transitionToLeader(0L);
+        state.transitionToLeader(0L, null);

Review comment:
       Again, the code in `KafkaRaftClient` assumes that this field cannot be `null`.

##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########
@@ -319,6 +328,10 @@ public String name() {
     }
 
     @Override
-    public void close() {}
+    public void close() {
+        if (accumulator != null) {

Review comment:
       When would accumulator be null?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org