Posted to jira@kafka.apache.org by "CalvinConfluent (via GitHub)" <gi...@apache.org> on 2023/03/16 22:20:10 UTC

[GitHub] [kafka] CalvinConfluent opened a new pull request, #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

CalvinConfluent opened a new pull request, #13408:
URL: https://github.com/apache/kafka/pull/13408

   As the second part of [KIP-903](https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR), this PR updates the AlterPartitionRequest:
   
   - Deprecate the NewIsr field
   - Create a new field BrokerState with BrokerId and BrokerEpoch
   - Bump the AlterPartition version to 3
   
   This change also enables the KRaft controller to reject stale AlterPartition requests.
   
   https://issues.apache.org/jira/browse/KAFKA-14617
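
   A minimal sketch of the stale-request check described above, not the PR's actual code: the controller compares the broker epoch carried for each proposed ISR member with the epoch it has registered for that broker. `StaleEpochCheck`, `staleMembers`, and the nested `BrokerState` stand-in are hypothetical names, and treating an unregistered broker as stale is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StaleEpochCheck {
    // Hypothetical stand-in for the generated AlterPartitionRequestData.BrokerState.
    static final class BrokerState {
        final int brokerId;
        final long brokerEpoch;

        BrokerState(int brokerId, long brokerEpoch) {
            this.brokerId = brokerId;
            this.brokerEpoch = brokerEpoch;
        }
    }

    // Returns the IDs of proposed ISR members whose epoch in the request does
    // not match the registered epoch. Assumption: an unregistered broker is
    // also reported as stale.
    static List<Integer> staleMembers(List<BrokerState> newIsrWithEpochs,
                                      Map<Integer, Long> registeredEpochs) {
        List<Integer> stale = new ArrayList<>();
        for (BrokerState state : newIsrWithEpochs) {
            Long registered = registeredEpochs.get(state.brokerId);
            if (registered == null || registered != state.brokerEpoch) {
                stale.add(state.brokerId);
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        Map<Integer, Long> registered = new HashMap<>();
        registered.put(1, 101L);
        registered.put(4, 1104L); // broker 4 re-registered with a newer epoch

        List<BrokerState> request = Arrays.asList(
            new BrokerState(1, 101L),
            new BrokerState(4, 104L)); // the leader still holds the old epoch

        System.out.println(staleMembers(request, registered)); // prints [4]
    }
}
```

   In the controller diff quoted in this thread, a non-empty mismatch list causes the partition update to be rejected with `INELIGIBLE_REPLICA`.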


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1152957342


##########
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##########
@@ -861,8 +872,9 @@ public void testEarlyControllerResults() throws Throwable {
     }
 
     @Disabled // TODO: need to fix leader election in LocalLog.
-    @Test
-    public void testMissingInMemorySnapshot() throws Exception {
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)

Review Comment:
   ditto?



##########
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##########
@@ -343,8 +348,9 @@ public void testFenceMultipleBrokers() throws Throwable {
         }
     }
 
-    @Test
-    public void testBalancePartitionLeaders() throws Throwable {
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)

Review Comment:
   Why are we doing this change?



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -283,7 +283,7 @@ class DefaultAlterPartitionManager(
         val partitionData = new AlterPartitionRequestData.PartitionData()
           .setPartitionIndex(item.topicIdPartition.partition)
           .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
-          .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+          .setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(item.leaderAndIsr.isr.map(Integer.valueOf).asJava))

Review Comment:
   nit: It seems that `item.leaderAndIsr.isr.map(Integer.valueOf).asJava` will create an intermediate collection that is used to create a new collection in `newIsrToSimpleNewIsrWithBrokerEpochs`. Should we combine the two and directly create the `BrokerState` here?



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -847,15 +849,15 @@ public void testShrinkAndExpandIsr() throws Exception {
         assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
         long brokerEpoch = ctx.currentBrokerEpoch(0);
         PartitionData shrinkIsrRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1), LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, generateIsrWithTestDefaultEpoch(asList(0, 1)), LeaderRecoveryState.RECOVERED);

Review Comment:
   nit: How about having `isrWithDefaultEpoch(0, 1)`? It seems that we could remove the `asList`. 
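
   A sketch of what the suggested varargs helper could look like. The `BrokerState` class here is a hypothetical stand-in for the generated request type, and the `brokerId + 100` epoch is an assumption taken from the original `registerBrokers` line quoted elsewhere in this thread.

```java
import java.util.ArrayList;
import java.util.List;

class IsrTestHelpers {
    // Hypothetical stand-in for AlterPartitionRequestData.BrokerState.
    static final class BrokerState {
        final int brokerId;
        final long brokerEpoch;

        BrokerState(int brokerId, long brokerEpoch) {
            this.brokerId = brokerId;
            this.brokerEpoch = brokerEpoch;
        }
    }

    // Assumed test convention: a broker's default epoch is its ID plus 100.
    static long defaultBrokerEpoch(int brokerId) {
        return brokerId + 100;
    }

    // Varargs entry point, so call sites no longer need asList(...).
    static List<BrokerState> isrWithDefaultEpoch(int... brokerIds) {
        List<BrokerState> isr = new ArrayList<>(brokerIds.length);
        for (int brokerId : brokerIds) {
            isr.add(new BrokerState(brokerId, defaultBrokerEpoch(brokerId)));
        }
        return isr;
    }
}
```

   A call site would then read `isrWithDefaultEpoch(0, 1)` instead of `generateIsrWithTestDefaultEpoch(asList(0, 1))`.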



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -950,41 +952,52 @@ public void testInvalidAlterPartitionRequests() throws Exception {
 
         // Invalid ISR (3 is not a valid replica)
         PartitionData invalidIsrRequest1 = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1, 3), LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, generateIsrWithTestDefaultEpoch(asList(0, 1, 3)), LeaderRecoveryState.RECOVERED);
         ControllerResult<AlterPartitionResponseData> invalidIsrResult1 = sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidIsrRequest1);
         assertAlterPartitionResponse(invalidIsrResult1, topicIdPartition, Errors.INVALID_REQUEST);
 
         // Invalid ISR (does not include leader 0)
         PartitionData invalidIsrRequest2 = newAlterPartition(
-            replicationControl, topicIdPartition, asList(1, 2), LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, generateIsrWithTestDefaultEpoch(asList(1, 2)), LeaderRecoveryState.RECOVERED);
         ControllerResult<AlterPartitionResponseData> invalidIsrResult2 = sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidIsrRequest2);
         assertAlterPartitionResponse(invalidIsrResult2, topicIdPartition, Errors.INVALID_REQUEST);
 
         // Invalid ISR length and recovery state
         PartitionData invalidIsrRecoveryRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1), LeaderRecoveryState.RECOVERING);
+            replicationControl, topicIdPartition, generateIsrWithTestDefaultEpoch(asList(0, 1)), LeaderRecoveryState.RECOVERING);
         ControllerResult<AlterPartitionResponseData> invalidIsrRecoveryResult = sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidIsrRecoveryRequest);
         assertAlterPartitionResponse(invalidIsrRecoveryResult, topicIdPartition, Errors.INVALID_REQUEST);
 
         // Invalid recovery state transition from RECOVERED to RECOVERING
         PartitionData invalidRecoveryRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0), LeaderRecoveryState.RECOVERING);
+            replicationControl, topicIdPartition, generateIsrWithTestDefaultEpoch(asList(0)), LeaderRecoveryState.RECOVERING);
         ControllerResult<AlterPartitionResponseData> invalidRecoveryResult = sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidRecoveryRequest);
         assertAlterPartitionResponse(invalidRecoveryResult, topicIdPartition, Errors.INVALID_REQUEST);
+
+        // Stale ISR broker epoch request.

Review Comment:
   It seems that this unit test is more about testing `INVALID_REQUEST` cases. Should we just remove this case? It seems that it validates exactly what `testAlterPartitionShouldRejectBrokersWithStaleEpoch` validates as well, no?



##########
clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.AlterPartitionRequestData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
+import org.apache.kafka.common.message.AlterPartitionRequestData.PartitionData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.TopicData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class AlterPartitionRequestTest {
+    String topic = "test-topic";
+    Uuid topicId = Uuid.randomUuid();
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testBuildAlterPartitionRequest(short version) {
+        AlterPartitionRequestData request = new AlterPartitionRequestData()
+            .setBrokerId(1)
+            .setBrokerEpoch(1);
+
+        TopicData topicData = new TopicData()
+            .setTopicId(topicId)
+            .setTopicName(topic);
+
+        List<BrokerState> newIsrWithBrokerEpoch = new LinkedList<>();
+        newIsrWithBrokerEpoch.add(new BrokerState().setBrokerId(1).setBrokerEpoch(1001));
+        newIsrWithBrokerEpoch.add(new BrokerState().setBrokerId(2).setBrokerEpoch(1002));
+        newIsrWithBrokerEpoch.add(new BrokerState().setBrokerId(3).setBrokerEpoch(1003));
+
+        topicData.partitions().add(new PartitionData()
+            .setPartitionIndex(0)
+            .setLeaderEpoch(1)
+            .setPartitionEpoch(10)
+            .setNewIsrWithEpochs(newIsrWithBrokerEpoch));
+
+        request.topics().add(topicData);
+
+        AlterPartitionRequest.Builder builder = new AlterPartitionRequest.Builder(request, version > 1);
+        AlterPartitionRequest alterPartitionRequest = builder.build(version);
+        assertEquals(1, alterPartitionRequest.data().topics().size());
+        assertEquals(1, alterPartitionRequest.data().topics().get(0).partitions().size());
+        PartitionData partitionData = alterPartitionRequest.data().topics().get(0).partitions().get(0);
+        assertEquals(version >= 3, partitionData.newIsr().isEmpty());
+        assertEquals(version < 3, partitionData.newIsrWithEpochs().isEmpty());

Review Comment:
   nit: Could we put those two in the if/else below? That would be clearer, I think.



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -311,7 +313,7 @@ void createPartitions(int count, String name,
         void registerBrokers(Integer... brokerIds) throws Exception {
             for (int brokerId : brokerIds) {
                 RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
-                    setBrokerEpoch(brokerId + 100).setBrokerId(brokerId).setRack(null);
+                    setBrokerEpoch(generateTestDefaultBrokerEpoch(brokerId)).setBrokerId(brokerId).setRack(null);

Review Comment:
   nit: How about `defaultBrokerEpoch`?





[GitHub] [kafka] dajac commented on pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on PR #13408:
URL: https://github.com/apache/kafka/pull/13408#issuecomment-1489935597

   @CalvinConfluent Could you rebase to include f8d0fc835bf952f7a2ea24a8abb7e750533705c2? The build failed due to it, I think.




[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "CalvinConfluent (via GitHub)" <gi...@apache.org>.
CalvinConfluent commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1153812757


##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -950,41 +952,52 @@ public void testInvalidAlterPartitionRequests() throws Exception {
 
         // Invalid ISR (3 is not a valid replica)
         PartitionData invalidIsrRequest1 = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1, 3), LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, generateIsrWithTestDefaultEpoch(asList(0, 1, 3)), LeaderRecoveryState.RECOVERED);
         ControllerResult<AlterPartitionResponseData> invalidIsrResult1 = sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidIsrRequest1);
         assertAlterPartitionResponse(invalidIsrResult1, topicIdPartition, Errors.INVALID_REQUEST);
 
         // Invalid ISR (does not include leader 0)
         PartitionData invalidIsrRequest2 = newAlterPartition(
-            replicationControl, topicIdPartition, asList(1, 2), LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, generateIsrWithTestDefaultEpoch(asList(1, 2)), LeaderRecoveryState.RECOVERED);
         ControllerResult<AlterPartitionResponseData> invalidIsrResult2 = sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidIsrRequest2);
         assertAlterPartitionResponse(invalidIsrResult2, topicIdPartition, Errors.INVALID_REQUEST);
 
         // Invalid ISR length and recovery state
         PartitionData invalidIsrRecoveryRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1), LeaderRecoveryState.RECOVERING);
+            replicationControl, topicIdPartition, generateIsrWithTestDefaultEpoch(asList(0, 1)), LeaderRecoveryState.RECOVERING);
         ControllerResult<AlterPartitionResponseData> invalidIsrRecoveryResult = sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidIsrRecoveryRequest);
         assertAlterPartitionResponse(invalidIsrRecoveryResult, topicIdPartition, Errors.INVALID_REQUEST);
 
         // Invalid recovery state transition from RECOVERED to RECOVERING
         PartitionData invalidRecoveryRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0), LeaderRecoveryState.RECOVERING);
+            replicationControl, topicIdPartition, generateIsrWithTestDefaultEpoch(asList(0)), LeaderRecoveryState.RECOVERING);
         ControllerResult<AlterPartitionResponseData> invalidRecoveryResult = sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidRecoveryRequest);
         assertAlterPartitionResponse(invalidRecoveryResult, topicIdPartition, Errors.INVALID_REQUEST);
+
+        // Stale ISR broker epoch request.

Review Comment:
   Removed.





[GitHub] [kafka] dajac merged pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac merged PR #13408:
URL: https://github.com/apache/kafka/pull/13408




[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "CalvinConfluent (via GitHub)" <gi...@apache.org>.
CalvinConfluent commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1153870072


##########
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##########
@@ -343,8 +348,9 @@ public void testFenceMultipleBrokers() throws Throwable {
         }
     }
 
-    @Test
-    public void testBalancePartitionLeaders() throws Throwable {
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)

Review Comment:
   Reverted.





[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "CalvinConfluent (via GitHub)" <gi...@apache.org>.
CalvinConfluent commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1153870001


##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -283,7 +283,7 @@ class DefaultAlterPartitionManager(
         val partitionData = new AlterPartitionRequestData.PartitionData()
           .setPartitionIndex(item.topicIdPartition.partition)
           .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
-          .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+          .setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(item.leaderAndIsr.isr.map(Integer.valueOf).asJava))

Review Comment:
   I agree that converting the Scala list to a Java list just to build another collection is wasteful. I did not find a good way to make `newIsrToSimpleNewIsrWithBrokerEpochs` accept a Scala list of `Int`, so I just added two extra lines in this file instead.
   In any case, part 3 will replace `newIsrToSimpleNewIsrWithBrokerEpochs` with real data.





[GitHub] [kafka] dajac commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1148431780


##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -596,6 +596,11 @@ public void checkBrokerEpoch(int brokerId, long brokerEpoch) {
         }
     }
 
+    public long brokerEpoch(int brokerId) {
+        BrokerRegistration registration = brokerRegistrations.get(brokerId);
+        return registration == null ? -1 : registration.epoch();

Review Comment:
   Returning -1 when the broker is not registered is a bit risky in my opinion, because we also use -1 in the request. It would be better to return null, for instance.
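
   One way to avoid the sentinel, sketched here with `OptionalLong` rather than the `null` mentioned above (a swapped-in alternative, not what the PR necessarily adopted). `BrokerEpochLookup` and its map-backed registry are hypothetical stand-ins for `ClusterControlManager` and its `brokerRegistrations`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

class BrokerEpochLookup {
    // Hypothetical registry: broker ID -> registered epoch.
    private final Map<Integer, Long> registeredEpochs = new HashMap<>();

    void register(int brokerId, long epoch) {
        registeredEpochs.put(brokerId, epoch);
    }

    // An empty result means "not registered", so no epoch value (such as -1)
    // has to double as a sentinel.
    OptionalLong brokerEpoch(int brokerId) {
        Long epoch = registeredEpochs.get(brokerId);
        return epoch == null ? OptionalLong.empty() : OptionalLong.of(epoch);
    }
}
```

   A caller can then distinguish the two cases explicitly, for example with `lookup.brokerEpoch(id).isPresent()`, instead of comparing against -1.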





[GitHub] [kafka] dajac commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1148431864


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1156,9 +1169,32 @@ private Errors validateAlterPartitionData(
             }
         }
 
+        List<BrokerEpochMismatchReplica> brokerEpochMismatchReplicas =
+            brokerEpochMismatchReplicasForIsr(partitionData.newIsrWithEpochs());
+        if (!brokerEpochMismatchReplicas.isEmpty()) {
+            log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
+                    "it specified some invalid ISR members {} that the broker epochs mismatch.",
+                    brokerId, topic.name, partitionId, brokerEpochMismatchReplicas);
+
+            return INELIGIBLE_REPLICA;
+        }
+
         return Errors.NONE;
     }
 
+    private List<BrokerEpochMismatchReplica> brokerEpochMismatchReplicasForIsr(List<BrokerState> brokerStates) {

Review Comment:
   I wonder if we could combine this with ineligibleReplicasForIsr. Did you consider/try this?





[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "CalvinConfluent (via GitHub)" <gi...@apache.org>.
CalvinConfluent commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1148108087


##########
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##########
@@ -449,7 +450,8 @@ public void testBalancePartitionLeaders() throws Throwable {
                 .setBrokerEpoch(brokerEpochs.get(partitionRegistration.leader));
             alterPartitionRequest.topics().add(topicData);
 
-            active.alterPartition(ANONYMOUS_CONTEXT, alterPartitionRequest).get();
+            short version = ANONYMOUS_CONTEXT.requestHeader().requestApiVersion();

Review Comment:
   Changed to test all AlterPartition versions.





[GitHub] [kafka] dajac commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1148431107


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1930,4 +1966,24 @@ public String toString() {
             return replicaId + " (" + reason + ")";
         }
     }
+
+
+    private static final class BrokerEpochMismatchReplica {

Review Comment:
   Could we reuse IneligibleReplica?





[GitHub] [kafka] junrao commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "junrao (via GitHub)" <gi...@apache.org>.
junrao commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1148021906


##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -596,6 +596,11 @@ public void checkBrokerEpoch(int brokerId, long brokerEpoch) {
         }
     }
 
+    public long getBrokerEpoch(int brokerId) {

Review Comment:
   We typically don't use getters. So this can just be `brokerEpoch`.



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -2296,4 +2379,17 @@ public void testProcessBrokerHeartbeatInControlledShutdown(MetadataVersion metad
 
         assertEquals(expectedRecords, result.records());
     }
+
+    private static BrokerState getBrokerState(int brokerId, Long brokerEpoch) {

Review Comment:
   We typically don't use getters. So this can just be `brokerState`.



##########
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##########
@@ -449,7 +450,8 @@ public void testBalancePartitionLeaders() throws Throwable {
                 .setBrokerEpoch(brokerEpochs.get(partitionRegistration.leader));
             alterPartitionRequest.topics().add(topicData);
 
-            active.alterPartition(ANONYMOUS_CONTEXT, alterPartitionRequest).get();
+            short version = ANONYMOUS_CONTEXT.requestHeader().requestApiVersion();

Review Comment:
   Hmm, is this right? ANONYMOUS_CONTEXT's requestApiVersion seems to be always 0?





[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "CalvinConfluent (via GitHub)" <gi...@apache.org>.
CalvinConfluent commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1148848433


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1930,4 +1966,24 @@ public String toString() {
             return replicaId + " (" + reason + ")";
         }
     }
+
+
+    private static final class BrokerEpochMismatchReplica {

Review Comment:
   Refactored.





[GitHub] [kafka] dajac commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1150489001


##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2324,12 +2324,18 @@ class KafkaController(val config: KafkaConfig,
 
         case Some(topicName) =>
           topicReq.partitions.forEach { partitionReq =>
+            var isr = List[Int]()

Review Comment:
   nit: Using a `var` is not required here. How about using `val isr = if (....) {`?



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1108,19 +1116,23 @@ private Errors validateAlterPartitionData(
 
             return INVALID_UPDATE_VERSION;
         }
-        int[] newIsr = Replicas.toArray(partitionData.newIsr());
+
+        int[] newIsr = partitionData.newIsrWithEpochs().stream()
+            .map(brokerState -> brokerState.brokerId()).collect(Collectors.toList())
+            .stream().mapToInt(Integer::intValue).toArray();

Review Comment:
   Do we really need the intermediate `.collect(Collectors.toList())`? If we do, it may be better to add another method to `Replicas`: `Replicas.toArray(partitionData.newIsrWithEpochs())`. There we could initialize an array with the right size and set the values.
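
   Both options can be sketched as follows. `ReplicaArrays` and the `BrokerState` class are hypothetical stand-ins for `Replicas` and the generated request type; the first method sizes the array up front, the second shows that `mapToInt` alone already avoids the intermediate list.

```java
import java.util.Arrays;
import java.util.List;

class ReplicaArrays {
    // Hypothetical stand-in for AlterPartitionRequestData.BrokerState.
    static final class BrokerState {
        final int brokerId;
        final long brokerEpoch;

        BrokerState(int brokerId, long brokerEpoch) {
            this.brokerId = brokerId;
            this.brokerEpoch = brokerEpoch;
        }
    }

    // Allocates the result once with the right size and fills it in place.
    static int[] toArray(List<BrokerState> brokerStates) {
        int[] ids = new int[brokerStates.size()];
        int i = 0;
        for (BrokerState state : brokerStates) {
            ids[i++] = state.brokerId;
        }
        return ids;
    }

    // Stream variant: mapToInt goes straight to an IntStream, with no
    // boxed List<Integer> in between.
    static int[] toArrayWithStream(List<BrokerState> brokerStates) {
        return brokerStates.stream().mapToInt(state -> state.brokerId).toArray();
    }

    public static void main(String[] args) {
        List<BrokerState> isr = Arrays.asList(
            new BrokerState(1, 101L), new BrokerState(2, 102L), new BrokerState(3, 103L));
        System.out.println(Arrays.toString(toArray(isr))); // prints [1, 2, 3]
    }
}
```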



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -1608,6 +1622,75 @@ public void testAlterPartitionShouldRejectFencedBrokers(short version) throws Ex
             alterPartitionResult.response());
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testAlterPartitionShouldRejectRebootBrokers(short version) throws Exception {

Review Comment:
   nit: `...ShouldRejectBrokersWithStaleEpoch`?



##########
clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java:
##########
@@ -77,6 +82,18 @@ public Builder(AlterPartitionRequestData data, boolean canUseTopicIds) {
 
         @Override
         public AlterPartitionRequest build(short version) {
+            if (version < 3) {
+                data.topics().forEach(topicData -> {
+                    topicData.partitions().forEach(partitionData -> {
+                        List<Integer> newIsr = new LinkedList<>();

Review Comment:
   nit: Should we use an ArrayList initialized with the correct size?
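
   A sketch of the down-conversion with a pre-sized `ArrayList`, assuming that for versions below 3 the builder copies the broker IDs from `newIsrWithEpochs` into the legacy `newIsr` list. `IsrDownConversion`, `toLegacyIsr`, and the `BrokerState` class are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class IsrDownConversion {
    // Hypothetical stand-in for AlterPartitionRequestData.BrokerState.
    static final class BrokerState {
        final int brokerId;
        final long brokerEpoch;

        BrokerState(int brokerId, long brokerEpoch) {
            this.brokerId = brokerId;
            this.brokerEpoch = brokerEpoch;
        }
    }

    // The final size is known up front, so the backing array is allocated
    // once and never resized; epochs are dropped for the legacy field.
    static List<Integer> toLegacyIsr(List<BrokerState> newIsrWithEpochs) {
        List<Integer> newIsr = new ArrayList<>(newIsrWithEpochs.size());
        for (BrokerState state : newIsrWithEpochs) {
            newIsr.add(state.brokerId);
        }
        return newIsr;
    }
}
```

   Compared with a `LinkedList`, this avoids one node allocation per element and keeps the IDs in a single contiguous array.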



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -1505,17 +1518,18 @@ public void testReassignPartitions(short version) throws Exception {
         log.info("running final alterPartition...");
         ControllerRequestContext requestContext =
             anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
-        ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
-            requestContext,
-            new AlterPartitionRequestData().setBrokerId(3).setBrokerEpoch(103).
+        AlterPartitionRequestData alterPartitionRequestData = new AlterPartitionRequestData().setBrokerId(3).setBrokerEpoch(103).

Review Comment:
   nit: Could we put `setBrokerId` and `setBrokerEpoch` on new lines in order to keep the format of the code consistent?



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -1608,6 +1622,75 @@ public void testAlterPartitionShouldRejectFencedBrokers(short version) throws Ex
             alterPartitionResult.response());
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testAlterPartitionShouldRejectRebootBrokers(short version) throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+        Uuid fooId = ctx.createTestTopic(
+            "foo",
+            new int[][] {new int[] {1, 2, 3, 4}}
+        ).topicId();
+        ctx.alterPartition(new TopicIdPartition(fooId, 0), 1, generateIsrWithTestDefaultEpoch(asList(1, 2, 3)), LeaderRecoveryState.RECOVERED);
+
+        // First, the leader is constructing an AlterPartition request.
+        AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData()
+            .setBrokerId(1)
+            .setBrokerEpoch(101)
+            .setTopics(asList(new TopicData()
+                .setTopicName(version <= 1 ? "foo" : "")
+                .setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
+                .setPartitions(asList(new PartitionData()
+                    .setPartitionIndex(0)
+                    .setPartitionEpoch(1)
+                    .setLeaderEpoch(1)
+                    .setNewIsrWithEpochs(generateIsrWithTestDefaultEpoch(asList(1, 2, 3, 4)))))));
+
+        // The broker 4 has failed silently and now registers again.
+        long newEpoch = generateTestDefaultBrokerEpoch(4) + 1000;
+        RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
+            setBrokerEpoch(newEpoch).setBrokerId(4).setRack(null);
+        brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
+            setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
+            setPort((short) 9092 + 4).
+            setName("PLAINTEXT").
+            setHost("localhost"));
+        ctx.replay(Collections.singletonList(new ApiMessageAndVersion(brokerRecord, (short) 0)));
+
+        // Unfence the broker 4.
+        ControllerResult<BrokerHeartbeatReply> result = ctx.replicationControl.
+            processBrokerHeartbeat(new BrokerHeartbeatRequestData().
+                setBrokerId(4).setBrokerEpoch(newEpoch).
+                setCurrentMetadataOffset(1).
+                setWantFence(false).setWantShutDown(false), 0);
+        assertEquals(new BrokerHeartbeatReply(true, false, false, false),
+            result.response());
+        ctx.replay(result.records());
+
+        ControllerRequestContext requestContext =
+            anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
+
+        ControllerResult<AlterPartitionResponseData> alterPartitionResult =
+            replication.alterPartition(requestContext, new AlterPartitionRequest.Builder(alterIsrRequest, version > 1).build(version).data());
+
+        // The late arrived AlterPartition request should be rejected when version >= 3.
+        if (version >= 3) {
+            assertEquals(
+                new AlterPartitionResponseData()
+                    .setTopics(asList(new AlterPartitionResponseData.TopicData()
+                        .setTopicName(version <= 1 ? "foo" : "")
+                        .setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
+                        .setPartitions(asList(new AlterPartitionResponseData.PartitionData()
+                            .setPartitionIndex(0)
+                            .setErrorCode(INELIGIBLE_REPLICA.code()))))),
+                alterPartitionResult.response());
+        } else {
+            assertEquals((short) 0, alterPartitionResult.response().errorCode());

Review Comment:
   nit: Could we use `NONE.code()`?





[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "CalvinConfluent (via GitHub)" <gi...@apache.org>.
CalvinConfluent commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1153811968


##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -847,15 +849,15 @@ public void testShrinkAndExpandIsr() throws Exception {
         assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
         long brokerEpoch = ctx.currentBrokerEpoch(0);
         PartitionData shrinkIsrRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1), LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, generateIsrWithTestDefaultEpoch(asList(0, 1)), LeaderRecoveryState.RECOVERED);

Review Comment:
   Changed to use varargs.





[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "CalvinConfluent (via GitHub)" <gi...@apache.org>.
CalvinConfluent commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1148848632


##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -596,6 +596,11 @@ public void checkBrokerEpoch(int brokerId, long brokerEpoch) {
         }
     }
 
+    public long brokerEpoch(int brokerId) {
+        BrokerRegistration registration = brokerRegistrations.get(brokerId);
+        return registration == null ? -1 : registration.epoch();

Review Comment:
   Change made.





[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "CalvinConfluent (via GitHub)" <gi...@apache.org>.
CalvinConfluent commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1148848920


##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -596,6 +596,11 @@ public void checkBrokerEpoch(int brokerId, long brokerEpoch) {
         }
     }
 
+    public long brokerEpoch(int brokerId) {
+        BrokerRegistration registration = brokerRegistrations.get(brokerId);
+        return registration == null ? -1 : registration.epoch();

Review Comment:
   Refactored; this change has been reverted.





[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "CalvinConfluent (via GitHub)" <gi...@apache.org>.
CalvinConfluent commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1149550057


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1159,16 +1171,20 @@ private Errors validateAlterPartitionData(
         return Errors.NONE;
     }
 
-    private List<IneligibleReplica> ineligibleReplicasForIsr(int[] replicas) {
+    private List<IneligibleReplica> ineligibleReplicasForIsr(List<BrokerState> brokerStates) {
         List<IneligibleReplica> ineligibleReplicas = new ArrayList<>(0);
-        for (Integer replicaId : replicas) {
-            BrokerRegistration registration = clusterControl.registration(replicaId);
+        for (BrokerState brokerState : brokerStates) {
+            int brokerId = brokerState.brokerId();
+            BrokerRegistration registration = clusterControl.registration(brokerId);
             if (registration == null) {
-                ineligibleReplicas.add(new IneligibleReplica(replicaId, "not registered"));
+                ineligibleReplicas.add(new IneligibleReplica(brokerId, "not registered"));
             } else if (registration.inControlledShutdown()) {
-                ineligibleReplicas.add(new IneligibleReplica(replicaId, "shutting down"));
+                ineligibleReplicas.add(new IneligibleReplica(brokerId, "shutting down"));
             } else if (registration.fenced()) {
-                ineligibleReplicas.add(new IneligibleReplica(replicaId, "fenced"));
+                ineligibleReplicas.add(new IneligibleReplica(brokerId, "fenced"));
+            } else if (brokerState.brokerEpoch() != -1 && registration.epoch() != brokerState.brokerEpoch()) {
+                ineligibleReplicas.add(new IneligibleReplica(brokerId,
+                    "broker epoch mismatch:" + brokerState.brokerEpoch() + " vs " + registration.epoch()));

Review Comment:
   Done.





[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "CalvinConfluent (via GitHub)" <gi...@apache.org>.
CalvinConfluent commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1150881334


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1108,19 +1116,23 @@ private Errors validateAlterPartitionData(
 
             return INVALID_UPDATE_VERSION;
         }
-        int[] newIsr = Replicas.toArray(partitionData.newIsr());
+
+        int[] newIsr = partitionData.newIsrWithEpochs().stream()
+            .map(brokerState -> brokerState.brokerId()).collect(Collectors.toList())
+            .stream().mapToInt(Integer::intValue).toArray();

Review Comment:
   You are right, it is not needed. Thanks for pointing this out.
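
The thread does not show which part of the conversion was judged unnecessary. If it was the intermediate `collect(Collectors.toList())`, a possible simplification is to map straight to an `int` stream. This is only a sketch: `IsrIdsToArray` and its nested `BrokerState` are hypothetical stand-ins for the generated request classes.

```java
import java.util.Arrays;
import java.util.List;

public class IsrIdsToArray {
    // Hypothetical stand-in for the request's broker-state entry.
    static final class BrokerState {
        private final int brokerId;
        private final long brokerEpoch;

        BrokerState(int brokerId, long brokerEpoch) {
            this.brokerId = brokerId;
            this.brokerEpoch = brokerEpoch;
        }

        int brokerId() {
            return brokerId;
        }

        long brokerEpoch() {
            return brokerEpoch;
        }
    }

    // Extracts the broker ids in order, without boxing into a temporary List<Integer>.
    static int[] toBrokerIdArray(List<BrokerState> states) {
        return states.stream().mapToInt(BrokerState::brokerId).toArray();
    }

    public static void main(String[] args) {
        List<BrokerState> states = Arrays.asList(
            new BrokerState(1, 101L),
            new BrokerState(2, 102L));
        System.out.println(Arrays.toString(toBrokerIdArray(states)));
    }
}
```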





[GitHub] [kafka] junrao commented on a diff in pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "junrao (via GitHub)" <gi...@apache.org>.
junrao commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1149498020


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1159,16 +1171,20 @@ private Errors validateAlterPartitionData(
         return Errors.NONE;
     }
 
-    private List<IneligibleReplica> ineligibleReplicasForIsr(int[] replicas) {
+    private List<IneligibleReplica> ineligibleReplicasForIsr(List<BrokerState> brokerStates) {
         List<IneligibleReplica> ineligibleReplicas = new ArrayList<>(0);
-        for (Integer replicaId : replicas) {
-            BrokerRegistration registration = clusterControl.registration(replicaId);
+        for (BrokerState brokerState : brokerStates) {
+            int brokerId = brokerState.brokerId();
+            BrokerRegistration registration = clusterControl.registration(brokerId);
             if (registration == null) {
-                ineligibleReplicas.add(new IneligibleReplica(replicaId, "not registered"));
+                ineligibleReplicas.add(new IneligibleReplica(brokerId, "not registered"));
             } else if (registration.inControlledShutdown()) {
-                ineligibleReplicas.add(new IneligibleReplica(replicaId, "shutting down"));
+                ineligibleReplicas.add(new IneligibleReplica(brokerId, "shutting down"));
             } else if (registration.fenced()) {
-                ineligibleReplicas.add(new IneligibleReplica(replicaId, "fenced"));
+                ineligibleReplicas.add(new IneligibleReplica(brokerId, "fenced"));
+            } else if (brokerState.brokerEpoch() != -1 && registration.epoch() != brokerState.brokerEpoch()) {
+                ineligibleReplicas.add(new IneligibleReplica(brokerId,
+                    "broker epoch mismatch:" + brokerState.brokerEpoch() + " vs " + registration.epoch()));

Review Comment:
   It would be better if the message made clear which epoch is the requested one and which is the expected one.
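
One way to label the two values, sketched as a standalone helper. The class and method names and the exact wording of the message are assumptions for illustration; only the `-1` "no epoch supplied" convention and the inequality check come from the diff above.

```java
import java.util.Optional;

public class EpochMismatchReason {
    // Returns a reason string when the epoch sent in the request does not match
    // the epoch of the current registration, or empty when the replica passes.
    // A requested epoch of -1 means the request carried no epoch, so the check is skipped.
    static Optional<String> epochMismatch(long requestedEpoch, long registeredEpoch) {
        if (requestedEpoch == -1 || requestedEpoch == registeredEpoch) {
            return Optional.empty();
        }
        return Optional.of("broker epoch mismatch: requested=" + requestedEpoch
            + " VS expected=" + registeredEpoch);
    }

    public static void main(String[] args) {
        // A stale request: the broker re-registered after the request was built.
        System.out.println(epochMismatch(104L, 1104L).orElse("eligible"));
        // No epoch in the request: the epoch check does not apply.
        System.out.println(epochMismatch(-1L, 1104L).orElse("eligible"));
    }
}
```

Naming both sides in the string lets an operator reading the log tell a stale request apart from a stale registration without having to look up the argument order.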





[GitHub] [kafka] dajac commented on pull request #13408: KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request.

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on PR #13408:
URL: https://github.com/apache/kafka/pull/13408#issuecomment-1491621526

   Failed tests are not related. Merging to trunk.

