Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/11 23:01:48 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

hachikuji commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r803180976



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -110,6 +110,14 @@
     LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
     ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true);
 
+    /**
+     * ALTER_ISR was the old name for ALTER_PARTITION.
+     *
+     * @deprecated since 3.2.0. Use {@link #ALTER_PARTITION} instead
+     */
+    @Deprecated

Review comment:
       Is this necessary? I don't believe this is a public API.

##########
File path: checkstyle/suppressions.xml
##########
@@ -276,6 +276,8 @@
               files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(QuorumController|ReplicationControlManager|ReplicationControlManagerTest).java"/>
+    <suppress checks="MethodLength"

Review comment:
       Probably a sign that `alterPartition` is getting out of control. Could we consider adding some helpers instead?

##########
File path: clients/src/main/resources/common/message/AlterPartitionRequest.json
##########
@@ -34,9 +34,11 @@
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
           "about": "The leader epoch of this partition" },
         { "name": "NewIsr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
-          "about": "The ISR for this partition"},
-        { "name": "CurrentIsrVersion", "type": "int32", "versions": "0+",
-          "about": "The expected version of ISR which is being updated"}
+          "about": "The ISR for this partition" },
+        { "name": "LeaderRecoveryState", "type": "int8", "versions": "1+", "default": "0",
+          "about": "1 if the partition is recovering from an unclean leader election; 0 otherwise." },
+        { "name": "PartitionEpoch", "type": "int32", "versions": "0+",
+          "about": "The expected epoch of the partition which is being updated" }

Review comment:
       It might be useful to mention that this refers to the ZooKeeper version (zkVersion) for ZK-based clusters? Same question for the response.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -147,56 +147,65 @@ sealed trait IsrState {
   def maximalIsr: Set[Int]
 
   /**
-   * Indicates if we have an AlterIsr request inflight.
+   * The leader recovery state. See the description for LeaderRecoveryState for details on the different values.
+   */
+  def leaderRecoveryState: LeaderRecoveryState
+
+  /**
+   * Indicates if we have an AlterPartition request inflight.
    */
   def isInflight: Boolean
 }
 
-sealed trait PendingIsrChange extends IsrState {
+sealed trait PendingPartitionChange extends PartitionState {
   def sentLeaderAndIsr: LeaderAndIsr
+
+  override val leaderRecoveryState: LeaderRecoveryState = LeaderRecoveryState.RECOVERED

Review comment:
       I can't say I like the idea of getting this implicitly. I do get the point that we would not send `AlterPartition` with anything except `LeaderRecoveryState.RECOVERED` (for now), but I think we would tend to overlook the implication when it is hidden here. Can we at least include it in the `toString` implementations?

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1305,33 +1305,33 @@ private DescribeUserScramCredentialsResponse createDescribeUserScramCredentialsR
         return new DescribeUserScramCredentialsResponse(data);
     }
 
-    private AlterIsrRequest createAlterIsrRequest(short version) {
-        AlterIsrRequestData data = new AlterIsrRequestData()
+    private AlterPartitionRequest createAlterPartitionRequest(short version) {

Review comment:
       Should we add any tests for the new fields?

##########
File path: core/src/main/scala/kafka/api/LeaderAndIsr.scala
##########
@@ -52,11 +63,12 @@ case class LeaderAndIsr(leader: Int,
     } else if (other == null) {
       false
     } else {
-      leader == other.leader && leaderEpoch == other.leaderEpoch && isr.equals(other.isr)
+      leader == other.leader && leaderEpoch == other.leaderEpoch && isr.equals(other.isr) &&
+      leaderRecoveryState == other.leaderRecoveryState

Review comment:
       nit: indent this?

##########
File path: core/src/main/scala/kafka/controller/Election.scala
##########
@@ -40,7 +40,14 @@ object Election {
         val newLeaderAndIsrOpt = leaderOpt.map { leader =>
           val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
           else List(leader)
-          leaderAndIsr.newLeaderAndIsr(leader, newIsr)
+
+          if (!isr.contains(leader)) {
+            // The new leader is not in the old ISR so mark the partition a RECOVERING
+            leaderAndIsr.newRecoveringLeaderAndIsr(leader, newIsr)
+          } else {
+            // Elect a new leader but keep the previous leader recovery state

Review comment:
       Is it possible for the state to be RECOVERING here? 
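
To make the question concrete, here is a minimal Java sketch of the branch logic in the hunk above. It assumes the `else` branch simply carries the previous recovery state forward, as the diff comment says. The class `ElectionRecoveryStateSketch`, the nested `State` enum, and `stateAfterElection` are hypothetical names for illustration and are not part of the PR.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; not the code in Election.scala.
final class ElectionRecoveryStateSketch {
    enum State { RECOVERED, RECOVERING }

    // Mirrors the two branches of the hunk: a leader chosen from outside the
    // old ISR marks the partition RECOVERING; otherwise the previous state is kept.
    static State stateAfterElection(List<Integer> oldIsr, int newLeader, State previous) {
        if (!oldIsr.contains(newLeader)) {
            return State.RECOVERING;
        }
        return previous;
    }

    public static void main(String[] args) {
        // Unclean election: broker 3 is not in the old ISR [1, 2].
        System.out.println(stateAfterElection(Arrays.asList(1, 2), 3, State.RECOVERED));
        // Re-election of broker 3 while the partition is still RECOVERING.
        System.out.println(stateAfterElection(Arrays.asList(3), 3, State.RECOVERING));
    }
}
```

Under that assumption, the second call shows the case the comment asks about: if the previous state is RECOVERING and the leader is elected from the ISR, the state stays RECOVERING.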

##########
File path: core/src/main/scala/kafka/controller/Election.scala
##########
@@ -53,17 +60,17 @@ object Election {
    * Elect leaders for new or offline partitions.
    *
    * @param controllerContext Context with the current state of the cluster
-   * @param partitionsWithUncleanLeaderElectionState A sequence of tuples representing the partitions
+   * @param partitionsWithUncleanLeaderLeaderRecoveryState A sequence of tuples representing the partitions

Review comment:
       nit: the original name seems fine for what it returns. "LeaderLeader"?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -542,7 +551,8 @@ class Partition(val topicPartition: TopicPartition,
         assignment = partitionState.replicas.asScala.map(_.toInt),
         isr = isr,
         addingReplicas = addingReplicas,
-        removingReplicas = removingReplicas
+        removingReplicas = removingReplicas,
+        LeaderRecoveryState.RECOVERED

Review comment:
       Can we at least log a message if the LeaderAndIsr indicates recovery is needed? I think with this implementation, if there is only one replica, the state would remain RECOVERING indefinitely. Do I have that right?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -2263,37 +2271,43 @@ class KafkaController(val config: KafkaConfig,
             .groupBy { case (tp, _) => tp.topic }   // Group by topic
             .foreach { case (topic, partitions) =>
               // Add each topic part to the response
-              val topicResp = new AlterIsrResponseData.TopicData()
+              val topicResp = new AlterPartitionResponseData.TopicData()
                 .setName(topic)
                 .setPartitions(new util.ArrayList())
               resp.topics.add(topicResp)
               partitions.foreach { case (tp, errorOrIsr) =>
                 // Add each partition part to the response (new ISR or error)
                 errorOrIsr match {
                   case Left(error) => topicResp.partitions.add(
-                    new AlterIsrResponseData.PartitionData()
+                    new AlterPartitionResponseData.PartitionData()
                       .setPartitionIndex(tp.partition)
                       .setErrorCode(error.code))
                   case Right(leaderAndIsr) => topicResp.partitions.add(
-                    new AlterIsrResponseData.PartitionData()
+                    new AlterPartitionResponseData.PartitionData()
                       .setPartitionIndex(tp.partition)
                       .setLeaderId(leaderAndIsr.leader)
                       .setLeaderEpoch(leaderAndIsr.leaderEpoch)
                       .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                      .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+                      .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)

Review comment:
       Do we need to make this field ignorable in the response?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -2781,8 +2812,9 @@ case object IsrChangeNotification extends ControllerEvent {
   override def preempt(): Unit = {}
 }
 
-case class AlterIsrReceived(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
-                            callback: AlterIsrCallback) extends ControllerEvent {
+case class AlterPartitionReceived(
+  brokerId: Int, brokerEpoch: Long, partitionssToAlter: Map[TopicPartition, LeaderAndIsr], callback: AlterPartitionCallback

Review comment:
       nit: extra 's' in "partitionss"

##########
File path: core/src/test/scala/kafka/api/LeaderAndIsrTest.scala
##########
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import org.apache.kafka.metadata.LeaderRecoveryState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+final class LeaderAndIsrTest {
+  @Test
+  def testRecoveringLeaderAndIsr(): Unit = {
+    val leaderAndIsr = LeaderAndIsr(1, List(1, 2))
+    val recoveringLeaderAndIsr = leaderAndIsr.newRecoveringLeaderAndIsr(3, List(3))
+

Review comment:
       nit: extra newline

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -250,23 +260,35 @@ class DefaultAlterIsrManager(
         val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
           new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
         data.topics.forEach { topic =>
-          topic.partitions().forEach(partition => {
+          topic.partitions().forEach { partition =>
             val tp = new TopicPartition(topic.name, partition.partitionIndex)
-            val error = Errors.forCode(partition.errorCode())
+            val apiError = Errors.forCode(partition.errorCode())
             debug(s"Controller successfully handled AlterIsr request for $tp: $partition")
-            if (error == Errors.NONE) {
-              val newLeaderAndIsr = new LeaderAndIsr(partition.leaderId, partition.leaderEpoch,
-                partition.isr.asScala.toList.map(_.toInt), partition.currentIsrVersion)
-              partitionResponses(tp) = Right(newLeaderAndIsr)
+            if (apiError == Errors.NONE) {
+              try {
+                partitionResponses(tp) = Right(
+                  LeaderAndIsr(
+                    partition.leaderId,
+                    partition.leaderEpoch,
+                    partition.isr.asScala.toList.map(_.toInt),
+                    LeaderRecoveryState.of(partition.leaderRecoveryState),
+                    partition.partitionEpoch
+                  )
+                )
+              } catch {
+                case e: IllegalArgumentException =>

Review comment:
       It might be cleaner to let `LeaderRecoveryState.of` return `Optional<LeaderRecoveryState>`
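
One possible shape for this suggestion, as a hedged Java sketch: the lookup returns an empty `Optional` for unknown wire values, so the caller no longer needs a `try`/`catch` around it. The class `OptionalRecoveryStateSketch`, the nested `State` enum, and the method names `optionalOf` and `describe` are assumptions made for illustration; they are not the API in the PR.

```java
import java.util.Optional;

// Hypothetical stand-in for org.apache.kafka.metadata.LeaderRecoveryState.
final class OptionalRecoveryStateSketch {
    enum State {
        RECOVERED((byte) 0),
        RECOVERING((byte) 1);

        private final byte value;

        State(byte value) {
            this.value = value;
        }

        byte value() {
            return value;
        }

        // Returns an empty Optional for unknown wire values instead of throwing.
        static Optional<State> optionalOf(byte value) {
            for (State state : values()) {
                if (state.value == value) {
                    return Optional.of(state);
                }
            }
            return Optional.empty();
        }
    }

    // Caller-side handling of one response value, with no exception handling.
    static String describe(byte wireValue) {
        return State.optionalOf(wireValue)
            .map(state -> "state=" + state)
            .orElse("unknown leader recovery state " + wireValue);
    }

    public static void main(String[] args) {
        System.out.println(describe((byte) 1));
        System.out.println(describe((byte) 7));
    }
}
```

The trade-off is that every caller has to decide what an empty result means, whereas the exception-based version in the hunk keeps that decision in the `catch` block.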

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public enum LeaderRecoveryState {
+    /**
+     * Represent that the election for the partition was either an ISR election or the
+     * leader recovered from an unclean leader election.
+     */
+    RECOVERED((byte) 0),
+
+    /**
+     * Represent that the election for the partition was an unclean leader election and
+     * that the leader is recovering from it.
+     */
+    RECOVERING((byte) 1);
+
+    /**
+     * A special value used to represent that the LeaderRecoveryState field of a
+     * PartitionChangeRecord didn't change.
+     */
+    private static final byte NO_CHANGE = (byte) -1;
+
+    private static final Map<Byte, LeaderRecoveryState> VALUE_TO_ENUM;

Review comment:
       nit: the map seems like overkill with two values
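
For comparison, a map-free lookup could be as small as a `switch` over the two byte values. This is only a sketch: `SwitchRecoveryStateSketch` and its nested `State` enum are hypothetical stand-ins for the enum in the hunk, and the error message text is made up. Throwing `IllegalArgumentException` for unknown values matches what the `AlterIsrManager` hunk earlier in this review catches.

```java
// Hypothetical stand-in for the enum in LeaderRecoveryState.java.
final class SwitchRecoveryStateSketch {
    enum State {
        RECOVERED((byte) 0),
        RECOVERING((byte) 1);

        private final byte value;

        State(byte value) {
            this.value = value;
        }

        byte value() {
            return value;
        }

        // Resolves a wire value with a switch; no static map is needed.
        static State of(byte value) {
            switch (value) {
                case 0:
                    return RECOVERED;
                case 1:
                    return RECOVERING;
                default:
                    throw new IllegalArgumentException(
                        String.format("Value %d is not a valid leader recovery state", value));
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(State.of((byte) 0));
        System.out.println(State.of((byte) 1));
    }
}
```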

##########
File path: core/src/test/scala/kafka/zk/TopicPartitionStateZNodeTest.scala
##########
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.zk
+
+import TopicPartitionStateZNode.decode
+import TopicPartitionStateZNode.encode
+import kafka.api.LeaderAndIsr
+import kafka.controller.LeaderIsrAndControllerEpoch
+import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.zookeeper.data.Stat
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.mock
+import org.mockito.Mockito.when
+
+final class TopicPartitionStateZNodeTest {
+
+  @Test
+  def testEncodeDecode(): Unit = {

Review comment:
       Maybe check the default case as well when leader recovery state is not defined?
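
A rough Java illustration of the default case being asked about: when the stored state has no recovery-state entry (for example, data written before this change), decoding falls back to RECOVERED. Everything here is an assumption for illustration, including the key name `leader_recovery_state`, the use of a plain `Map` in place of the znode JSON, and the class and method names. It is not the `TopicPartitionStateZNode` implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of decoding with a default; not TopicPartitionStateZNode.
final class DefaultRecoveryStateDecodeSketch {
    enum State { RECOVERED, RECOVERING }

    // Assumed key name, used only for this example.
    static final String KEY = "leader_recovery_state";

    // A missing key is treated as RECOVERED.
    static State decode(Map<String, Integer> znodeFields) {
        Integer raw = znodeFields.get(KEY);
        if (raw == null) {
            return State.RECOVERED;
        }
        switch (raw) {
            case 0:
                return State.RECOVERED;
            case 1:
                return State.RECOVERING;
            default:
                throw new IllegalArgumentException("Unknown leader recovery state " + raw);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> oldFormat = new HashMap<>();   // no recovery state stored
        Map<String, Integer> newFormat = new HashMap<>();
        newFormat.put(KEY, 1);
        System.out.println(decode(oldFormat));
        System.out.println(decode(newFormat));
    }
}
```

A test for the default case would then assert on the first call (missing field) in addition to the round trip already covered by `testEncodeDecode`.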

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1080,6 +1092,9 @@ class Partition(val topicPartition: TopicPartition,
     // decide whether to only fetch from leader
     val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
 
+    // Check that the partition leader is recovering from an unclean leader election.
+    validateLeaderRecoveryState()

Review comment:
       Are we doing this on writes as well?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
