Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/04 03:37:41 UTC

[GitHub] [kafka] jsancio opened a new pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

jsancio opened a new pull request #11733:
URL: https://github.com/apache/kafka/pull/11733


   TODO:  Add a description for this PR!!
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813163349



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -110,6 +110,14 @@
     LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
     ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true);
 
+    /**
+     * ALTER_ISR was the old name for ALTER_PARTITION.
+     *
+     * @deprecated since 3.2.0. Use {@link #ALTER_PARTITION} instead
+     */
+    @Deprecated

Review comment:
       I think I'd just get rid of it. Deprecating it sort of sends the wrong signal that it is a public API, when it is definitely not.







[GitHub] [kafka] hachikuji commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814304074



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -760,44 +719,132 @@ BrokersToIsrs brokersToIsrs() {
                     }
                     if (change.leader() != request.brokerId() &&
                             change.leader() != NO_LEADER_CHANGE) {
-                        // Normally, an alterIsr request, which is made by the partition
+                        // Normally, an alterPartition request, which is made by the partition
                         // leader itself, is not allowed to modify the partition leader.
                         // However, if there is an ongoing partition reassignment and the
                         // ISR change completes it, then the leader may change as part of
                         // the changes made during reassignment cleanup.
                         //
                         // In this case, we report back FENCED_LEADER_EPOCH to the leader
-                        // which made the alterIsr request. This lets it know that it must
+                        // which made the alterPartition request. This lets it know that it must
                         // fetch new metadata before trying again. This return code is
                         // unusual because we both return an error and generate a new
                         // metadata record. We usually only do one or the other.
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment and triggered a " +
-                            "leadership change. Reutrning FENCED_LEADER_EPOCH.",
+                            "leadership change. Returning FENCED_LEADER_EPOCH.",
                             request.brokerId(), topic.name, partitionId);
-                        responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                             setPartitionIndex(partitionId).
                             setErrorCode(FENCED_LEADER_EPOCH.code()));
                         continue;
                     } else if (change.removingReplicas() != null ||
                             change.addingReplicas() != null) {
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment.", request.brokerId(),
                             topic.name, partitionId);
                     }
                 }
-                responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+
+                responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                     setPartitionIndex(partitionId).
                     setErrorCode(result.code()).
                     setLeaderId(partition.leader).
+                    setIsr(Replicas.toList(partition.isr)).
+                    setLeaderRecoveryState(partition.leaderRecoveryState.value()).

Review comment:
       Hmm, it does not seem to be tagged.
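The diff comment above describes the one case where the controller both writes a metadata record and returns an error: completing a reassignment moved leadership away from the broker that sent the request. A minimal sketch of that decision follows. The `AlterPartitionOutcome` class is hypothetical, and the value -2 for `NO_LEADER_CHANGE` is an assumption made for illustration.

```java
// Hypothetical, simplified model of the decision described in the diff comment above.
// Names are illustrative; this is not the controller's actual code.
final class AlterPartitionOutcome {
    // Assumption: sentinel meaning "this change does not touch the leader".
    static final int NO_LEADER_CHANGE = -2;

    enum Error { NONE, FENCED_LEADER_EPOCH }

    /**
     * @param requestingBrokerId the leader that sent the AlterPartition request
     * @param proposedLeader     leader produced by the resulting partition change,
     *                           or NO_LEADER_CHANGE if leadership is unaffected
     */
    static Error responseError(int requestingBrokerId, int proposedLeader) {
        if (proposedLeader != requestingBrokerId && proposedLeader != NO_LEADER_CHANGE) {
            // Completing a reassignment moved leadership away from the requester:
            // the change record is still written, but the old leader is told to
            // refresh its metadata before retrying.
            return Error.FENCED_LEADER_EPOCH;
        }
        return Error.NONE;
    }
}
```

Only the leader check is modeled; in the diff, the successful path additionally fills in the ISR, leader epoch, partition epoch and leader recovery state of the response.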







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813998749



##########
File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##########
@@ -104,6 +107,14 @@ public PartitionChangeBuilder setTargetAdding(List<Integer> targetAdding) {
         return this;
     }
 
+    public PartitionChangeBuilder setTargetLeaderRecoveryState(LeaderRecoveryState targetLeaderRecoveryState) {
+        this.targetLeaderRecoveryState = targetLeaderRecoveryState;
+        return this;
+    }
+
+    // TODO: We need to make sure that the LeaderRecoveryState is not lost when the partition transitions from

Review comment:
       Added two tests:
   1. Show that going from online to offline to online preserves the leader recovery state
   2. Show that performing an unclean leader election sets the leader recovery state to RECOVERING
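The two tests can be illustrated with a small hypothetical builder that writes the recovery state into a change record only when it differs from the current value, using the `NO_CHANGE` marker (-1) from the `LeaderRecoveryState` diff in this thread. `RecoveryChangeSketch` and its methods are assumptions for illustration, not Kafka's `PartitionChangeBuilder`.

```java
// Hypothetical sketch: a change builder that emits the NO_CHANGE marker (-1) for the
// leader recovery state unless the target differs from the current value.
// Names are illustrative, not Kafka's PartitionChangeBuilder API.
final class RecoveryChangeSketch {
    static final byte RECOVERED = 0;
    static final byte RECOVERING = 1;
    static final byte NO_CHANGE = -1;

    private final byte current;
    private byte target;

    RecoveryChangeSketch(byte current) {
        this.current = current;
        this.target = current; // by default the state carries over unchanged
    }

    RecoveryChangeSketch setTargetLeaderRecoveryState(byte target) {
        this.target = target;
        return this;
    }

    /** Marks an unclean election, which always moves the partition to RECOVERING. */
    RecoveryChangeSketch uncleanElection() {
        this.target = RECOVERING;
        return this;
    }

    /** Value written into the change record's leader recovery state field. */
    byte recordValue() {
        return target == current ? NO_CHANGE : target;
    }

    /** State after applying a record value to the current registration. */
    static byte apply(byte current, byte recordValue) {
        return recordValue == NO_CHANGE ? current : recordValue;
    }
}
```

With this design, a leader going offline and coming back never touches the field, so RECOVERING survives the round trip; only an explicit target or an unclean election produces a new value.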







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r812227025



##########
File path: metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public enum LeaderRecoveryState {
+    /**
+     * Represents that the election for the partition was either an ISR election or the
+     * leader recovered from an unclean leader election.
+     */
+    RECOVERED((byte) 0),
+
+    /**
+     * Represents that the election for the partition was an unclean leader election and
+     * that the leader is recovering from it.
+     */
+    RECOVERING((byte) 1);
+
+    /**
+     * A special value used to represent that the LeaderRecoveryState field of a
+     * PartitionChangeRecord didn't change.
+     */
+    private static final byte NO_CHANGE = (byte) -1;
+
+    private static final Map<Byte, LeaderRecoveryState> VALUE_TO_ENUM;

Review comment:
       Removed the map.
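With only two states, a `switch` is enough to decode the byte value, which is presumably why the lookup map could go. The sketch below uses a hypothetical `LeaderRecoveryStateSketch` enum; the `of` and `optionalOf` method names and their behavior on unknown values are assumptions, not the merged code.

```java
import java.util.Optional;

// Sketch of a byte-valued enum decoded with a switch instead of a lookup map.
// Constants mirror the diff above; method names are assumptions for illustration.
enum LeaderRecoveryStateSketch {
    RECOVERED((byte) 0),
    RECOVERING((byte) 1);

    /** Marker meaning "this field did not change" in a change record. */
    static final byte NO_CHANGE = (byte) -1;

    private final byte value;

    LeaderRecoveryStateSketch(byte value) {
        this.value = value;
    }

    byte value() {
        return value;
    }

    /** Decodes a value, returning empty for unknown values such as NO_CHANGE. */
    static Optional<LeaderRecoveryStateSketch> optionalOf(byte value) {
        switch (value) {
            case 0: return Optional.of(RECOVERED);
            case 1: return Optional.of(RECOVERING);
            default: return Optional.empty();
        }
    }

    /** Decodes a value, rejecting anything that is not a known state. */
    static LeaderRecoveryStateSketch of(byte value) {
        return optionalOf(value).orElseThrow(() ->
            new IllegalArgumentException("Unknown leader recovery state: " + value));
    }
}
```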







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r812253132



##########
File path: core/src/test/scala/kafka/zk/TopicPartitionStateZNodeTest.scala
##########
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.zk
+
+import TopicPartitionStateZNode.decode
+import TopicPartitionStateZNode.encode
+import kafka.api.LeaderAndIsr
+import kafka.controller.LeaderIsrAndControllerEpoch
+import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.zookeeper.data.Stat
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.mock
+import org.mockito.Mockito.when
+
+final class TopicPartitionStateZNodeTest {
+
+  @Test
+  def testEncodeDecode(): Unit = {

Review comment:
       Yes. Added another test.
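A round-trip test of this kind typically encodes a partition state, decodes it, and checks the recovery state survives, plus a second case for data without the new field. The Java sketch below uses a plain `Map` in place of the znode JSON. The key name `leader_recovery_state`, omitting it for RECOVERED, and defaulting a missing key to RECOVERED are all assumptions, not the confirmed znode format.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical codec for a partition-state record, used to illustrate a round-trip test.
// The key name and the omit/default behavior for RECOVERED are assumptions.
final class PartitionStateCodec {
    static final int RECOVERED = 0;
    static final int RECOVERING = 1;

    static Map<String, Object> encode(int leader, int leaderEpoch, List<Integer> isr, int recoveryState) {
        Map<String, Object> out = new HashMap<>();
        out.put("leader", leader);
        out.put("leader_epoch", leaderEpoch);
        out.put("isr", List.copyOf(isr));
        if (recoveryState != RECOVERED) {
            // Only write the new key when it carries information, so RECOVERED
            // partitions encode exactly as they did before the field existed.
            out.put("leader_recovery_state", recoveryState);
        }
        return out;
    }

    /** Returns the decoded recovery state, treating a missing key as RECOVERED. */
    static int decodeRecoveryState(Map<String, Object> in) {
        Object v = in.get("leader_recovery_state");
        return v == null ? RECOVERED : (Integer) v;
    }
}
```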







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813999251



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -760,44 +719,132 @@ BrokersToIsrs brokersToIsrs() {
                     }
                     if (change.leader() != request.brokerId() &&
                             change.leader() != NO_LEADER_CHANGE) {
-                        // Normally, an alterIsr request, which is made by the partition
+                        // Normally, an alterPartition request, which is made by the partition
                         // leader itself, is not allowed to modify the partition leader.
                         // However, if there is an ongoing partition reassignment and the
                         // ISR change completes it, then the leader may change as part of
                         // the changes made during reassignment cleanup.
                         //
                         // In this case, we report back FENCED_LEADER_EPOCH to the leader
-                        // which made the alterIsr request. This lets it know that it must
+                        // which made the alterPartition request. This lets it know that it must
                         // fetch new metadata before trying again. This return code is
                         // unusual because we both return an error and generate a new
                         // metadata record. We usually only do one or the other.
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment and triggered a " +
-                            "leadership change. Reutrning FENCED_LEADER_EPOCH.",
+                            "leadership change. Returning FENCED_LEADER_EPOCH.",
                             request.brokerId(), topic.name, partitionId);
-                        responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                             setPartitionIndex(partitionId).
                             setErrorCode(FENCED_LEADER_EPOCH.code()));
                         continue;
                     } else if (change.removingReplicas() != null ||
                             change.addingReplicas() != null) {
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment.", request.brokerId(),
                             topic.name, partitionId);
                     }
                 }
-                responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+
+                responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                     setPartitionIndex(partitionId).
                     setErrorCode(result.code()).
                     setLeaderId(partition.leader).
+                    setIsr(Replicas.toList(partition.isr)).
+                    setLeaderRecoveryState(partition.leaderRecoveryState.value()).
                     setLeaderEpoch(partition.leaderEpoch).
-                    setCurrentIsrVersion(partition.partitionEpoch).
-                    setIsr(Replicas.toList(partition.isr)));
+                    setPartitionEpoch(partition.partitionEpoch));
             }
         }
+
         return ControllerResult.of(records, response);
     }
 
+    /**
+     * Validate the partition information included in the alter partition request.
+     *
+     * @param brokerId id of the broker requesting the alter partition
+     * @param topic current topic information store by the replication manager
+     * @param partitionId partition id being altered
+     * @param partition current partition registration for the partition being altered
+     * @param partitionData partition data from the alter partition request
+     *
+     * @return Errors.NONE for valid alter partition data; otherwise the validation error
+     */
+    private Errors validateAlterPartitionData(
+        int brokerId,
+        TopicControlInfo topic,
+        int partitionId,
+        PartitionRegistration partition,
+        AlterPartitionRequestData.PartitionData partitionData
+    ) {
+        if (partition == null) {
+            log.info("Rejecting alterPartition request for unknown partition {}-{}.",

Review comment:
       Fixed throughout the file.
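The javadoc of `validateAlterPartitionData` describes a simple contract: return NONE for valid data, otherwise the validation error. A hypothetical sketch of that shape follows. Apart from the unknown-partition branch shown in the diff, the checks and all error names are placeholders chosen for illustration, not the controller's actual rules or Kafka `Errors` values.

```java
// Hypothetical sketch of the validation contract in the javadoc above: NONE when the
// request is valid, otherwise the first failing check. Checks and error names are
// placeholders, not the controller's actual rules.
final class AlterPartitionValidation {
    enum Error { NONE, UNKNOWN_PARTITION, NOT_CURRENT_LEADER, STALE_LEADER_EPOCH, STALE_PARTITION_EPOCH }

    static final class Registration {
        final int leader;
        final int leaderEpoch;
        final int partitionEpoch;

        Registration(int leader, int leaderEpoch, int partitionEpoch) {
            this.leader = leader;
            this.leaderEpoch = leaderEpoch;
            this.partitionEpoch = partitionEpoch;
        }
    }

    static Error validate(int brokerId, Registration current, int requestLeaderEpoch, int requestPartitionEpoch) {
        if (current == null) {
            return Error.UNKNOWN_PARTITION; // mirrors the "unknown partition" branch in the diff
        }
        if (current.leader != brokerId) {
            return Error.NOT_CURRENT_LEADER; // assumption: only the leader may alter the partition
        }
        if (current.leaderEpoch != requestLeaderEpoch) {
            return Error.STALE_LEADER_EPOCH;
        }
        if (current.partitionEpoch != requestPartitionEpoch) {
            return Error.STALE_PARTITION_EPOCH;
        }
        return Error.NONE;
    }
}
```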







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813995003



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -760,44 +719,132 @@ BrokersToIsrs brokersToIsrs() {
                     }
                     if (change.leader() != request.brokerId() &&
                             change.leader() != NO_LEADER_CHANGE) {
-                        // Normally, an alterIsr request, which is made by the partition
+                        // Normally, an alterPartition request, which is made by the partition
                         // leader itself, is not allowed to modify the partition leader.
                         // However, if there is an ongoing partition reassignment and the
                         // ISR change completes it, then the leader may change as part of
                         // the changes made during reassignment cleanup.
                         //
                         // In this case, we report back FENCED_LEADER_EPOCH to the leader
-                        // which made the alterIsr request. This lets it know that it must
+                        // which made the alterPartition request. This lets it know that it must
                         // fetch new metadata before trying again. This return code is
                         // unusual because we both return an error and generate a new
                         // metadata record. We usually only do one or the other.
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment and triggered a " +
-                            "leadership change. Reutrning FENCED_LEADER_EPOCH.",
+                            "leadership change. Returning FENCED_LEADER_EPOCH.",
                             request.brokerId(), topic.name, partitionId);
-                        responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                             setPartitionIndex(partitionId).
                             setErrorCode(FENCED_LEADER_EPOCH.code()));
                         continue;
                     } else if (change.removingReplicas() != null ||
                             change.addingReplicas() != null) {
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment.", request.brokerId(),
                             topic.name, partitionId);
                     }
                 }
-                responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+
+                responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                     setPartitionIndex(partitionId).
                     setErrorCode(result.code()).
                     setLeaderId(partition.leader).
+                    setIsr(Replicas.toList(partition.isr)).
+                    setLeaderRecoveryState(partition.leaderRecoveryState.value()).

Review comment:
       It is a tagged field, so we can't. I think this is okay for the following reasons:
   1. If the leader doesn't support this feature, it will send version 0 of the AlterPartition request, which sets the leader recovery state to RECOVERED.
   2. The leader recovery state only changes if:
     a. an AlterPartition request that increases the ISR is sent, or
     b. another unclean leader election is performed, which sets (keeps) the leader recovery state as RECOVERING.
   
   Note that the broker-to-controller channel uses the ApiVersions response to determine which version of AlterPartition to send.
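Point 1 can be sketched as a version check: a request version that predates the field cannot express RECOVERING, so it reads as RECOVERED. The class name and the cutoff version (1) below are assumptions for illustration.

```java
// Hypothetical sketch of point 1 above: a request version that predates the
// LeaderRecoveryState field implicitly means RECOVERED. Names and the cutoff
// version are assumptions for illustration.
final class RequestRecoveryState {
    enum State { RECOVERED, RECOVERING }

    // Assumption: the first AlterPartition version that carries the field.
    static final short FIRST_VERSION_WITH_FIELD = 1;

    static State effectiveState(short requestVersion, State fieldValue) {
        if (requestVersion < FIRST_VERSION_WITH_FIELD) {
            // An older leader cannot express RECOVERING, so the field reads as its default.
            return State.RECOVERED;
        }
        return fieldValue;
    }
}
```

Because the sender picks the version from the controller's ApiVersions response, an upgraded leader only uses the newer version once the controller advertises it.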







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814964024



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -2781,8 +2812,9 @@ case object IsrChangeNotification extends ControllerEvent {
   override def preempt(): Unit = {}
 }
 
-case class AlterIsrReceived(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
-                            callback: AlterIsrCallback) extends ControllerEvent {
+case class AlterPartitionReceived(
+  brokerId: Int, brokerEpoch: Long, partitionssToAlter: Map[TopicPartition, LeaderAndIsr], callback: AlterPartitionCallback

Review comment:
       Fixed.







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r805916080



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -110,6 +110,14 @@
     LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
     ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true);
 
+    /**
+     * ALTER_ISR was the old name for ALTER_PARTITION.
+     *
+     * @deprecated since 3.2.0. Use {@link #ALTER_PARTITION} instead
+     */
+    @Deprecated

Review comment:
       Okay. I wasn't sure if the symbols in `common/protocol/ApiKeys.java` were public. I am okay keeping it, as it is a lightweight way of preserving backward compatibility in case external code is using this `enum`.
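The "light" compatibility option can be sketched as a deprecated static field that aliases the renamed constant. `ApiKeySketch` is hypothetical and is not Kafka's `ApiKeys` enum.

```java
// Hypothetical sketch of a compatibility alias: the old name survives as a deprecated
// static field pointing at the renamed constant. Not Kafka's ApiKeys enum.
enum ApiKeySketch {
    ALTER_PARTITION;

    /**
     * Old name kept so existing references still compile.
     *
     * @deprecated use {@link #ALTER_PARTITION} instead
     */
    @Deprecated
    public static final ApiKeySketch ALTER_ISR = ALTER_PARTITION;
}
```

One limitation of this design: `ALTER_ISR` is a field, not an enum constant, so `values()` does not list it and `valueOf("ALTER_ISR")` throws `IllegalArgumentException`. Code that looks keys up by name would still break.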







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814407849



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -760,44 +719,132 @@ BrokersToIsrs brokersToIsrs() {
                     }
                     if (change.leader() != request.brokerId() &&
                             change.leader() != NO_LEADER_CHANGE) {
-                        // Normally, an alterIsr request, which is made by the partition
+                        // Normally, an alterPartition request, which is made by the partition
                         // leader itself, is not allowed to modify the partition leader.
                         // However, if there is an ongoing partition reassignment and the
                         // ISR change completes it, then the leader may change as part of
                         // the changes made during reassignment cleanup.
                         //
                         // In this case, we report back FENCED_LEADER_EPOCH to the leader
-                        // which made the alterIsr request. This lets it know that it must
+                        // which made the alterPartition request. This lets it know that it must
                         // fetch new metadata before trying again. This return code is
                         // unusual because we both return an error and generate a new
                         // metadata record. We usually only do one or the other.
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment and triggered a " +
-                            "leadership change. Reutrning FENCED_LEADER_EPOCH.",
+                            "leadership change. Returning FENCED_LEADER_EPOCH.",
                             request.brokerId(), topic.name, partitionId);
-                        responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                             setPartitionIndex(partitionId).
                             setErrorCode(FENCED_LEADER_EPOCH.code()));
                         continue;
                     } else if (change.removingReplicas() != null ||
                             change.addingReplicas() != null) {
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment.", request.brokerId(),
                             topic.name, partitionId);
                     }
                 }
-                responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+
+                responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                     setPartitionIndex(partitionId).
                     setErrorCode(result.code()).
                     setLeaderId(partition.leader).
+                    setIsr(Replicas.toList(partition.isr)).
+                    setLeaderRecoveryState(partition.leaderRecoveryState.value()).

Review comment:
       Yes, you are correct. The `LeaderRecoveryState` field for `AlterPartitionResponse` is not a tagged field.
   
   The argument for why this is safe is the same as for `KafkaController`. In the success case, the `LeaderRecoveryState` in the response is the same as in the request. For version 0, this is the default, `RECOVERED`. If the AlterPartition request fails, both controller implementations set the `ErrorCode` field in the response and leave the rest of the fields at their default values.
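The argument reduces to two response shapes: on success, echo the requested state; on failure, set only the error code so the recovery state stays at its default. A hypothetical sketch, where the class, fields and default values are illustrative assumptions:

```java
// Hypothetical sketch: on failure only the error code is set, so the recovery state
// keeps its default (RECOVERED). Class, fields and defaults are assumptions.
final class PartitionResponseSketch {
    static final byte RECOVERED = 0;

    short errorCode = 0;
    int leaderId = -1;
    int leaderEpoch = -1;
    byte leaderRecoveryState = RECOVERED;

    static PartitionResponseSketch failure(short errorCode) {
        PartitionResponseSketch r = new PartitionResponseSketch();
        r.errorCode = errorCode; // every other field keeps its default value
        return r;
    }

    static PartitionResponseSketch success(int leaderId, int leaderEpoch, byte requestedState) {
        PartitionResponseSketch r = new PartitionResponseSketch();
        r.leaderId = leaderId;
        r.leaderEpoch = leaderEpoch;
        // On success the response echoes the state carried by the request.
        r.leaderRecoveryState = requestedState;
        return r;
    }
}
```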







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814423091



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -542,7 +551,8 @@ class Partition(val topicPartition: TopicPartition,
         assignment = partitionState.replicas.asScala.map(_.toInt),
         isr = isr,
         addingReplicas = addingReplicas,
-        removingReplicas = removingReplicas
+        removingReplicas = removingReplicas,
+        LeaderRecoveryState.RECOVERED

Review comment:
       I'll log a message.







[GitHub] [kafka] hachikuji commented on pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#issuecomment-1050557349


   In regard to this [comment](https://github.com/apache/kafka/pull/11733#discussion_r814422959):
   
   > Unclean leader election is not possible when there is only one replica since the ISR set always equals the replica set. That means that if the replica is online then it will be elected as part of regular election and if it is offline the unclean leader election is a noop.
   
   Hmm, suppose we have a partition with replicas=(0) and isr=(0), but 0 is offline. The user chooses to reassign to node 1 and perform an unclean election to make it the leader. That would allow the reassignment to complete, but the partition state would be RECOVERING, right? How do we get out of that state?
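The scenario in this question can be traced with a hypothetical model that assumes, as in an earlier comment in this thread, that RECOVERING is cleared only by an AlterPartition that grows the ISR. While the reassignment is in flight, the replica set is modeled as (0, 1). `SingleReplicaScenario` is an illustration, not Kafka code.

```java
import java.util.List;

// Hypothetical model tracing the single-replica scenario. It assumes RECOVERING is
// cleared only by an AlterPartition that grows the ISR. Not Kafka code.
final class SingleReplicaScenario {
    enum State { RECOVERED, RECOVERING }

    static final class Partition {
        final List<Integer> replicas;
        final List<Integer> isr;
        final int leader;
        final State state;

        Partition(List<Integer> replicas, List<Integer> isr, int leader, State state) {
            this.replicas = List.copyOf(replicas);
            this.isr = List.copyOf(isr);
            this.leader = leader;
            this.state = state;
        }
    }

    /** Unclean election of a replica outside the ISR; it becomes the sole ISR member. */
    static Partition uncleanElect(Partition p, int newLeader) {
        return new Partition(p.replicas, List.of(newLeader), newLeader, State.RECOVERING);
    }

    /** Reassignment completes: the replica set becomes the target set. */
    static Partition completeReassignment(Partition p, List<Integer> target) {
        return new Partition(target, p.isr, p.leader, p.state);
    }

    /** Leader's AlterPartition: under the assumed rule, only ISR growth restores RECOVERED. */
    static Partition alterPartition(Partition p, List<Integer> newIsr) {
        State next = newIsr.size() > p.isr.size() ? State.RECOVERED : p.state;
        return new Partition(p.replicas, newIsr, p.leader, next);
    }
}
```

After the reassignment completes with replicas=(1) and isr=(1), there is no replica left to add, so under this rule the partition stays RECOVERING, which is the gap the question points at.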





[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814961954



##########
File path: core/src/main/scala/kafka/controller/Election.scala
##########
@@ -40,7 +40,14 @@ object Election {
         val newLeaderAndIsrOpt = leaderOpt.map { leader =>
           val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
           else List(leader)
-          leaderAndIsr.newLeaderAndIsr(leader, newIsr)
+
+          if (!isr.contains(leader)) {
+            // The new leader is not in the old ISR so mark the partition as RECOVERING
+            leaderAndIsr.newRecoveringLeaderAndIsr(leader, newIsr)
+          } else {
+            // Elect a new leader but keep the previous leader recovery state

Review comment:
       Yes. The case that I had in mind is:
   1. A leader is elected using unclean leader election, e.g. leader: 1, recoveryState: RECOVERING
   2. The leader never sends AlterPartition and goes offline, e.g. leader: -1, recoveryState: RECOVERING
   3. The only ISR member (id 1) comes back online, e.g. leader: 1, recoveryState: RECOVERING
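
The three steps can be traced with a hypothetical sketch in which a leader going offline clears the leader id but leaves the recovery state untouched, and a clean re-election of an ISR member keeps whatever state was already there. The names are illustrative, and `toString` merely mirrors the notation used in the list.

```java
import java.util.Collections;
import java.util.List;

final class RecoveryStateTrace {
    enum RecoveryState { RECOVERED, RECOVERING }

    static final int NO_LEADER = -1;

    // Hypothetical partition state: leader, ISR and recovery state.
    static final class State {
        final int leader;
        final List<Integer> isr;
        final RecoveryState recovery;

        State(int leader, List<Integer> isr, RecoveryState recovery) {
            this.leader = leader;
            this.isr = isr;
            this.recovery = recovery;
        }

        @Override
        public String toString() {
            return "leader: " + leader + ", recoveryState: " + recovery;
        }
    }

    // Step 1: unclean election; the ISR becomes just the new leader.
    static State uncleanElect(int leader) {
        return new State(leader, Collections.singletonList(leader), RecoveryState.RECOVERING);
    }

    // Step 2: the leader goes offline before sending AlterPartition.
    // The leader is cleared, but the ISR and recovery state are kept.
    static State leaderOffline(State s) {
        return new State(NO_LEADER, s.isr, s.recovery);
    }

    // Step 3: an ISR member comes back and is elected cleanly,
    // so the previous recovery state is preserved rather than reset.
    static State cleanElect(State s, int leader) {
        if (!s.isr.contains(leader)) {
            throw new IllegalArgumentException("Replica " + leader + " is not in the ISR");
        }
        return new State(leader, s.isr, s.recovery);
    }
}
```

Calling `toString` after each step gives `leader: 1, recoveryState: RECOVERING`, then `leader: -1, recoveryState: RECOVERING`, then `leader: 1, recoveryState: RECOVERING`, matching the list.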







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813998423



##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
##########
@@ -2015,6 +2067,8 @@ class PartitionTest extends AbstractPartitionTest {
     verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
   }
 
+  // TODO: Add a test that shows that the follower rejects reads until a recovered leader and isr

Review comment:
       Done. Added this test.







[GitHub] [kafka] jsancio commented on pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#issuecomment-1051066928


   > For this [comment](https://github.com/apache/kafka/pull/11733#discussion_r812044247):
   > 
   > > If the version is `0`, then this is guaranteed to be the default value `0`, so the serialization will succeed. This is true because we only write these values in the response when the operation succeeds. If the operation fails, then we skip writing these values and instead just write the error code.
   > 
   > This is a bit subjective, so feel free to disregard, but I do feel like some of the implicit assumptions might be causing some unnecessary obscurity. This is one case where a version check might actually be clearer and prevent the need for the extra comment.
   > 
   > A second case is implicitly setting RECOVERED in `PendingPartitionChange`. I had a comment about this [here](https://github.com/apache/kafka/pull/11733/files#r805005385), which might have been missed. This is fine at the moment because the current patch does not do any actual recovery operation, but I think we should reconsider it when we do. Otherwise I do think it's easy to overlook the implication when making other partition state changes.
   
   Okay. As you suggested, I marked the LeaderRecoveryState field for the AlterPartition response as ignorable. This gives the behavior we want, but implemented in the serialization layer instead of in the controller logic.
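
For context on the serialization-layer approach: in Kafka's generated message classes, writing a non-default value for a field that the target version does not support normally fails, while a field marked `ignorable` is silently dropped. The hand-written sketch below mimics that rule for a field that exists only in version 1+ with default 0. It is not generated code, and `IgnorableFieldSketch`, its map-based output and the use of `IllegalStateException` are assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class IgnorableFieldSketch {
    // Writes a simplified partition entry as field name -> value for the given
    // version. LeaderRecoveryState exists only in version 1+, with default 0.
    static Map<String, Integer> write(short version, int leaderId,
                                      byte leaderRecoveryState, boolean ignorable) {
        Map<String, Integer> out = new LinkedHashMap<>();
        out.put("LeaderId", leaderId);
        if (version >= 1) {
            out.put("LeaderRecoveryState", (int) leaderRecoveryState);
        } else if (leaderRecoveryState != 0 && !ignorable) {
            // A non-ignorable, non-default field cannot be expressed at this version.
            throw new IllegalStateException(
                "LeaderRecoveryState is not supported at version " + version);
        }
        // Ignorable (or default-valued) fields are simply not written at older versions.
        return out;
    }
}
```

With `ignorable` set, a version 0 response simply omits the field, which removes the need for the controller-side reasoning quoted above about the value always being the default at version 0.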





[GitHub] [kafka] jsancio commented on pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#issuecomment-1051077042


   @hachikuji I pushed a commit so that both controller implementations only persist the leader recovery state if the cluster supports leader recovery state (IBP is greater than 3.2). I am currently working on the tests.
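
A minimal sketch of that gating, assuming the recovery state is simply left out of the persisted partition state when the cluster does not yet support it, so that brokers on an older IBP never see a field they cannot interpret. `leaderRecoverySupported` stands in for the IBP check, and the field names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class GatedPersistSketch {
    enum RecoveryState { RECOVERED, RECOVERING }

    // Builds the persisted fields for a partition. The recovery state is written
    // only when the whole cluster is known to understand it (e.g. via the IBP).
    static Map<String, Integer> fieldsToPersist(int leader, int leaderEpoch,
                                                RecoveryState state,
                                                boolean leaderRecoverySupported) {
        Map<String, Integer> fields = new HashMap<>();
        fields.put("leader", leader);
        fields.put("leader_epoch", leaderEpoch);
        if (leaderRecoverySupported) {
            fields.put("leader_recovery_state", state == RecoveryState.RECOVERING ? 1 : 0);
        }
        return fields;
    }
}
```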





[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r812227886



##########
File path: clients/src/main/resources/common/message/AlterPartitionRequest.json
##########
@@ -34,9 +34,11 @@
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
           "about": "The leader epoch of this partition" },
         { "name": "NewIsr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
-          "about": "The ISR for this partition"},
-        { "name": "CurrentIsrVersion", "type": "int32", "versions": "0+",
-          "about": "The expected version of ISR which is being updated"}
+          "about": "The ISR for this partition" },
+        { "name": "LeaderRecoveryState", "type": "int8", "versions": "1+", "default": "0",
+          "about": "1 if the partition is recovering from an unclean leader election; 0 otherwise." },
+        { "name": "PartitionEpoch", "type": "int32", "versions": "0+",
+          "about": "The expected epoch of the partition which is being updated" }

Review comment:
       Fixed both the request and the response.
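
The new `int8` field carries 0 for a recovered partition and 1 for one recovering from an unclean leader election. A minimal decoding sketch, assuming those are the only valid values; `RecoveryStateCodec` is a hypothetical stand-in, not the `LeaderRecoveryState` class added by the PR.

```java
final class RecoveryStateCodec {
    enum RecoveryState {
        RECOVERED((byte) 0),
        RECOVERING((byte) 1);

        // Value carried in the int8 LeaderRecoveryState field.
        final byte value;

        RecoveryState(byte value) {
            this.value = value;
        }

        // Maps the wire value back to the enum; any other value is rejected.
        static RecoveryState fromValue(byte value) {
            for (RecoveryState s : values()) {
                if (s.value == value) {
                    return s;
                }
            }
            throw new IllegalArgumentException("Unknown leader recovery state: " + value);
        }
    }
}
```

Because the field's default is `0`, a version 0 request, which cannot carry the field, decodes as RECOVERED in this sketch.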







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r825493909



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -147,56 +147,65 @@ sealed trait IsrState {
   def maximalIsr: Set[Int]
 
   /**
-   * Indicates if we have an AlterIsr request inflight.
+   * The leader recovery state. See the description for LeaderRecoveryState for details on the different values.
+   */
+  def leaderRecoveryState: LeaderRecoveryState
+
+  /**
+   * Indicates if we have an AlterPartition request inflight.
    */
   def isInflight: Boolean
 }
 
-sealed trait PendingIsrChange extends IsrState {
+sealed trait PendingPartitionChange extends PartitionState {
   def sentLeaderAndIsr: LeaderAndIsr
+
+  override val leaderRecoveryState: LeaderRecoveryState = LeaderRecoveryState.RECOVERED

Review comment:
       I added `leaderRecoveryState` to the `toString` implementation.
   
   Regarding your other comment, I'll address it in the next PR when I implement https://issues.apache.org/jira/browse/KAFKA-13696







[GitHub] [kafka] jsancio merged pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio merged pull request #11733:
URL: https://github.com/apache/kafka/pull/11733


   





[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814424904



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1080,6 +1092,9 @@ class Partition(val topicPartition: TopicPartition,
     // decide whether to only fetch from leader
     val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
 
+    // Check that the partition leader is recovering from an unclean leader election.
+    validateLeaderRecoveryState()

Review comment:
       I didn't implement this for now because only the leader can accept PRODUCE requests, and the leader recovers immediately. In this implementation, only the follower needs to wait for the leader to recover before the leader accepts its FETCH requests.







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r805920416



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -250,23 +260,35 @@ class DefaultAlterIsrManager(
         val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
           new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
         data.topics.forEach { topic =>
-          topic.partitions().forEach(partition => {
+          topic.partitions().forEach { partition =>
             val tp = new TopicPartition(topic.name, partition.partitionIndex)
-            val error = Errors.forCode(partition.errorCode())
+            val apiError = Errors.forCode(partition.errorCode())
             debug(s"Controller successfully handled AlterIsr request for $tp: $partition")
-            if (error == Errors.NONE) {
-              val newLeaderAndIsr = new LeaderAndIsr(partition.leaderId, partition.leaderEpoch,
-                partition.isr.asScala.toList.map(_.toInt), partition.currentIsrVersion)
-              partitionResponses(tp) = Right(newLeaderAndIsr)
+            if (apiError == Errors.NONE) {
+              try {
+                partitionResponses(tp) = Right(
+                  LeaderAndIsr(
+                    partition.leaderId,
+                    partition.leaderEpoch,
+                    partition.isr.asScala.toList.map(_.toInt),
+                    LeaderRecoveryState.of(partition.leaderRecoveryState),
+                    partition.partitionEpoch
+                  )
+                )
+              } catch {
+                case e: IllegalArgumentException =>

Review comment:
       Yeah. I went back and forth on this. There are some code paths where we want to throw an exception, as in most cases this means a bug in Kafka. I think this was the only case where we want to handle this exception. I'll take another look.







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r812228372



##########
File path: checkstyle/suppressions.xml
##########
@@ -276,6 +276,8 @@
               files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(QuorumController|ReplicationControlManager|ReplicationControlManagerTest).java"/>
+    <suppress checks="MethodLength"

Review comment:
       Removed this suppression since I fixed the alterPartition method.







[GitHub] [kafka] hachikuji commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813161823



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -250,23 +260,35 @@ class DefaultAlterIsrManager(
         val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
           new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
         data.topics.forEach { topic =>
-          topic.partitions().forEach(partition => {
+          topic.partitions().forEach { partition =>
             val tp = new TopicPartition(topic.name, partition.partitionIndex)
-            val error = Errors.forCode(partition.errorCode())
+            val apiError = Errors.forCode(partition.errorCode())
             debug(s"Controller successfully handled AlterIsr request for $tp: $partition")
-            if (error == Errors.NONE) {
-              val newLeaderAndIsr = new LeaderAndIsr(partition.leaderId, partition.leaderEpoch,
-                partition.isr.asScala.toList.map(_.toInt), partition.currentIsrVersion)
-              partitionResponses(tp) = Right(newLeaderAndIsr)
+            if (apiError == Errors.NONE) {
+              try {
+                partitionResponses(tp) = Right(
+                  LeaderAndIsr(
+                    partition.leaderId,
+                    partition.leaderEpoch,
+                    partition.isr.asScala.toList.map(_.toInt),
+                    LeaderRecoveryState.of(partition.leaderRecoveryState),
+                    partition.partitionEpoch
+                  )
+                )
+              } catch {
+                case e: IllegalArgumentException =>

Review comment:
       Yeah, I think the main thing I didn't like is that `IllegalArgumentException` is such a generic error. So using it to infer a particular case seemed a little slippery.
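
One way to address this concern is to have the lookup return an `Optional`, so the caller maps an unknown value to an explicit outcome rather than catching a generic `IllegalArgumentException` and inferring the cause. A sketch under that assumption; `parse`, `handle` and `ResponseOutcome` are hypothetical names, not code from the PR.

```java
import java.util.Optional;

final class ExplicitParseSketch {
    enum RecoveryState { RECOVERED, RECOVERING }

    // Hypothetical outcomes for handling one partition in the response.
    enum ResponseOutcome { NONE, UNKNOWN_RECOVERY_STATE }

    // Returns empty for unknown wire values instead of throwing.
    static Optional<RecoveryState> parse(byte value) {
        switch (value) {
            case 0:
                return Optional.of(RecoveryState.RECOVERED);
            case 1:
                return Optional.of(RecoveryState.RECOVERING);
            default:
                return Optional.empty();
        }
    }

    // The caller decides what an unknown value means, without catching exceptions.
    static ResponseOutcome handle(byte wireValue) {
        return parse(wireValue).isPresent()
            ? ResponseOutcome.NONE
            : ResponseOutcome.UNKNOWN_RECOVERY_STATE;
    }
}
```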







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813996872



##########
File path: core/src/test/scala/kafka/zk/TopicPartitionStateZNodeTest.scala
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.zk
+
+import TopicPartitionStateZNode.decode
+import TopicPartitionStateZNode.encode
+import kafka.api.LeaderAndIsr
+import kafka.controller.LeaderIsrAndControllerEpoch
+import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.zookeeper.data.Stat
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.mock
+import org.mockito.Mockito.when
+
+final class TopicPartitionStateZNodeTest {
+
+  @Test
+  def testEncodeDecodeRecovering(): Unit = {
+    val zkVersion = 5
+    val stat = mock(classOf[Stat])
+    when(stat.getVersion).thenReturn(zkVersion)
+
+    val expected = LeaderIsrAndControllerEpoch(LeaderAndIsr(1, 6, List(1), LeaderRecoveryState.RECOVERING, zkVersion), 10)
+
+    assertEquals(Some(expected), decode(encode(expected), stat))
+  }
+
+  @Test
+  def testEncodeDecodeRecovered(): Unit = {

Review comment:
       We were covering it implicitly. Added another test case that shows the behavior explicitly.
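
These tests check that the ZooKeeper encoding round-trips the partition state with either recovery state. The same property can be shown with a simplified sketch that uses a plain map in place of the JSON document and leaves out the ZooKeeper version and controller epoch. The key names are assumptions and may not match the real `TopicPartitionStateZNode` format. The sketch also treats a missing recovery key as RECOVERED, on the assumption that state written before the field existed should decode as recovered.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class ZNodeRoundTripSketch {
    // Ordinals match the 0/1 wire values used in this sketch.
    enum RecoveryState { RECOVERED, RECOVERING }

    static final class LeaderAndIsr {
        final int leader;
        final int leaderEpoch;
        final List<Integer> isr;
        final RecoveryState recovery;

        LeaderAndIsr(int leader, int leaderEpoch, List<Integer> isr, RecoveryState recovery) {
            this.leader = leader;
            this.leaderEpoch = leaderEpoch;
            this.isr = isr;
            this.recovery = recovery;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof LeaderAndIsr)) {
                return false;
            }
            LeaderAndIsr other = (LeaderAndIsr) o;
            return leader == other.leader && leaderEpoch == other.leaderEpoch
                && isr.equals(other.isr) && recovery == other.recovery;
        }

        @Override
        public int hashCode() {
            return Objects.hash(leader, leaderEpoch, isr, recovery);
        }
    }

    // Hypothetical key names; a mutable map stands in for the JSON document.
    static Map<String, Object> encode(LeaderAndIsr l) {
        Map<String, Object> m = new HashMap<>();
        m.put("leader", l.leader);
        m.put("leader_epoch", l.leaderEpoch);
        m.put("isr", l.isr);
        m.put("leader_recovery_state", l.recovery.ordinal());
        return m;
    }

    @SuppressWarnings("unchecked")
    static LeaderAndIsr decode(Map<String, Object> m) {
        Object raw = m.get("leader_recovery_state");
        // Missing key: assume the data predates the field and treat it as RECOVERED.
        RecoveryState recovery = raw == null
            ? RecoveryState.RECOVERED
            : RecoveryState.values()[(Integer) raw];
        return new LeaderAndIsr((Integer) m.get("leader"), (Integer) m.get("leader_epoch"),
            (List<Integer>) m.get("isr"), recovery);
    }
}
```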







[GitHub] [kafka] jsancio commented on pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#issuecomment-1071229122


   > The PR looks good overall. I think there is one problem with the fetch validation. We are expecting that followers will detect the RECOVERED state through a `LeaderAndIsr` request from the controller. However, leaders/followers only accept `LeaderAndIsr` requests if there is an epoch bump, and that does not happen for `AlterPartition` requests. For KRaft, I think it is not a problem.
   
   You are correct. I removed the FETCH request validation that checks the leader recovery state and filed this issue: https://issues.apache.org/jira/browse/KAFKA-13754
   
   This is okay because at the moment the topic partition leader immediately marks the partition as recovered.
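
The issue raised in the quoted review can be illustrated with a small, hypothetical model of a follower that only applies a LeaderAndIsr-style update when the leader epoch increases. An AlterPartition that flips the partition to RECOVERED changes the partition epoch but not the leader epoch, so in this model the follower keeps its stale RECOVERING state. This is a simplification of the ZooKeeper-mode behavior described above, not broker code.

```java
final class EpochGateSketch {
    enum RecoveryState { RECOVERED, RECOVERING }

    // Hypothetical follower-side view of one partition.
    static final class FollowerView {
        int leaderEpoch;
        int partitionEpoch;
        RecoveryState recovery;

        FollowerView(int leaderEpoch, int partitionEpoch, RecoveryState recovery) {
            this.leaderEpoch = leaderEpoch;
            this.partitionEpoch = partitionEpoch;
            this.recovery = recovery;
        }

        // Applies a LeaderAndIsr-style update only when the leader epoch grows.
        // Returns true if the update was applied.
        boolean onLeaderAndIsr(int newLeaderEpoch, int newPartitionEpoch, RecoveryState newRecovery) {
            if (newLeaderEpoch <= leaderEpoch) {
                // Same leader epoch: ignored, even though the partition epoch moved.
                return false;
            }
            leaderEpoch = newLeaderEpoch;
            partitionEpoch = newPartitionEpoch;
            recovery = newRecovery;
            return true;
        }
    }
}
```

In this model only a later update with a higher leader epoch, such as one produced by a new election, lets the follower observe RECOVERED, which is consistent with removing the fetch validation and tracking the problem in KAFKA-13754.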





[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r825494849



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -2263,37 +2271,43 @@ class KafkaController(val config: KafkaConfig,
             .groupBy { case (tp, _) => tp.topic }   // Group by topic
             .foreach { case (topic, partitions) =>
               // Add each topic part to the response
-              val topicResp = new AlterIsrResponseData.TopicData()
+              val topicResp = new AlterPartitionResponseData.TopicData()
                 .setName(topic)
                 .setPartitions(new util.ArrayList())
               resp.topics.add(topicResp)
               partitions.foreach { case (tp, errorOrIsr) =>
                 // Add each partition part to the response (new ISR or error)
                 errorOrIsr match {
                   case Left(error) => topicResp.partitions.add(
-                    new AlterIsrResponseData.PartitionData()
+                    new AlterPartitionResponseData.PartitionData()
                       .setPartitionIndex(tp.partition)
                       .setErrorCode(error.code))
                   case Right(leaderAndIsr) => topicResp.partitions.add(
-                    new AlterIsrResponseData.PartitionData()
+                    new AlterPartitionResponseData.PartitionData()
                       .setPartitionIndex(tp.partition)
                       .setLeaderId(leaderAndIsr.leader)
                       .setLeaderEpoch(leaderAndIsr.leaderEpoch)
                       .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                      .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+                      .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)

Review comment:
       Based on this discussion https://github.com/apache/kafka/pull/11733#issuecomment-1051066928, I decided to mark this field as ignorable in the response.







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813997211



##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
##########
@@ -1077,6 +1078,58 @@ class PartitionTest extends AbstractPartitionTest {
 
   }
 
+  @Test
+  def testInvalidAlterPartitionAreNotRetried(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val remoteBrokerId = brokerId + 1
+    val replicas = List[Integer](brokerId, remoteBrokerId).asJava
+    val isr = List[Integer](brokerId).asJava
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+    assertTrue(partition.makeLeader(
+        new LeaderAndIsrPartitionState()
+          .setControllerEpoch(controllerEpoch)
+          .setLeader(brokerId)
+          .setLeaderEpoch(leaderEpoch)
+          .setIsr(isr)
+          .setZkVersion(1)
+          .setReplicas(replicas)
+          .setIsNew(true),
+        offsetCheckpoints, None), "Expected become leader transition to succeed")
+    assertEquals(Set(brokerId), partition.partitionState.isr)
+
+    val remoteReplica = partition.getReplica(remoteBrokerId).get
+    assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
+    assertEquals(UnifiedLog.UnknownOffset, remoteReplica.logStartOffset)
+
+    partition.updateFollowerFetchState(remoteBrokerId,
+      followerFetchOffsetMetadata = LogOffsetMetadata(10),
+      followerStartOffset = 0L,
+      followerFetchTimeMs = time.milliseconds(),
+      leaderEndOffset = 10L)
+
+    // Check that the isr didn't change and alter update is scheduled

Review comment:
       Fixed.







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813998423



##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
##########
@@ -2015,6 +2067,8 @@ class PartitionTest extends AbstractPartitionTest {
     verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
   }
 
+  // TODO: Add a test that shows that the follower rejects reads until a recovered leader and isr

Review comment:
       Added two tests:
   1. Show that going from online to offline to online preserves the leader recovery state
   2. Show that performing an unclean leader election sets the leader recovery state to RECOVERING







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813988441



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -760,44 +719,132 @@ BrokersToIsrs brokersToIsrs() {
                     }
                     if (change.leader() != request.brokerId() &&
                             change.leader() != NO_LEADER_CHANGE) {
-                        // Normally, an alterIsr request, which is made by the partition
+                        // Normally, an alterPartition request, which is made by the partition
                         // leader itself, is not allowed to modify the partition leader.
                         // However, if there is an ongoing partition reassignment and the
                         // ISR change completes it, then the leader may change as part of
                         // the changes made during reassignment cleanup.
                         //
                         // In this case, we report back FENCED_LEADER_EPOCH to the leader
-                        // which made the alterIsr request. This lets it know that it must
+                        // which made the alterPartition request. This lets it know that it must
                         // fetch new metadata before trying again. This return code is
                         // unusual because we both return an error and generate a new
                         // metadata record. We usually only do one or the other.
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment and triggered a " +
-                            "leadership change. Reutrning FENCED_LEADER_EPOCH.",
+                            "leadership change. Returning FENCED_LEADER_EPOCH.",
                             request.brokerId(), topic.name, partitionId);
-                        responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                             setPartitionIndex(partitionId).
                             setErrorCode(FENCED_LEADER_EPOCH.code()));
                         continue;
                     } else if (change.removingReplicas() != null ||
                             change.addingReplicas() != null) {
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment.", request.brokerId(),
                             topic.name, partitionId);
                     }
                 }
-                responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+
+                responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                     setPartitionIndex(partitionId).
                     setErrorCode(result.code()).
                     setLeaderId(partition.leader).
+                    setIsr(Replicas.toList(partition.isr)).
+                    setLeaderRecoveryState(partition.leaderRecoveryState.value()).
                     setLeaderEpoch(partition.leaderEpoch).
-                    setCurrentIsrVersion(partition.partitionEpoch).
-                    setIsr(Replicas.toList(partition.isr)));
+                    setPartitionEpoch(partition.partitionEpoch));
             }
         }
+
         return ControllerResult.of(records, response);
     }
 
+    /**
+     * Validate the partition information included in the alter partition request.
+     *
+     * @param brokerId id of the broker requesting the alter partition
+     * @param topic current topic information store by the replication manager
+     * @param partitionId partition id being altered
+     * @param partition current partition registration for the partition being altered
+     * @param partitionData partition data from the alter partition request
+     *
+     * @return Errors.NONE for valid alter partition data; otherwise the validation error
+     */
+    private Errors validateAlterPartitionData(
+        int brokerId,
+        TopicControlInfo topic,
+        int partitionId,
+        PartitionRegistration partition,
+        AlterPartitionRequestData.PartitionData partitionData
+    ) {
+        if (partition == null) {
+            log.info("Rejecting alterPartition request for unknown partition {}-{}.",
+                    topic.name, partitionId);
+
+            return UNKNOWN_TOPIC_OR_PARTITION;
+        }
+        if (partitionData.leaderEpoch() != partition.leaderEpoch) {

Review comment:
       Yeah. Maybe INVALID_REQUEST is better. It looks like the legacy controller just processes the request if the leader has a higher epoch than what the controller knows.
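
The choice being discussed can be sketched as a comparison of the request's leader epoch with the controller's. Returning FENCED_LEADER_EPOCH for a stale epoch and INVALID_REQUEST for a newer one follows the suggestion in this comment and is an assumption, not necessarily what the merged `validateAlterPartitionData` does; `ValidationError` is a hypothetical subset of Kafka's `Errors`.

```java
final class LeaderEpochCheckSketch {
    enum ValidationError { NONE, FENCED_LEADER_EPOCH, INVALID_REQUEST }

    static ValidationError validateLeaderEpoch(int requestLeaderEpoch, int currentLeaderEpoch) {
        if (requestLeaderEpoch < currentLeaderEpoch) {
            // The sender is a stale leader and should refresh its metadata.
            return ValidationError.FENCED_LEADER_EPOCH;
        }
        if (requestLeaderEpoch > currentLeaderEpoch) {
            // The controller has not assigned this epoch; treat it as a bad request.
            return ValidationError.INVALID_REQUEST;
        }
        return ValidationError.NONE;
    }
}
```

Per the comment, the legacy controller instead processes a request carrying a higher leader epoch, so the second branch is where the two controllers would behave differently.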







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r812227573



##########
File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1305,33 +1305,33 @@ private DescribeUserScramCredentialsResponse createDescribeUserScramCredentialsR
         return new DescribeUserScramCredentialsResponse(data);
     }
 
-    private AlterIsrRequest createAlterIsrRequest(short version) {
-        AlterIsrRequestData data = new AlterIsrRequestData()
+    private AlterPartitionRequest createAlterPartitionRequest(short version) {

Review comment:
       Updated the test to take into account the version passed.







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814089014



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -110,6 +110,14 @@
     LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
     ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true);
 
+    /**
+     * ALTER_ISR was the old name for ALTER_PARTITION.
+     *
+     * @deprecated since 3.2.0. Use {@link #ALTER_PARTITION} instead
+     */
+    @Deprecated

Review comment:
       Done.







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814963257



##########
File path: core/src/main/scala/kafka/controller/Election.scala
##########
@@ -53,17 +60,17 @@ object Election {
    * Elect leaders for new or offline partitions.
    *
    * @param controllerContext Context with the current state of the cluster
-   * @param partitionsWithUncleanLeaderElectionState A sequence of tuples representing the partitions
+   * @param partitionsWithUncleanLeaderLeaderRecoveryState A sequence of tuples representing the partitions

Review comment:
       Yes. Fixed. I think I did a search and replace at some point.







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r814422959



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -542,7 +551,8 @@ class Partition(val topicPartition: TopicPartition,
         assignment = partitionState.replicas.asScala.map(_.toInt),
         isr = isr,
         addingReplicas = addingReplicas,
-        removingReplicas = removingReplicas
+        removingReplicas = removingReplicas,
+        LeaderRecoveryState.RECOVERED

Review comment:
       Yes and no. Unclean leader election is not possible when there is only one replica, since the ISR set always equals the replica set. That means that if the replica is online, it will be elected as part of a regular election, and if it is offline, unclean leader election is a no-op.
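To see why the single-replica case needs no special handling, here is a simplified election sketch. It assumes the leader is the first live ISR member and that, only when unclean election is enabled, the fallback is the first live assigned replica. `LeaderElectionSketch.elect` is hypothetical and much simpler than the controller's `Election` logic. With `assignment = [0]` and `isr = {0}`, the unclean branch can only find the same replica that the clean branch already checked.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

final class LeaderElectionSketch {
    private LeaderElectionSketch() {}

    /**
     * Hypothetical election: prefer the first live replica that is in the ISR.
     * If there is none and unclean election is allowed, fall back to the first
     * live replica in the assignment, even if it is not in the ISR.
     */
    static Optional<Integer> elect(List<Integer> assignment, Set<Integer> isr,
                                   Set<Integer> live, boolean uncleanAllowed) {
        Optional<Integer> clean = assignment.stream()
            .filter(id -> isr.contains(id) && live.contains(id))
            .findFirst();
        if (clean.isPresent() || !uncleanAllowed) {
            return clean;
        }
        // Unclean fallback: any live assigned replica.
        return assignment.stream().filter(live::contains).findFirst();
    }
}
```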







[GitHub] [kafka] jsancio commented on pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#issuecomment-1071229122


   > The PR looks good overall. I think there is one problem with the fetch validation. We are expecting that followers will detect the RECOVERED state through a `LeaderAndIsr` request from the controller. However, leaders/followers only accept `LeaderAndIsr` requests if there is an epoch bump, and that does not happen for `AlterPartition` requests. For KRaft, I think it is not a problem.
   
   You are correct. I removed the FETCH request validation that looks at the leader recovery state and filed this issue: https://issues.apache.org/jira/browse/KAFKA-13754
   
   This is okay because at the moment the topic partition leader immediately marks the partition as recovered.
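The problem described in the quoted comment can be shown with a simplified model. The model assumes, as the comment states, that a broker applies `LeaderAndIsr` state only when the leader epoch increases. It also assumes that an `AlterPartition` that moves the partition from RECOVERING to RECOVERED leaves the leader epoch unchanged. `FollowerPartitionState` is hypothetical and is not a Kafka class.

```java
// Hypothetical, simplified follower-side view of a partition.
final class FollowerPartitionState {
    private int leaderEpoch;
    private boolean recovering;

    FollowerPartitionState(int leaderEpoch, boolean recovering) {
        this.leaderEpoch = leaderEpoch;
        this.recovering = recovering;
    }

    /** Sketch: apply LeaderAndIsr state only if it carries a newer leader epoch. */
    boolean maybeApplyLeaderAndIsr(int newLeaderEpoch, boolean newRecovering) {
        if (newLeaderEpoch <= leaderEpoch) {
            // Same or older epoch: the update is ignored, including any
            // change to the recovery state it carries.
            return false;
        }
        leaderEpoch = newLeaderEpoch;
        recovering = newRecovering;
        return true;
    }

    boolean isRecovering() {
        return recovering;
    }
}
```

In this model, the RECOVERED state only reaches the follower once some later change bumps the leader epoch.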





[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r812044247



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -2263,37 +2271,43 @@ class KafkaController(val config: KafkaConfig,
             .groupBy { case (tp, _) => tp.topic }   // Group by topic
             .foreach { case (topic, partitions) =>
               // Add each topic part to the response
-              val topicResp = new AlterIsrResponseData.TopicData()
+              val topicResp = new AlterPartitionResponseData.TopicData()
                 .setName(topic)
                 .setPartitions(new util.ArrayList())
               resp.topics.add(topicResp)
               partitions.foreach { case (tp, errorOrIsr) =>
                 // Add each partition part to the response (new ISR or error)
                 errorOrIsr match {
                   case Left(error) => topicResp.partitions.add(
-                    new AlterIsrResponseData.PartitionData()
+                    new AlterPartitionResponseData.PartitionData()
                       .setPartitionIndex(tp.partition)
                       .setErrorCode(error.code))
                   case Right(leaderAndIsr) => topicResp.partitions.add(
-                    new AlterIsrResponseData.PartitionData()
+                    new AlterPartitionResponseData.PartitionData()
                       .setPartitionIndex(tp.partition)
                       .setLeaderId(leaderAndIsr.leader)
                       .setLeaderEpoch(leaderAndIsr.leaderEpoch)
                       .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                      .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+                      .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)

Review comment:
       It may be a little subtle, but I don't think we need to. If the version is `0`, then this is guaranteed to be the default value `0`, so the serialization will succeed. This is true because we only write these values in the response when the operation succeeds. If the operation fails, then we skip writing these values and instead just write the error code.
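The argument above relies on two facts. The version-0 schema has no slot for `LeaderRecoveryState` (the field is `"versions": "1+"` with `"default": "0"`), and the value is the default `0` whenever the response is written at version 0. Here is a minimal sketch of that serialization rule. It assumes a writer that skips a field missing from the target version and refuses a non-default value for it. `PartitionDataWriter` is hypothetical and is not Kafka's generated serialization code.

```java
import java.io.ByteArrayOutputStream;

final class PartitionDataWriter {
    private PartitionDataWriter() {}

    /**
     * Hypothetical writer for the int8 LeaderRecoveryState field, which the
     * schema only defines for versions 1+ with a default of 0.
     */
    static void writeLeaderRecoveryState(ByteArrayOutputStream out, short version, byte value) {
        if (version >= 1) {
            out.write(value);
        } else if (value != 0) {
            // Version 0 has no slot for this field. Silently dropping a
            // non-default value would lose information, so refuse instead.
            throw new IllegalStateException(
                "Attempted to write a non-default LeaderRecoveryState at version " + version);
        }
        // Version 0 with the default value: nothing is written.
    }
}
```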







[GitHub] [kafka] jsancio commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r812101522



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -250,23 +260,35 @@ class DefaultAlterIsrManager(
         val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
           new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
         data.topics.forEach { topic =>
-          topic.partitions().forEach(partition => {
+          topic.partitions().forEach { partition =>
             val tp = new TopicPartition(topic.name, partition.partitionIndex)
-            val error = Errors.forCode(partition.errorCode())
+            val apiError = Errors.forCode(partition.errorCode())
             debug(s"Controller successfully handled AlterIsr request for $tp: $partition")
-            if (error == Errors.NONE) {
-              val newLeaderAndIsr = new LeaderAndIsr(partition.leaderId, partition.leaderEpoch,
-                partition.isr.asScala.toList.map(_.toInt), partition.currentIsrVersion)
-              partitionResponses(tp) = Right(newLeaderAndIsr)
+            if (apiError == Errors.NONE) {
+              try {
+                partitionResponses(tp) = Right(
+                  LeaderAndIsr(
+                    partition.leaderId,
+                    partition.leaderEpoch,
+                    partition.isr.asScala.toList.map(_.toInt),
+                    LeaderRecoveryState.of(partition.leaderRecoveryState),
+                    partition.partitionEpoch
+                  )
+                )
+              } catch {
+                case e: IllegalArgumentException =>

Review comment:
       Okay. We have both: `of` which throws and `optionalOf` which returns an `Optional`.
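Here is a sketch of the two lookup styles mentioned. It assumes the wire values from this PR's `AlterPartitionRequest.json` ("1 if the partition is recovering from an unclean leader election; 0 otherwise"). `RecoveryStateSketch` is a stand-in and is not the actual `LeaderRecoveryState` class.

```java
import java.util.Optional;

// Simplified sketch of an enum with a throwing and a non-throwing lookup.
enum RecoveryStateSketch {
    RECOVERED((byte) 0),
    RECOVERING((byte) 1);

    private final byte value;

    RecoveryStateSketch(byte value) {
        this.value = value;
    }

    byte value() {
        return value;
    }

    /** Returns the matching state, or empty for an unknown wire value. */
    static Optional<RecoveryStateSketch> optionalOf(byte value) {
        for (RecoveryStateSketch state : values()) {
            if (state.value == value) {
                return Optional.of(state);
            }
        }
        return Optional.empty();
    }

    /** Returns the matching state, throwing for an unknown wire value. */
    static RecoveryStateSketch of(byte value) {
        return optionalOf(value).orElseThrow(() ->
            new IllegalArgumentException("Unknown leader recovery state: " + value));
    }
}
```

Building `of` on top of `optionalOf` keeps the mapping in one place. A caller such as the `AlterIsrManager` response handler can then choose between catching `IllegalArgumentException` and handling an empty `Optional`.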







[GitHub] [kafka] hachikuji commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r813188764



##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
##########
@@ -1077,6 +1078,58 @@ class PartitionTest extends AbstractPartitionTest {
 
   }
 
+  @Test
+  def testInvalidAlterPartitionAreNotRetried(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val remoteBrokerId = brokerId + 1
+    val replicas = List[Integer](brokerId, remoteBrokerId).asJava
+    val isr = List[Integer](brokerId).asJava
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+    assertTrue(partition.makeLeader(
+        new LeaderAndIsrPartitionState()
+          .setControllerEpoch(controllerEpoch)
+          .setLeader(brokerId)
+          .setLeaderEpoch(leaderEpoch)
+          .setIsr(isr)
+          .setZkVersion(1)
+          .setReplicas(replicas)
+          .setIsNew(true),
+        offsetCheckpoints, None), "Expected become leader transition to succeed")
+    assertEquals(Set(brokerId), partition.partitionState.isr)
+
+    val remoteReplica = partition.getReplica(remoteBrokerId).get
+    assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
+    assertEquals(UnifiedLog.UnknownOffset, remoteReplica.logStartOffset)
+
+    partition.updateFollowerFetchState(remoteBrokerId,
+      followerFetchOffsetMetadata = LogOffsetMetadata(10),
+      followerStartOffset = 0L,
+      followerFetchTimeMs = time.milliseconds(),
+      leaderEndOffset = 10L)
+
+    // Check that the isr didn't change and alter update is scheduled

Review comment:
       Could we assert something about the inflight ISR?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##########
@@ -104,6 +107,14 @@ public PartitionChangeBuilder setTargetAdding(List<Integer> targetAdding) {
         return this;
     }
 
+    public PartitionChangeBuilder setTargetLeaderRecoveryState(LeaderRecoveryState targetLeaderRecoveryState) {
+        this.targetLeaderRecoveryState = targetLeaderRecoveryState;
+        return this;
+    }
+
+    // TODO: We need to make sure that the LeaderRecoveryState is not lost when the partition transitions from

Review comment:
       TODO here

##########
File path: core/src/test/scala/kafka/zk/TopicPartitionStateZNodeTest.scala
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.zk
+
+import TopicPartitionStateZNode.decode
+import TopicPartitionStateZNode.encode
+import kafka.api.LeaderAndIsr
+import kafka.controller.LeaderIsrAndControllerEpoch
+import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.zookeeper.data.Stat
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.mock
+import org.mockito.Mockito.when
+
+final class TopicPartitionStateZNodeTest {
+
+  @Test
+  def testEncodeDecodeRecovering(): Unit = {
+    val zkVersion = 5
+    val stat = mock(classOf[Stat])
+    when(stat.getVersion).thenReturn(zkVersion)
+
+    val expected = LeaderIsrAndControllerEpoch(LeaderAndIsr(1, 6, List(1), LeaderRecoveryState.RECOVERING, zkVersion), 10)
+
+    assertEquals(Some(expected), decode(encode(expected), stat))
+  }
+
+  @Test
+  def testEncodeDecodeRecovered(): Unit = {

Review comment:
       Do we have the case covered when we are decoding an old value without the leader recovery state field?
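One way to cover this case is a decoder that treats a missing field as RECOVERED, since that is the only state a pre-KIP-704 writer could have meant. Here is a minimal sketch. It assumes the znode JSON has already been parsed into a `Map`. The `PartitionStateDecoder` class and the `leader_recovery_state` key are assumptions for illustration, and the real field name may differ. A test would decode a payload without the key and expect RECOVERED.

```java
import java.util.Map;

final class PartitionStateDecoder {
    // Hypothetical JSON key; the real znode field name may differ.
    static final String RECOVERY_STATE_KEY = "leader_recovery_state";

    private PartitionStateDecoder() {}

    /**
     * Sketch: read the recovery state (0 = RECOVERED, 1 = RECOVERING) from an
     * already-parsed partition-state JSON object. Values written before the
     * field existed lack the key and are treated as RECOVERED.
     */
    static byte decodeRecoveryState(Map<String, Object> json) {
        Object raw = json.get(RECOVERY_STATE_KEY);
        if (raw == null) {
            return 0; // Old format: no recovery in progress.
        }
        if (!(raw instanceof Number)) {
            throw new IllegalArgumentException(
                "Unexpected value for " + RECOVERY_STATE_KEY + ": " + raw);
        }
        return ((Number) raw).byteValue();
    }
}
```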

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -760,44 +719,132 @@ BrokersToIsrs brokersToIsrs() {
                     }
                     if (change.leader() != request.brokerId() &&
                             change.leader() != NO_LEADER_CHANGE) {
-                        // Normally, an alterIsr request, which is made by the partition
+                        // Normally, an alterPartition request, which is made by the partition
                         // leader itself, is not allowed to modify the partition leader.
                         // However, if there is an ongoing partition reassignment and the
                         // ISR change completes it, then the leader may change as part of
                         // the changes made during reassignment cleanup.
                         //
                         // In this case, we report back FENCED_LEADER_EPOCH to the leader
-                        // which made the alterIsr request. This lets it know that it must
+                        // which made the alterPartition request. This lets it know that it must
                         // fetch new metadata before trying again. This return code is
                         // unusual because we both return an error and generate a new
                         // metadata record. We usually only do one or the other.
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment and triggered a " +
-                            "leadership change. Reutrning FENCED_LEADER_EPOCH.",
+                            "leadership change. Returning FENCED_LEADER_EPOCH.",
                             request.brokerId(), topic.name, partitionId);
-                        responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                             setPartitionIndex(partitionId).
                             setErrorCode(FENCED_LEADER_EPOCH.code()));
                         continue;
                     } else if (change.removingReplicas() != null ||
                             change.addingReplicas() != null) {
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment.", request.brokerId(),
                             topic.name, partitionId);
                     }
                 }
-                responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+
+                responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                     setPartitionIndex(partitionId).
                     setErrorCode(result.code()).
                     setLeaderId(partition.leader).
+                    setIsr(Replicas.toList(partition.isr)).
+                    setLeaderRecoveryState(partition.leaderRecoveryState.value()).
                     setLeaderEpoch(partition.leaderEpoch).
-                    setCurrentIsrVersion(partition.partitionEpoch).
-                    setIsr(Replicas.toList(partition.isr)));
+                    setPartitionEpoch(partition.partitionEpoch));
             }
         }
+
         return ControllerResult.of(records, response);
     }
 
+    /**
+     * Validate the partition information included in the alter partition request.
+     *
+     * @param brokerId id of the broker requesting the alter partition
+     * @param topic current topic information store by the replication manager
+     * @param partitionId partition id being altered
+     * @param partition current partition registration for the partition being altered
+     * @param partitionData partition data from the alter partition request
+     *
+     * @return Errors.NONE for valid alter partition data; otherwise the validation error
+     */
+    private Errors validateAlterPartitionData(
+        int brokerId,
+        TopicControlInfo topic,
+        int partitionId,
+        PartitionRegistration partition,
+        AlterPartitionRequestData.PartitionData partitionData
+    ) {
+        if (partition == null) {
+            log.info("Rejecting alterPartition request for unknown partition {}-{}.",
+                    topic.name, partitionId);
+
+            return UNKNOWN_TOPIC_OR_PARTITION;
+        }
+        if (partitionData.leaderEpoch() != partition.leaderEpoch) {

Review comment:
       Not part of this patch, but I was wondering if FENCED_LEADER_EPOCH is the right error code when the request epoch is larger than the registration epoch. 

##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
##########
@@ -1077,6 +1078,58 @@ class PartitionTest extends AbstractPartitionTest {
 
   }
 
+  @Test
+  def testInvalidAlterPartitionAreNotRetried(): Unit = {

Review comment:
       nit: perhaps `testInvalidAlterPartitionRequestsAreNotRetried`?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -760,44 +719,132 @@ BrokersToIsrs brokersToIsrs() {
                     }
                     if (change.leader() != request.brokerId() &&
                             change.leader() != NO_LEADER_CHANGE) {
-                        // Normally, an alterIsr request, which is made by the partition
+                        // Normally, an alterPartition request, which is made by the partition
                         // leader itself, is not allowed to modify the partition leader.
                         // However, if there is an ongoing partition reassignment and the
                         // ISR change completes it, then the leader may change as part of
                         // the changes made during reassignment cleanup.
                         //
                         // In this case, we report back FENCED_LEADER_EPOCH to the leader
-                        // which made the alterIsr request. This lets it know that it must
+                        // which made the alterPartition request. This lets it know that it must
                         // fetch new metadata before trying again. This return code is
                         // unusual because we both return an error and generate a new
                         // metadata record. We usually only do one or the other.
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment and triggered a " +
-                            "leadership change. Reutrning FENCED_LEADER_EPOCH.",
+                            "leadership change. Returning FENCED_LEADER_EPOCH.",
                             request.brokerId(), topic.name, partitionId);
-                        responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                             setPartitionIndex(partitionId).
                             setErrorCode(FENCED_LEADER_EPOCH.code()));
                         continue;
                     } else if (change.removingReplicas() != null ||
                             change.addingReplicas() != null) {
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment.", request.brokerId(),
                             topic.name, partitionId);
                     }
                 }
-                responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+
+                responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                     setPartitionIndex(partitionId).
                     setErrorCode(result.code()).
                     setLeaderId(partition.leader).
+                    setIsr(Replicas.toList(partition.isr)).
+                    setLeaderRecoveryState(partition.leaderRecoveryState.value()).

Review comment:
       Hmm, how do we guarantee that this field is supported?

##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
##########
@@ -2015,6 +2067,8 @@ class PartitionTest extends AbstractPartitionTest {
     verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
   }
 
+  // TODO: Add a test that shows that the follower rejects reads until a recovered leader and isr

Review comment:
       Don't forget about the TODO.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -760,44 +719,132 @@ BrokersToIsrs brokersToIsrs() {
                     }
                     if (change.leader() != request.brokerId() &&
                             change.leader() != NO_LEADER_CHANGE) {
-                        // Normally, an alterIsr request, which is made by the partition
+                        // Normally, an alterPartition request, which is made by the partition
                         // leader itself, is not allowed to modify the partition leader.
                         // However, if there is an ongoing partition reassignment and the
                         // ISR change completes it, then the leader may change as part of
                         // the changes made during reassignment cleanup.
                         //
                         // In this case, we report back FENCED_LEADER_EPOCH to the leader
-                        // which made the alterIsr request. This lets it know that it must
+                        // which made the alterPartition request. This lets it know that it must
                         // fetch new metadata before trying again. This return code is
                         // unusual because we both return an error and generate a new
                         // metadata record. We usually only do one or the other.
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment and triggered a " +
-                            "leadership change. Reutrning FENCED_LEADER_EPOCH.",
+                            "leadership change. Returning FENCED_LEADER_EPOCH.",
                             request.brokerId(), topic.name, partitionId);
-                        responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                             setPartitionIndex(partitionId).
                             setErrorCode(FENCED_LEADER_EPOCH.code()));
                         continue;
                     } else if (change.removingReplicas() != null ||
                             change.addingReplicas() != null) {
-                        log.info("AlterIsr request from node {} for {}-{} completed " +
+                        log.info("AlterPartition request from node {} for {}-{} completed " +
                             "the ongoing partition reassignment.", request.brokerId(),
                             topic.name, partitionId);
                     }
                 }
-                responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+
+                responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                     setPartitionIndex(partitionId).
                     setErrorCode(result.code()).
                     setLeaderId(partition.leader).
+                    setIsr(Replicas.toList(partition.isr)).
+                    setLeaderRecoveryState(partition.leaderRecoveryState.value()).
                     setLeaderEpoch(partition.leaderEpoch).
-                    setCurrentIsrVersion(partition.partitionEpoch).
-                    setIsr(Replicas.toList(partition.isr)));
+                    setPartitionEpoch(partition.partitionEpoch));
             }
         }
+
         return ControllerResult.of(records, response);
     }
 
+    /**
+     * Validate the partition information included in the alter partition request.
+     *
+     * @param brokerId id of the broker requesting the alter partition
+     * @param topic current topic information store by the replication manager
+     * @param partitionId partition id being altered
+     * @param partition current partition registration for the partition being altered
+     * @param partitionData partition data from the alter partition request
+     *
+     * @return Errors.NONE for valid alter partition data; otherwise the validation error
+     */
+    private Errors validateAlterPartitionData(
+        int brokerId,
+        TopicControlInfo topic,
+        int partitionId,
+        PartitionRegistration partition,
+        AlterPartitionRequestData.PartitionData partitionData
+    ) {
+        if (partition == null) {
+            log.info("Rejecting alterPartition request for unknown partition {}-{}.",

Review comment:
       nit: I think we typically capitalize request names in logging







[GitHub] [kafka] hachikuji commented on pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#issuecomment-1050566273


   For this [comment](https://github.com/apache/kafka/pull/11733#discussion_r812044247):
   
   > If the version is `0` then this is guaranteed to be the default value `0` so the serialization will succeed. This is true because we only write these values in the response when the operation success. If the operation fails then we skip writing these values and instead just write the error code.
   
   This is a bit subjective, so feel free to disregard, but I do feel like some of the implicit assumptions might be causing some unnecessary obscurity. This is one case where a version check might actually be clearer and prevent the need for the extra comment.
    
   A second case is implicitly setting RECOVERED in `PendingPartitionChange`. I had a comment about this [here](https://github.com/apache/kafka/pull/11733/files#r805005385), which might have been missed. This is fine at the moment because the current patch does not do any actual recovery operation, but I think we should reconsider it when we do. Otherwise I do think it's easy to overlook the implication when making other partition state changes.
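The concern in the second paragraph is that a change which does not mention the recovery state silently resets it to RECOVERED. Here is a minimal sketch of the alternative. It assumes a builder that keeps the partition's current state unless a caller sets a target explicitly. `PartitionChangeSketch` is hypothetical and is not the `PendingPartitionChange` or `PartitionChangeBuilder` code in this PR.

```java
// Hypothetical builder showing "preserve unless explicitly changed" semantics.
final class PartitionChangeSketch {
    enum RecoveryState { RECOVERED, RECOVERING }

    private final RecoveryState current;
    private RecoveryState target; // null means no explicit change was requested

    PartitionChangeSketch(RecoveryState current) {
        this.current = current;
    }

    PartitionChangeSketch setTargetRecoveryState(RecoveryState target) {
        this.target = target;
        return this;
    }

    /** The state after the change: the explicit target if set, else the current one. */
    RecoveryState resultingRecoveryState() {
        return target != null ? target : current;
    }
}
```

With this shape, an unrelated ISR or reassignment change cannot clear RECOVERING by accident. Only code that calls `setTargetRecoveryState` changes it.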





[GitHub] [kafka] jsancio commented on pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#issuecomment-1051064227


   > Hmm, suppose we have a partition with replicas=(0) and isr=(0), but 0 is offline. The user chooses to reassign to node 1 and perform an unclean election to make it the leader. That would allow the reassignment to complete, but the partition state would be RECOVERING, right? How do we get out of that state?
   
   I think that is correct. I think what we can do here is make sure that the leader sends an AlterPartition request when it changes from RECOVERING to RECOVERED. I didn't implement it in this PR since this implementation is a no-op for the recovering state. Do you mind if I implement this in a future PR? I filed: https://issues.apache.org/jira/browse/KAFKA-13696
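The follow-up proposed here (tracked in KAFKA-13696) can be sketched as a leader-side hook. The sketch assumes the leader already knows its current leader epoch, ISR, and partition epoch. It also assumes the wire values from the request schema (0 = RECOVERED, 1 = RECOVERING). `onRecoveryComplete` and `AlterPartitionProposal` are hypothetical names and are not part of the PR.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical subset of the fields an AlterPartition request carries.
final class AlterPartitionProposal {
    final int leaderEpoch;
    final List<Integer> isr;
    final byte leaderRecoveryState; // 0 = RECOVERED, 1 = RECOVERING
    final int partitionEpoch;

    AlterPartitionProposal(int leaderEpoch, List<Integer> isr,
                           byte leaderRecoveryState, int partitionEpoch) {
        this.leaderEpoch = leaderEpoch;
        this.isr = isr;
        this.leaderRecoveryState = leaderRecoveryState;
        this.partitionEpoch = partitionEpoch;
    }
}

final class LeaderRecoverySketch {
    private LeaderRecoverySketch() {}

    /**
     * Once the leader has finished whatever recovery it performs locally,
     * propose the same ISR with the state set to RECOVERED. No request is
     * needed when the partition is already RECOVERED.
     */
    static Optional<AlterPartitionProposal> onRecoveryComplete(
            int leaderEpoch, List<Integer> isr, byte currentState, int partitionEpoch) {
        if (currentState == 0) {
            return Optional.empty();
        }
        return Optional.of(new AlterPartitionProposal(leaderEpoch, isr, (byte) 0, partitionEpoch));
    }
}
```

Keeping the ISR unchanged makes the request a pure state transition. Passing the current partition epoch lets the controller reject the request if the partition changed in the meantime.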





[GitHub] [kafka] hachikuji commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r803180976



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -110,6 +110,14 @@
     LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
     ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true);
 
+    /**
+     * ALTER_ISR was the old name for ALTER_PARTITION.
+     *
+     * @deprecated since 3.2.0. Use {@link #ALTER_PARTITION} instead
+     */
+    @Deprecated

Review comment:
       Is this necessary? I don't believe this is a public API.

##########
File path: checkstyle/suppressions.xml
##########
@@ -276,6 +276,8 @@
               files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(QuorumController|ReplicationControlManager|ReplicationControlManagerTest).java"/>
+    <suppress checks="MethodLength"

Review comment:
       Probably a sign that `alterPartition` is getting out of control. Could we consider adding some helpers instead?

##########
File path: clients/src/main/resources/common/message/AlterPartitionRequest.json
##########
@@ -34,9 +34,11 @@
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
           "about": "The leader epoch of this partition" },
         { "name": "NewIsr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
-          "about": "The ISR for this partition"},
-        { "name": "CurrentIsrVersion", "type": "int32", "versions": "0+",
-          "about": "The expected version of ISR which is being updated"}
+          "about": "The ISR for this partition" },
+        { "name": "LeaderRecoveryState", "type": "int8", "versions": "1+", "default": "0",
+          "about": "1 if the partition is recovering from an unclean leader election; 0 otherwise." },
+        { "name": "PartitionEpoch", "type": "int32", "versions": "0+",
+          "about": "The expected epoch of the partition which is being updated" }

Review comment:
       Would it be useful to mention that this refers to the ZK version for ZK clusters? Same question for the response.
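For context, `PartitionEpoch` acts as an optimistic-concurrency token: the controller applies a change only if the epoch sent by the leader matches the one it has stored (for ZK clusters, the stored value is the znode version mentioned above). A minimal sketch of that compare-and-set, assuming a hypothetical in-memory `PartitionRegistry`; `AlterResult` and its values are illustrative and are not Kafka error codes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical outcome of applying an AlterPartition-style change.
enum AlterResult { ACCEPTED, STALE_EPOCH, UNKNOWN_PARTITION }

// Hypothetical stored state for one partition.
record StoredPartition(List<Integer> isr, int partitionEpoch) { }

// Hypothetical controller-side registry; the real controller does much more.
final class PartitionRegistry {
    private final Map<String, StoredPartition> partitions = new HashMap<>();

    void create(String partition, List<Integer> isr) {
        partitions.put(partition, new StoredPartition(List.copyOf(isr), 0));
    }

    StoredPartition get(String partition) {
        return partitions.get(partition);
    }

    /** Compare-and-set: apply newIsr only if expectedEpoch matches the stored epoch. */
    AlterResult alter(String partition, List<Integer> newIsr, int expectedEpoch) {
        StoredPartition current = partitions.get(partition);
        if (current == null) {
            return AlterResult.UNKNOWN_PARTITION;
        }
        if (current.partitionEpoch() != expectedEpoch) {
            // The leader's view is stale; it must refresh its state before retrying.
            return AlterResult.STALE_EPOCH;
        }
        // Every accepted change bumps the epoch, invalidating concurrent proposals.
        partitions.put(partition, new StoredPartition(List.copyOf(newIsr), expectedEpoch + 1));
        return AlterResult.ACCEPTED;
    }
}
```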

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -147,56 +147,65 @@ sealed trait IsrState {
   def maximalIsr: Set[Int]
 
   /**
-   * Indicates if we have an AlterIsr request inflight.
+   * The leader recovery state. See the description for LeaderRecoveryState for details on the different values.
+   */
+  def leaderRecoveryState: LeaderRecoveryState
+
+  /**
+   * Indicates if we have an AlterPartition request inflight.
    */
   def isInflight: Boolean
 }
 
-sealed trait PendingIsrChange extends IsrState {
+sealed trait PendingPartitionChange extends PartitionState {
   def sentLeaderAndIsr: LeaderAndIsr
+
+  override val leaderRecoveryState: LeaderRecoveryState = LeaderRecoveryState.RECOVERED

Review comment:
       I can't say I like the idea of getting this implicitly. I do get the point that we would not send `AlterPartition` with anything except `LeaderRecoveryState.RECOVERED` (for now), but I think we would tend to overlook the implication when it is hidden here. Can we at least include it in the `toString` implementations?
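One way to keep the default but make it visible is to print it explicitly in `toString`, so log lines for a pending change show the recovery state it will send. A minimal Java sketch with a hypothetical `PendingExpand` class; the real pending-state types are Scala classes in `Partition.scala`, so this only illustrates the idea.

```java
import java.util.Set;
import java.util.TreeSet;

enum RecoveryState { RECOVERED, RECOVERING }

// Hypothetical pending ISR expansion; mirrors the idea, not the Scala code.
final class PendingExpand {
    // The same default the trait supplies implicitly in the diff above.
    final RecoveryState leaderRecoveryState = RecoveryState.RECOVERED;
    final Set<Integer> isr;
    final int newInSyncReplicaId;

    PendingExpand(Set<Integer> isr, int newInSyncReplicaId) {
        this.isr = new TreeSet<>(isr); // sorted so the toString output is stable
        this.newInSyncReplicaId = newInSyncReplicaId;
    }

    @Override
    public String toString() {
        // Include the recovery state so the implicit default shows up in logs.
        return "PendingExpand(isr=" + isr
            + ", newInSyncReplicaId=" + newInSyncReplicaId
            + ", leaderRecoveryState=" + leaderRecoveryState
            + ")";
    }
}
```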

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1305,33 +1305,33 @@ private DescribeUserScramCredentialsResponse createDescribeUserScramCredentialsR
         return new DescribeUserScramCredentialsResponse(data);
     }
 
-    private AlterIsrRequest createAlterIsrRequest(short version) {
-        AlterIsrRequestData data = new AlterIsrRequestData()
+    private AlterPartitionRequest createAlterPartitionRequest(short version) {

Review comment:
       Shall we do any tests for the new fields?

##########
File path: core/src/main/scala/kafka/api/LeaderAndIsr.scala
##########
@@ -52,11 +63,12 @@ case class LeaderAndIsr(leader: Int,
     } else if (other == null) {
       false
     } else {
-      leader == other.leader && leaderEpoch == other.leaderEpoch && isr.equals(other.isr)
+      leader == other.leader && leaderEpoch == other.leaderEpoch && isr.equals(other.isr) &&
+      leaderRecoveryState == other.leaderRecoveryState

Review comment:
       nit: indent this?
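Since `leaderRecoveryState` now participates in `equals`, it should participate in `hashCode` as well, or two equal values could hash differently. The diff's `equals` compares the leader, leader epoch, ISR, and now the recovery state, but not the partition epoch. A Java sketch of the same contract using a hypothetical `LeaderAndIsrValue` class (the real `LeaderAndIsr` is a Scala case class):

```java
import java.util.List;
import java.util.Objects;

enum RecoveryState { RECOVERED, RECOVERING }

// Hypothetical value class; partitionEpoch is deliberately excluded from equality,
// mirroring the fields compared in the diff above.
final class LeaderAndIsrValue {
    final int leader;
    final int leaderEpoch;
    final List<Integer> isr;
    final RecoveryState leaderRecoveryState;
    final int partitionEpoch;

    LeaderAndIsrValue(int leader, int leaderEpoch, List<Integer> isr,
                      RecoveryState leaderRecoveryState, int partitionEpoch) {
        this.leader = leader;
        this.leaderEpoch = leaderEpoch;
        this.isr = List.copyOf(isr);
        this.leaderRecoveryState = leaderRecoveryState;
        this.partitionEpoch = partitionEpoch;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof LeaderAndIsrValue)) return false;
        LeaderAndIsrValue other = (LeaderAndIsrValue) o;
        return leader == other.leader
            && leaderEpoch == other.leaderEpoch
            && isr.equals(other.isr)
            && leaderRecoveryState == other.leaderRecoveryState;
    }

    @Override
    public int hashCode() {
        // Must use exactly the fields compared in equals.
        return Objects.hash(leader, leaderEpoch, isr, leaderRecoveryState);
    }
}
```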

##########
File path: core/src/main/scala/kafka/controller/Election.scala
##########
@@ -40,7 +40,14 @@ object Election {
         val newLeaderAndIsrOpt = leaderOpt.map { leader =>
           val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
           else List(leader)
-          leaderAndIsr.newLeaderAndIsr(leader, newIsr)
+
+          if (!isr.contains(leader)) {
+            // The new leader is not in the old ISR so mark the partition a RECOVERING
+            leaderAndIsr.newRecoveringLeaderAndIsr(leader, newIsr)
+          } else {
+            // Elect a new leader but keep the previous leader recovery state

Review comment:
       Is it possible for the state to be RECOVERING here? 
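The branch in the diff can be read as a pure function of the chosen leader, the old ISR, the live brokers, and the previous recovery state. A simplified sketch with a hypothetical `ElectionSketch.elect` helper (it assumes the leader has already been chosen and ignores everything else the controller checks); it makes the question concrete, since the clean branch can only return RECOVERING if the previous state already was RECOVERING.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

enum RecoveryState { RECOVERED, RECOVERING }

record Elected(int leader, List<Integer> isr, RecoveryState state) { }

final class ElectionSketch {
    /**
     * Hypothetical mirror of the branch in the diff: given an already chosen leader,
     * compute the new ISR and leader recovery state.
     */
    static Elected elect(int leader, List<Integer> isr, Set<Integer> liveBrokers,
                         RecoveryState previous) {
        if (!isr.contains(leader)) {
            // Unclean election: the new leader was not in the old ISR.
            return new Elected(leader, List.of(leader), RecoveryState.RECOVERING);
        }
        // Clean election: drop offline replicas and carry over the previous state.
        List<Integer> newIsr = isr.stream()
            .filter(liveBrokers::contains)
            .collect(Collectors.toList());
        return new Elected(leader, newIsr, previous);
    }
}
```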

##########
File path: core/src/main/scala/kafka/controller/Election.scala
##########
@@ -53,17 +60,17 @@ object Election {
    * Elect leaders for new or offline partitions.
    *
    * @param controllerContext Context with the current state of the cluster
-   * @param partitionsWithUncleanLeaderElectionState A sequence of tuples representing the partitions
+   * @param partitionsWithUncleanLeaderLeaderRecoveryState A sequence of tuples representing the partitions

Review comment:
       nit: the original name seems fine for what it returns. "LeaderLeader"?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -542,7 +551,8 @@ class Partition(val topicPartition: TopicPartition,
         assignment = partitionState.replicas.asScala.map(_.toInt),
         isr = isr,
         addingReplicas = addingReplicas,
-        removingReplicas = removingReplicas
+        removingReplicas = removingReplicas,
+        LeaderRecoveryState.RECOVERED

Review comment:
       Can we at least log a message if the LeaderAndIsr indicates recovery is needed? I think with this implementation, if there is only one replica, the state would remain RECOVERING indefinitely. Do I have that right?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -2263,37 +2271,43 @@ class KafkaController(val config: KafkaConfig,
             .groupBy { case (tp, _) => tp.topic }   // Group by topic
             .foreach { case (topic, partitions) =>
               // Add each topic part to the response
-              val topicResp = new AlterIsrResponseData.TopicData()
+              val topicResp = new AlterPartitionResponseData.TopicData()
                 .setName(topic)
                 .setPartitions(new util.ArrayList())
               resp.topics.add(topicResp)
               partitions.foreach { case (tp, errorOrIsr) =>
                 // Add each partition part to the response (new ISR or error)
                 errorOrIsr match {
                   case Left(error) => topicResp.partitions.add(
-                    new AlterIsrResponseData.PartitionData()
+                    new AlterPartitionResponseData.PartitionData()
                       .setPartitionIndex(tp.partition)
                       .setErrorCode(error.code))
                   case Right(leaderAndIsr) => topicResp.partitions.add(
-                    new AlterIsrResponseData.PartitionData()
+                    new AlterPartitionResponseData.PartitionData()
                       .setPartitionIndex(tp.partition)
                       .setLeaderId(leaderAndIsr.leader)
                       .setLeaderEpoch(leaderAndIsr.leaderEpoch)
                       .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                      .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+                      .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)

Review comment:
       Do we need to make this field ignorable in the response?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -2781,8 +2812,9 @@ case object IsrChangeNotification extends ControllerEvent {
   override def preempt(): Unit = {}
 }
 
-case class AlterIsrReceived(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
-                            callback: AlterIsrCallback) extends ControllerEvent {
+case class AlterPartitionReceived(
+  brokerId: Int, brokerEpoch: Long, partitionssToAlter: Map[TopicPartition, LeaderAndIsr], callback: AlterPartitionCallback

Review comment:
       nit: extra 's' in "partitionss"

##########
File path: core/src/test/scala/kafka/api/LeaderAndIsrTest.scala
##########
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import org.apache.kafka.metadata.LeaderRecoveryState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+final class LeaderAndIsrTest {
+  @Test
+  def testRecoveringLeaderAndIsr(): Unit = {
+    val leaderAndIsr = LeaderAndIsr(1, List(1, 2))
+    val recoveringLeaderAndIsr = leaderAndIsr.newRecoveringLeaderAndIsr(3, List(3))
+

Review comment:
       nit: extra newline

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -250,23 +260,35 @@ class DefaultAlterIsrManager(
         val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
           new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
         data.topics.forEach { topic =>
-          topic.partitions().forEach(partition => {
+          topic.partitions().forEach { partition =>
             val tp = new TopicPartition(topic.name, partition.partitionIndex)
-            val error = Errors.forCode(partition.errorCode())
+            val apiError = Errors.forCode(partition.errorCode())
             debug(s"Controller successfully handled AlterIsr request for $tp: $partition")
-            if (error == Errors.NONE) {
-              val newLeaderAndIsr = new LeaderAndIsr(partition.leaderId, partition.leaderEpoch,
-                partition.isr.asScala.toList.map(_.toInt), partition.currentIsrVersion)
-              partitionResponses(tp) = Right(newLeaderAndIsr)
+            if (apiError == Errors.NONE) {
+              try {
+                partitionResponses(tp) = Right(
+                  LeaderAndIsr(
+                    partition.leaderId,
+                    partition.leaderEpoch,
+                    partition.isr.asScala.toList.map(_.toInt),
+                    LeaderRecoveryState.of(partition.leaderRecoveryState),
+                    partition.partitionEpoch
+                  )
+                )
+              } catch {
+                case e: IllegalArgumentException =>

Review comment:
       It might be cleaner to let `LeaderRecoveryState.of` return `Optional<LeaderRecoveryState>` instead of throwing `IllegalArgumentException`.

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public enum LeaderRecoveryState {
+    /**
+     * Represent that the election for the partition was either an ISR election or the
+     * leader recovered from an unclean leader election.
+     */
+    RECOVERED((byte) 0),
+
+    /**
+     * Represent that the election for the partition was an unclean leader election and
+     * that the leader is recovering from it.
+     */
+    RECOVERING((byte) 1);
+
+    /**
+     * A special value used to represent that the LeaderRecoveryState field of a
+     * PartitionChangeRecord didn't change.
+     */
+    private static final byte NO_CHANGE = (byte) -1;
+
+    private static final Map<Byte, LeaderRecoveryState> VALUE_TO_ENUM;

Review comment:
       nit: the map seems like overkill with two values
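Combined with the earlier suggestion that `LeaderRecoveryState.of` return `Optional<LeaderRecoveryState>`, a two-value enum can decode its wire value with a `switch` instead of a lookup map. A minimal sketch under that assumption; the enum is named `LeaderRecoveryStateSketch` and the method `optionalOf` to make clear they are hypothetical and not the class in the diff.

```java
import java.util.Optional;

// Hypothetical variant of the enum in the diff, without the VALUE_TO_ENUM map.
enum LeaderRecoveryStateSketch {
    RECOVERED((byte) 0),
    RECOVERING((byte) 1);

    private final byte value;

    LeaderRecoveryStateSketch(byte value) {
        this.value = value;
    }

    byte value() {
        return value;
    }

    /** Decodes a wire value; unknown values yield Optional.empty() instead of throwing. */
    static Optional<LeaderRecoveryStateSketch> optionalOf(byte value) {
        switch (value) {
            case 0:
                return Optional.of(RECOVERED);
            case 1:
                return Optional.of(RECOVERING);
            default:
                return Optional.empty();
        }
    }
}
```

A caller such as the AlterPartition response handler could then map `Optional.empty()` to an error result rather than catching an exception.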

##########
File path: core/src/test/scala/kafka/zk/TopicPartitionStateZNodeTest.scala
##########
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.zk
+
+import TopicPartitionStateZNode.decode
+import TopicPartitionStateZNode.encode
+import kafka.api.LeaderAndIsr
+import kafka.controller.LeaderIsrAndControllerEpoch
+import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.zookeeper.data.Stat
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.mock
+import org.mockito.Mockito.when
+
+final class TopicPartitionStateZNodeTest {
+
+  @Test
+  def testEncodeDecode(): Unit = {

Review comment:
       Maybe also check the default case, when the leader recovery state is not defined?
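The default case matters because partition-state znodes written before this change carry no recovery-state field. A minimal sketch of a decode that treats a missing field as RECOVERED (value 0, matching the `"default": "0"` in the request schema above), assuming the znode JSON has already been parsed into a `Map<String, Object>`. The key name `leader_recovery_state` and the `decodeRecoveryState` helper are assumptions for illustration, not taken from the diff.

```java
import java.util.Map;

enum RecoveryState { RECOVERED, RECOVERING }

final class ZNodeDecodeSketch {
    // Hypothetical JSON key; the real znode field name may differ.
    static final String KEY = "leader_recovery_state";

    /**
     * Reads the recovery state from an already-parsed znode map. A missing key means
     * the znode predates the field, so it falls back to RECOVERED.
     */
    static RecoveryState decodeRecoveryState(Map<String, Object> znode) {
        Object raw = znode.get(KEY);
        if (raw == null) {
            return RecoveryState.RECOVERED;
        }
        int value = ((Number) raw).intValue();
        switch (value) {
            case 0:
                return RecoveryState.RECOVERED;
            case 1:
                return RecoveryState.RECOVERING;
            default:
                throw new IllegalArgumentException("Unknown leader recovery state: " + value);
        }
    }
}
```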

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1080,6 +1092,9 @@ class Partition(val topicPartition: TopicPartition,
     // decide whether to only fetch from leader
     val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
 
+    // Check that the partition leader is recovering from an unclean leader election.
+    validateLeaderRecoveryState()

Review comment:
       Are we doing this on writes as well?
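Assuming the check rejects requests while the partition is RECOVERING, the same guard would need to sit on the append path as well as the fetch path. A minimal single-threaded sketch with a hypothetical `RecoveringPartition` class; the method names and the exception type are illustrative, and this is not the `validateLeaderRecoveryState` from the diff.

```java
import java.util.ArrayList;
import java.util.List;

enum RecoveryState { RECOVERED, RECOVERING }

// Hypothetical, single-threaded partition; not Kafka's Partition class.
final class RecoveringPartition {
    private final List<String> log = new ArrayList<>();
    private RecoveryState state;

    RecoveringPartition(RecoveryState state) {
        this.state = state;
    }

    void markRecovered() {
        state = RecoveryState.RECOVERED;
    }

    // One shared guard so the read and write paths cannot drift apart.
    private void validateRecoveryState() {
        if (state == RecoveryState.RECOVERING) {
            throw new IllegalStateException("Partition leader is still recovering from an unclean election");
        }
    }

    List<String> fetch(int fromOffset) {
        validateRecoveryState();
        return List.copyOf(log.subList(fromOffset, log.size()));
    }

    long append(String record) {
        validateRecoveryState();
        log.add(record);
        return log.size() - 1; // offset of the appended record
    }
}
```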



