Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/23 15:53:53 UTC

[GitHub] [kafka] jsancio opened a new pull request #11116: KAFKA-13114: Revert state and reregister raft listener

jsancio opened a new pull request #11116:
URL: https://github.com/apache/kafka/pull/11116


   RaftClient's scheduleAppend may split the list of records into multiple
   batches. This means that it is possible for the active controller to
   see a committed offset for which it doesn't have an in-memory snapshot.
   
   If the active controller needs to renounce leadership and is missing an
   in-memory snapshot for the last committed offset, revert the in-memory
   state and reregister the Raft listener. This causes the controller to
   replay the entire metadata partition.
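
   A rough, self-contained sketch of that recovery path is shown below. It is
   not the actual QuorumController code; every name in it is illustrative and
   the logic is reduced to the two branches the description mentions.

   ```java
   // Illustrative sketch only -- not Kafka's QuorumController. It shows the
   // shape of the fix: revert to an in-memory snapshot when one exists,
   // otherwise wipe the state and reregister the listener so the metadata
   // log is replayed from the beginning.
   import java.util.HashSet;
   import java.util.Set;

   final class RenounceSketch {
       interface RaftClientLike {
           void unregister(Object listener);
           void register(Object listener);
       }

       private final Set<Long> inMemorySnapshotOffsets = new HashSet<>();
       private Object listener = new Object();

       void renounce(RaftClientLike raftClient, long lastCommittedOffset) {
           if (inMemorySnapshotOffsets.contains(lastCommittedOffset)) {
               // Normal path: roll the in-memory state back to the last committed offset.
               revertToSnapshot(lastCommittedOffset);
           } else {
               // scheduleAppend may have split the records into several batches, so
               // the committed offset can fall where no snapshot was taken. Drop the
               // state and reregister so the entire metadata partition is replayed.
               inMemorySnapshotOffsets.clear();
               raftClient.unregister(listener);
               listener = new Object(); // stand-in for a fresh raft listener
               raftClient.register(listener);
           }
       }

       private void revertToSnapshot(long offset) {
           // Omitted in this sketch.
       }
   }
   ```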
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #11116: KAFKA-13114: Revert state and reregister raft listener

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #11116:
URL: https://github.com/apache/kafka/pull/11116#issuecomment-887935935


   I'm trying to think of some approach for validating this logic. It is difficult because it is handling unexpected exceptions. One thought I had is implementing a poison message of some kind which could expire after some TTL. When the controller sees the poison message, it would check if it is still active and raise an exception accordingly. Something like that could be used in an integration test, which might be simpler than trying to induce a failure by mucking with internal state.
   
   Another idea is to corrupt the log on one of the nodes, but I'm not sure this would hit the right path. In fact, this is probably a gap at the moment. If the batch reader fails during iteration, we should probably resign and perhaps even fail. I'll file a separate JIRA for this.
   
   In any case, I think we should try to come up with some way to exercise this path. Otherwise it's hard to say if it even works (though it looks reasonable enough).
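
   To make the first idea concrete, a poison record could look roughly like the
   sketch below. This is purely hypothetical; no such record type exists in
   Kafka, and every name here is made up.

   ```java
   // Hypothetical sketch of a "poison" metadata record used only in tests.
   // While it is live it forces an exception on the active controller,
   // exercising the renounce/revert path; once its TTL passes it becomes a
   // no-op so the test can make assertions about recovery.
   final class PoisonRecordSketch {
       private final long expiresAtMs;

       PoisonRecordSketch(long nowMs, long ttlMs) {
           this.expiresAtMs = nowMs + ttlMs;
       }

       void onReplay(long nowMs, boolean controllerIsActive) {
           if (controllerIsActive && nowMs < expiresAtMs) {
               throw new IllegalStateException("Injected failure from poison record");
           }
       }
   }
   ```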





[GitHub] [kafka] jsancio commented on a change in pull request #11116: KAFKA-13114: Revert state and reregister raft listener

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11116:
URL: https://github.com/apache/kafka/pull/11116#discussion_r680252689



##########
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -631,7 +658,34 @@ public void unregister(RaftClient.Listener<ApiMessageAndVersion> listener) {
 
     @Override
     public Long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
-        return scheduleAtomicAppend(epoch, batch);
+        if (batch.isEmpty()) {
+            throw new IllegalArgumentException("Batch cannot be empty");
+        }
+
+        List<ApiMessageAndVersion> first = batch.subList(0, batch.size() / 2);
+        List<ApiMessageAndVersion> second = batch.subList(batch.size() / 2, batch.size());
+
+        assertEquals(batch.size(), first.size() + second.size());
+        assertFalse(second.isEmpty());
+
+        OptionalLong firstOffset = first
+            .stream()
+            .mapToLong(record -> scheduleAtomicAppend(epoch, Collections.singletonList(record)))
+            .max();
+
+        if (firstOffset.isPresent() && resignAfterNonAtomicCommit.getAndSet(false)) {
+            // Emulate losing leadering in them middle of a non-atomic append by not writing

Review comment:
       Fix.







[GitHub] [kafka] jsancio commented on a change in pull request #11116: KAFKA-13114: Revert state and reregister raft listener

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11116:
URL: https://github.com/apache/kafka/pull/11116#discussion_r677635360



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -783,6 +798,11 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
                 });
             } else if (curClaimEpoch != -1) {
                 appendControlEvent("handleRenounce[" + curClaimEpoch + "]", () -> {
+                    if (this != metaLogListener) {

Review comment:
       Good idea. Fixed.







[GitHub] [kafka] jsancio commented on a change in pull request #11116: KAFKA-13114: Revert state and reregister raft listener

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11116:
URL: https://github.com/apache/kafka/pull/11116#discussion_r680252627



##########
File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -607,4 +612,143 @@ public void testEarlyControllerResults() throws Throwable {
             }
         }
     }
+
+    @Test
+    public void testMissingInMemorySnapshot() throws Exception {
+        int numBrokers = 3;
+        int numPartitions = 3;
+        String topicName = "topic-name";
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =
+                     new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
+                QuorumController controller = controlEnv.activeController();
+
+                Map<Integer, Long> brokerEpochs = registerBrokers(controller, numBrokers);
+
+                // Create a lot of partitions
+                List<CreatableReplicaAssignment> partitions = IntStream
+                    .range(0, numPartitions)
+                    .mapToObj(partitionIndex -> new CreatableReplicaAssignment()
+                        .setPartitionIndex(partitionIndex)
+                        .setBrokerIds(Arrays.asList(0, 1, 2))
+                    )
+                    .collect(Collectors.toList());
+
+                Uuid topicId = controller.createTopics(
+                    new CreateTopicsRequestData()
+                        .setTopics(
+                            new CreatableTopicCollection(
+                                Collections.singleton(
+                                    new CreatableTopic()
+                                        .setName(topicName)
+                                        .setNumPartitions(-1)
+                                        .setReplicationFactor((short) -1)
+                                        .setAssignments(new CreatableReplicaAssignmentCollection(partitions.iterator()))
+                                ).iterator()
+                            )
+                        )
+                ).get().topics().find(topicName).topicId();
+
+                // Create a lot of alter isr
+                List<AlterIsrRequestData.PartitionData> alterIsrs = IntStream
+                    .range(0, numPartitions)
+                    .mapToObj(partitionIndex -> {
+                        PartitionRegistration partitionRegistration = controller.replicationControl().getPartition(
+                            topicId,
+                            partitionIndex
+                        );
+
+                        return new AlterIsrRequestData.PartitionData()
+                            .setPartitionIndex(partitionIndex)
+                            .setLeaderEpoch(partitionRegistration.leaderEpoch)
+                            .setCurrentIsrVersion(partitionRegistration.partitionEpoch)
+                            .setNewIsr(Arrays.asList(0, 1));
+                    })
+                    .collect(Collectors.toList());
+
+                AlterIsrRequestData.TopicData topicData = new AlterIsrRequestData.TopicData()
+                        .setName(topicName);
+                topicData.partitions().addAll(alterIsrs);
+
+                int leaderId = 0;
+                AlterIsrRequestData alterIsrRequest = new AlterIsrRequestData()
+                    .setBrokerId(leaderId)
+                    .setBrokerEpoch(brokerEpochs.get(leaderId));
+                alterIsrRequest.topics().add(topicData);
+
+                logEnv.logManagers().get(0).resignAfterNonAtomicCommit();
+
+                assertThrows(
+                    ExecutionException.class,
+                    () -> controller.alterIsr(alterIsrRequest).get()
+                );
+
+                // Wait for the new active controller
+                final QuorumController newController = controlEnv.activeController();

Review comment:
       Good idea. Fixed.







[GitHub] [kafka] hachikuji commented on a change in pull request #11116: KAFKA-13114: Revert state and reregister raft listener

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11116:
URL: https://github.com/apache/kafka/pull/11116#discussion_r676949613



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -783,6 +798,11 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
                 });
             } else if (curClaimEpoch != -1) {
                 appendControlEvent("handleRenounce[" + curClaimEpoch + "]", () -> {
+                    if (this != metaLogListener) {

Review comment:
       Maybe we can add a little helper? The only difference in all of these checks is the type of event.
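
    As a sketch of what such a helper might look like (illustrative names only,
    not the actual QuorumController code), each event handler could delegate
    the staleness check to one place:

    ```java
    // Hypothetical sketch of the suggested helper; names are illustrative.
    final class ListenerEventGuard {
        private volatile Object currentListener;

        ListenerEventGuard(Object initialListener) {
            this.currentListener = initialListener;
        }

        /** Swap in a fresh listener after the controller reregisters with Raft. */
        void replaceListener(Object newListener) {
            this.currentListener = newListener;
        }

        /**
         * Returns true when the calling listener has been replaced; the caller
         * should then ignore the event. The event type is only used for logging.
         */
        boolean isStale(Object caller, String eventType) {
            if (caller != currentListener) {
                System.out.printf("Ignoring %s event from a stale listener%n", eventType);
                return true;
            }
            return false;
        }
    }
    ```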







[GitHub] [kafka] hachikuji commented on a change in pull request #11116: KAFKA-13114: Revert state and reregister raft listener

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11116:
URL: https://github.com/apache/kafka/pull/11116#discussion_r679534669



##########
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -631,7 +658,34 @@ public void unregister(RaftClient.Listener<ApiMessageAndVersion> listener) {
 
     @Override
     public Long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
-        return scheduleAtomicAppend(epoch, batch);
+        if (batch.isEmpty()) {
+            throw new IllegalArgumentException("Batch cannot be empty");
+        }
+
+        List<ApiMessageAndVersion> first = batch.subList(0, batch.size() / 2);
+        List<ApiMessageAndVersion> second = batch.subList(batch.size() / 2, batch.size());
+
+        assertEquals(batch.size(), first.size() + second.size());
+        assertFalse(second.isEmpty());
+
+        OptionalLong firstOffset = first
+            .stream()
+            .mapToLong(record -> scheduleAtomicAppend(epoch, Collections.singletonList(record)))
+            .max();
+
+        if (firstOffset.isPresent() && resignAfterNonAtomicCommit.getAndSet(false)) {
+            // Emulate losing leadering in them middle of a non-atomic append by not writing

Review comment:
       nit: losing leadership?

##########
File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -607,4 +612,143 @@ public void testEarlyControllerResults() throws Throwable {
             }
         }
     }
+
+    @Test
+    public void testMissingInMemorySnapshot() throws Exception {
+        int numBrokers = 3;
+        int numPartitions = 3;
+        String topicName = "topic-name";
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =

Review comment:
       nit: could we pull this into the first `try`?

##########
File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -607,4 +612,143 @@ public void testEarlyControllerResults() throws Throwable {
             }
         }
     }
+
+    @Test
+    public void testMissingInMemorySnapshot() throws Exception {
+        int numBrokers = 3;
+        int numPartitions = 3;
+        String topicName = "topic-name";
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =
+                     new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
+                QuorumController controller = controlEnv.activeController();
+
+                Map<Integer, Long> brokerEpochs = registerBrokers(controller, numBrokers);
+
+                // Create a lot of partitions
+                List<CreatableReplicaAssignment> partitions = IntStream
+                    .range(0, numPartitions)
+                    .mapToObj(partitionIndex -> new CreatableReplicaAssignment()
+                        .setPartitionIndex(partitionIndex)
+                        .setBrokerIds(Arrays.asList(0, 1, 2))
+                    )
+                    .collect(Collectors.toList());
+
+                Uuid topicId = controller.createTopics(
+                    new CreateTopicsRequestData()
+                        .setTopics(
+                            new CreatableTopicCollection(
+                                Collections.singleton(
+                                    new CreatableTopic()
+                                        .setName(topicName)
+                                        .setNumPartitions(-1)
+                                        .setReplicationFactor((short) -1)
+                                        .setAssignments(new CreatableReplicaAssignmentCollection(partitions.iterator()))
+                                ).iterator()
+                            )
+                        )
+                ).get().topics().find(topicName).topicId();
+
+                // Create a lot of alter isr
+                List<AlterIsrRequestData.PartitionData> alterIsrs = IntStream
+                    .range(0, numPartitions)
+                    .mapToObj(partitionIndex -> {
+                        PartitionRegistration partitionRegistration = controller.replicationControl().getPartition(
+                            topicId,
+                            partitionIndex
+                        );
+
+                        return new AlterIsrRequestData.PartitionData()
+                            .setPartitionIndex(partitionIndex)
+                            .setLeaderEpoch(partitionRegistration.leaderEpoch)
+                            .setCurrentIsrVersion(partitionRegistration.partitionEpoch)
+                            .setNewIsr(Arrays.asList(0, 1));
+                    })
+                    .collect(Collectors.toList());
+
+                AlterIsrRequestData.TopicData topicData = new AlterIsrRequestData.TopicData()
+                        .setName(topicName);
+                topicData.partitions().addAll(alterIsrs);
+
+                int leaderId = 0;
+                AlterIsrRequestData alterIsrRequest = new AlterIsrRequestData()
+                    .setBrokerId(leaderId)
+                    .setBrokerEpoch(brokerEpochs.get(leaderId));
+                alterIsrRequest.topics().add(topicData);
+
+                logEnv.logManagers().get(0).resignAfterNonAtomicCommit();
+
+                assertThrows(
+                    ExecutionException.class,
+                    () -> controller.alterIsr(alterIsrRequest).get()
+                );
+
+                // Wait for the new active controller
+                final QuorumController newController = controlEnv.activeController();

Review comment:
       This confused me a little bit since we are trying to verify that the state on the original controller resets properly. That is what is happening here since there is only one controller in the test, but it is obscured a little bit by the new variable. Maybe it would be clearer to use the original reference and write this as:
   ```java
   assertEquals(controller, controlEnv.activeController());
   ```
   Also, is there an epoch or something we can bump to ensure the transition?







[GitHub] [kafka] jsancio commented on a change in pull request #11116: KAFKA-13114: Revert state and reregister raft listener

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11116:
URL: https://github.com/apache/kafka/pull/11116#discussion_r680248650



##########
File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -607,4 +612,143 @@ public void testEarlyControllerResults() throws Throwable {
             }
         }
     }
+
+    @Test
+    public void testMissingInMemorySnapshot() throws Exception {
+        int numBrokers = 3;
+        int numPartitions = 3;
+        String topicName = "topic-name";
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =

Review comment:
       Cool. I didn't know that was valid Java.
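
    For readers following along, the "valid Java" in question is a single
    try-with-resources statement that declares multiple resources (supported
    since Java 7); the resources are closed in reverse order when the block
    exits. A minimal standalone example:

    ```java
    import java.io.StringWriter;

    public class MultiResourceTry {
        public static void main(String[] args) throws Exception {
            try (
                StringWriter first = new StringWriter();   // closed second
                StringWriter second = new StringWriter()   // closed first
            ) {
                first.write("resources declared together");
                second.write("are closed in reverse order");
                System.out.println(first + ", " + second);
            }
        }
    }
    ```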







[GitHub] [kafka] jsancio commented on a change in pull request #11116: KAFKA-13114: Revert state and reregister raft listener

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11116:
URL: https://github.com/apache/kafka/pull/11116#discussion_r679475174



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -580,12 +580,17 @@ public void run() throws Exception {
                 // written before we can return our result to the user.  Here, we hand off
                 // the batch of records to the raft client.  They will be written out
                 // asynchronously.
-                final long offset;
+                final Long offset;
                 if (result.isAtomic()) {
                     offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
                 } else {
                     offset = raftClient.scheduleAppend(controllerEpoch, result.records());
                 }
+                if (offset == null) {
+                    throw new IllegalStateException("The raft client was unable to allocate a buffer for an append");
+                } else if (offset == Long.MAX_VALUE) {
+                    throw new IllegalStateException("Unable to append records since this is not the leader");
+                }

Review comment:
       This is a partial fix until we merge https://github.com/apache/kafka/pull/10909







[GitHub] [kafka] hachikuji commented on a change in pull request #11116: KAFKA-13114: Revert state and reregister raft listener

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11116:
URL: https://github.com/apache/kafka/pull/11116#discussion_r680256091



##########
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -631,7 +658,34 @@ public void unregister(RaftClient.Listener<ApiMessageAndVersion> listener) {
 
     @Override
     public Long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
-        return scheduleAtomicAppend(epoch, batch);
+        if (batch.isEmpty()) {
+            throw new IllegalArgumentException("Batch cannot be empty");
+        }
+
+        List<ApiMessageAndVersion> first = batch.subList(0, batch.size() / 2);
+        List<ApiMessageAndVersion> second = batch.subList(batch.size() / 2, batch.size());
+
+        assertEquals(batch.size(), first.size() + second.size());
+        assertFalse(second.isEmpty());
+
+        OptionalLong firstOffset = first
+            .stream()
+            .mapToLong(record -> scheduleAtomicAppend(epoch, Collections.singletonList(record)))
+            .max();
+
+        if (firstOffset.isPresent() && resignAfterNonAtomicCommit.getAndSet(false)) {
+            // Emulate losing leadership in them middle of a non-atomic append by not writing

Review comment:
       ```suggestion
               // Emulate losing leadership in the middle of a non-atomic append by not writing
   ```







[GitHub] [kafka] jsancio commented on a change in pull request #11116: KAFKA-13114: Revert state and reregister raft listener

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11116:
URL: https://github.com/apache/kafka/pull/11116#discussion_r680254022



##########
File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -619,108 +620,114 @@ public void testMissingInMemorySnapshot() throws Exception {
         int numPartitions = 3;
         String topicName = "topic-name";
 
-        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
-            try (QuorumControllerTestEnv controlEnv =
-                     new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
-                QuorumController controller = controlEnv.activeController();
-
-                Map<Integer, Long> brokerEpochs = registerBrokers(controller, numBrokers);
-
-                // Create a lot of partitions
-                List<CreatableReplicaAssignment> partitions = IntStream
-                    .range(0, numPartitions)
-                    .mapToObj(partitionIndex -> new CreatableReplicaAssignment()
-                        .setPartitionIndex(partitionIndex)
-                        .setBrokerIds(Arrays.asList(0, 1, 2))
-                    )
-                    .collect(Collectors.toList());
-
-                Uuid topicId = controller.createTopics(
-                    new CreateTopicsRequestData()
-                        .setTopics(
-                            new CreatableTopicCollection(
-                                Collections.singleton(
-                                    new CreatableTopic()
-                                        .setName(topicName)
-                                        .setNumPartitions(-1)
-                                        .setReplicationFactor((short) -1)
-                                        .setAssignments(new CreatableReplicaAssignmentCollection(partitions.iterator()))
-                                ).iterator()
-                            )
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
+            QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))
+        ) {
+            QuorumController controller = controlEnv.activeController();
+
+            Map<Integer, Long> brokerEpochs = registerBrokers(controller, numBrokers);
+
+            // Create a lot of partitions
+            List<CreatableReplicaAssignment> partitions = IntStream
+                .range(0, numPartitions)
+                .mapToObj(partitionIndex -> new CreatableReplicaAssignment()
+                    .setPartitionIndex(partitionIndex)
+                    .setBrokerIds(Arrays.asList(0, 1, 2))
+                )
+                .collect(Collectors.toList());
+
+            Uuid topicId = controller.createTopics(
+                new CreateTopicsRequestData()
+                    .setTopics(
+                        new CreatableTopicCollection(
+                            Collections.singleton(
+                                new CreatableTopic()
+                                    .setName(topicName)
+                                    .setNumPartitions(-1)
+                                    .setReplicationFactor((short) -1)
+                                    .setAssignments(new CreatableReplicaAssignmentCollection(partitions.iterator()))
+                            ).iterator()
                         )
-                ).get().topics().find(topicName).topicId();
-
-                // Create a lot of alter isr
-                List<AlterIsrRequestData.PartitionData> alterIsrs = IntStream
-                    .range(0, numPartitions)
-                    .mapToObj(partitionIndex -> {
-                        PartitionRegistration partitionRegistration = controller.replicationControl().getPartition(
-                            topicId,
-                            partitionIndex
-                        );
-
-                        return new AlterIsrRequestData.PartitionData()
-                            .setPartitionIndex(partitionIndex)
-                            .setLeaderEpoch(partitionRegistration.leaderEpoch)
-                            .setCurrentIsrVersion(partitionRegistration.partitionEpoch)
-                            .setNewIsr(Arrays.asList(0, 1));
-                    })
-                    .collect(Collectors.toList());
-
-                AlterIsrRequestData.TopicData topicData = new AlterIsrRequestData.TopicData()
-                        .setName(topicName);
-                topicData.partitions().addAll(alterIsrs);
-
-                int leaderId = 0;
-                AlterIsrRequestData alterIsrRequest = new AlterIsrRequestData()
-                    .setBrokerId(leaderId)
-                    .setBrokerEpoch(brokerEpochs.get(leaderId));
-                alterIsrRequest.topics().add(topicData);
-
-                logEnv.logManagers().get(0).resignAfterNonAtomicCommit();
-
-                assertThrows(
-                    ExecutionException.class,
-                    () -> controller.alterIsr(alterIsrRequest).get()
-                );
-
-                // Wait for the new active controller
-                final QuorumController newController = controlEnv.activeController();
-
-                // Since the alterIsr partially failed we expect to see
-                // some partitions to still have 2 in the ISR.
-                int partitionsWithReplica2 = Utils.toList(
-                    newController
-                        .replicationControl()
-                        .brokersToIsrs()
-                        .partitionsWithBrokerInIsr(2)
-                ).size();
-                int partitionsWithReplica0 = Utils.toList(
-                    newController
-                        .replicationControl()
-                        .brokersToIsrs()
-                        .partitionsWithBrokerInIsr(0)
-                ).size();
-
-                assertEquals(numPartitions, partitionsWithReplica0);
-                assertNotEquals(0, partitionsWithReplica2);
-                assertTrue(
-                    partitionsWithReplica0 > partitionsWithReplica2,
-                    String.format(
-                        "partitionsWithReplica0 = %s, partitionsWithReplica2 = %s",
-                        partitionsWithReplica0,
-                        partitionsWithReplica2
                     )
-                );
-            }
+            ).get().topics().find(topicName).topicId();
+
+            // Create a lot of alter isr
+            List<AlterIsrRequestData.PartitionData> alterIsrs = IntStream
+                .range(0, numPartitions)
+                .mapToObj(partitionIndex -> {
+                    PartitionRegistration partitionRegistration = controller.replicationControl().getPartition(
+                        topicId,
+                        partitionIndex
+                    );
+
+                    return new AlterIsrRequestData.PartitionData()
+                        .setPartitionIndex(partitionIndex)
+                        .setLeaderEpoch(partitionRegistration.leaderEpoch)
+                        .setCurrentIsrVersion(partitionRegistration.partitionEpoch)
+                        .setNewIsr(Arrays.asList(0, 1));
+                })
+                .collect(Collectors.toList());
+
+            AlterIsrRequestData.TopicData topicData = new AlterIsrRequestData.TopicData()
+                .setName(topicName);
+            topicData.partitions().addAll(alterIsrs);
+
+            int leaderId = 0;
+            AlterIsrRequestData alterIsrRequest = new AlterIsrRequestData()
+                .setBrokerId(leaderId)
+                .setBrokerEpoch(brokerEpochs.get(leaderId));
+            alterIsrRequest.topics().add(topicData);
+
+            logEnv.logManagers().get(0).resignAfterNonAtomicCommit();
+
+            int oldClaimEpoch = controller.curClaimEpoch();
+            assertThrows(
+                ExecutionException.class,
+                () -> controller.alterIsr(alterIsrRequest).get()
+            );
+
+            // Wait for the controller to become active again
+            assertSame(controller, controlEnv.activeController());
+            assertTrue(
+                oldClaimEpoch < controller.curClaimEpoch(),
+                String.format("oldClaimEpoch = %s, newClaimEpoch = %s", oldClaimEpoch, controller.curClaimEpoch())
+            );

Review comment:
       Only this should have changed. The rest are indentation changes from the previous commit.







[GitHub] [kafka] hachikuji merged pull request #11116: KAFKA-13114: Revert state and reregister raft listener

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #11116:
URL: https://github.com/apache/kafka/pull/11116


   

