You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/09 01:09:04 UTC

[GitHub] [kafka] jsancio commented on a diff in pull request #12181: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 2)

jsancio commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r892961128


##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +226,56 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+  ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+    val metadataVersion = metadataVersionSupplier()
+    var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)

Review Comment:
   How about `metadataVersion.isTopicIdSupported`?



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +226,56 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+  ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+    val metadataVersion = metadataVersionSupplier()
+    var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+    // We build this mapping in order to map topic id back to their name when we
+    // receive the response. We cannot rely on the metadata cache for this because
+    // the metadata cache is updated after the partition state so it might not know
+    // yet about a topic id already used here.
+    val topicNamesByIds = mutable.HashMap[Uuid, String]()
+
     val message = new AlterPartitionRequestData()
       .setBrokerId(brokerId)
-      .setBrokerEpoch(brokerEpochSupplier.apply())
+      .setBrokerEpoch(brokerEpochSupplier())
+
+    inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { case (topicName, items) =>
+      val topicId = items.head.topicIdPartition.topicId
+      // We use topic ids only if all the topics have one defined.
+      canUseTopicIds &= topicId != Uuid.ZERO_UUID
+      topicNamesByIds(topicId) = topicName
 
-      inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach { case (topic, items) =>
       val topicData = new AlterPartitionRequestData.TopicData()
-        .setName(topic)
+        .setTopicName(topicName)
+        .setTopicId(topicId)
       message.topics.add(topicData)
+
       items.foreach { item =>
         val partitionData = new AlterPartitionRequestData.PartitionData()
-          .setPartitionIndex(item.topicPartition.partition)
+          .setPartitionIndex(item.topicIdPartition.partition)
           .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
           .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
           .setPartitionEpoch(item.leaderAndIsr.partitionEpoch)
 
-        if (metadataVersionSupplier().isAtLeast(MetadataVersion.IBP_3_2_IV0)) {
+        if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_2_IV0)) {
           partitionData.setLeaderRecoveryState(item.leaderAndIsr.leaderRecoveryState.value)
         }
 
         topicData.partitions.add(partitionData)
       }
     }
-    message
+
+    (message, canUseTopicIds, if (canUseTopicIds) topicNamesByIds else mutable.Map.empty[Uuid, String])

Review Comment:
   I wonder if we can simplify the return values by just always returning `topicNamesByIds`. E.g. `(message, canUseTopicIds, topicNamesByIds)`



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -847,21 +862,29 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   private def needsExpandIsr(followerReplica: Replica): Boolean = {
-    canAddReplicaToIsr(followerReplica.brokerId) && isFollowerAtHighwatermark(followerReplica)
+    canAddReplicaToIsr(followerReplica.brokerId) && isFollowerInSync(followerReplica)
   }
 
   private def canAddReplicaToIsr(followerReplicaId: Int): Boolean = {
     val current = partitionState
-    !current.isInflight && !current.isr.contains(followerReplicaId)
+    !current.isInflight &&
+      !current.isr.contains(followerReplicaId) &&
+      isBrokerIsrEligible(followerReplicaId)
   }
 
-  private def isFollowerAtHighwatermark(followerReplica: Replica): Boolean = {
+  private def isFollowerInSync(followerReplica: Replica): Boolean = {
     leaderLogIfLocal.exists { leaderLog =>
       val followerEndOffset = followerReplica.stateSnapshot.logEndOffset
       followerEndOffset >= leaderLog.highWatermark && leaderEpochStartOffsetOpt.exists(followerEndOffset >= _)
     }
   }
 
+  private def isBrokerIsrEligible(brokerId: Int): Boolean = {

Review Comment:
   Minor. The replica manager refers to brokers as replicas. Why not `isReplicaIsrEligible` instead of `isBrokerIsrEligible`?



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +226,56 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+  ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+    val metadataVersion = metadataVersionSupplier()
+    var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+    // We build this mapping in order to map topic id back to their name when we
+    // receive the response. We cannot rely on the metadata cache for this because
+    // the metadata cache is updated after the partition state so it might not know
+    // yet about a topic id already used here.
+    val topicNamesByIds = mutable.HashMap[Uuid, String]()

Review Comment:
   I think we can remove the need to compute this map if you agree with my comment regarding `unsentIsrUpdates`.



##########
core/src/main/scala/kafka/server/MetadataCache.scala:
##########
@@ -58,6 +58,10 @@ trait MetadataCache {
 
   def hasAliveBroker(brokerId: Int): Boolean
 
+  def isBrokerFenced(brokerId: Int): Boolean
+
+  def isBrokerInControlledShutdown(brokerId: Int): Boolean

Review Comment:
   I wonder if we should remove these methods from this trait and the `ZkMetadataCache` type. We can move this complexity to `Partition.isBrokerIsrEligible`. You already document this subtle semantic there. I think this change would make that semantic explicit.



##########
clients/src/main/resources/common/message/AlterPartitionRequest.json:
##########
@@ -18,16 +18,21 @@
   "type": "request",
   "listeners": ["zkBroker", "controller"],
   "name": "AlterPartitionRequest",
-  "validVersions": "0-1",
+  // Version 1 adds LeaderRecoveryState field (KIP-704).
+  //
+  // Version 2 adds TopicId field to replace TopicName field (KIP-841).
+  "validVersions": "0-2",

Review Comment:
   I think it would be good to document when version 1 vs version 2 is sent. I get the impression that a few things need to be true. For example, if topic id is enabled (IBP 2.8), is it guaranteed that every topic will have an id? I ask this because it is possible for this RPC to send multiple topics in the request.



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -262,29 +291,35 @@ class DefaultAlterPartitionManager(
         // Collect partition-level responses to pass to the callbacks
         val partitionResponses = new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()

Review Comment:
   Similar to my other comment, but I think you can simplify this handling and remove the need to have `topicNamesByIds` if we make this a map of `Map[TopicIdPartition, Either[Errors, LeaderAndIsr]]`.



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +226,56 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+  ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {

Review Comment:
   I know that this is a private method but the return type is important. Can we add a JavaDoc that explains each element of the tuple returned?



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +226,56 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+  ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+    val metadataVersion = metadataVersionSupplier()
+    var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+    // We build this mapping in order to map topic id back to their name when we
+    // receive the response. We cannot rely on the metadata cache for this because
+    // the metadata cache is updated after the partition state so it might not know
+    // yet about a topic id already used here.
+    val topicNamesByIds = mutable.HashMap[Uuid, String]()
+
     val message = new AlterPartitionRequestData()
       .setBrokerId(brokerId)
-      .setBrokerEpoch(brokerEpochSupplier.apply())
+      .setBrokerEpoch(brokerEpochSupplier())
+
+    inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { case (topicName, items) =>
+      val topicId = items.head.topicIdPartition.topicId
+      // We use topic ids only if all the topics have one defined.
+      canUseTopicIds &= topicId != Uuid.ZERO_UUID
+      topicNamesByIds(topicId) = topicName
 
-      inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach { case (topic, items) =>
       val topicData = new AlterPartitionRequestData.TopicData()
-        .setName(topic)
+        .setTopicName(topicName)
+        .setTopicId(topicId)
       message.topics.add(topicData)
+
       items.foreach { item =>
         val partitionData = new AlterPartitionRequestData.PartitionData()
-          .setPartitionIndex(item.topicPartition.partition)
+          .setPartitionIndex(item.topicIdPartition.partition)
           .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
           .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
           .setPartitionEpoch(item.leaderAndIsr.partitionEpoch)
 
-        if (metadataVersionSupplier().isAtLeast(MetadataVersion.IBP_3_2_IV0)) {
+        if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_2_IV0)) {

Review Comment:
   Do you mind changing this to `metadataVersion.isLeaderRecoverySupported`?



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -124,7 +126,9 @@ class DefaultAlterPartitionManager(
   val metadataVersionSupplier: () => MetadataVersion
 ) extends AlterPartitionManager with Logging with KafkaMetricsGroup {
 
-  // Used to allow only one pending ISR update per partition (visible for testing)
+  // Used to allow only one pending ISR update per partition (visible for testing).
+  // Note that we key items by TopicPartition despite using TopicIdPartition while
+  // submitting it. We do this because we don't always have a topic id to rely on.

Review Comment:
   You should be able to use `TopicIdPartition` with the zero uuid if topic id is not supported. This is correct because equality for `kafka.common.TopicIdPartition` requires that all three parts are equal: topic id, topic name, topic partition.



##########
metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java:
##########
@@ -38,18 +43,49 @@ public static OptionalLong requestTimeoutMsToDeadlineNs(
         return OptionalLong.of(time.nanoseconds() + NANOSECONDS.convert(millisecondsOffset, MILLISECONDS));
     }
 
-    private final KafkaPrincipal principal;
+    public static ControllerRequestContext anonymousContextFor(ApiKeys apiKeys) {

Review Comment:
   Similar comment for this method. I don't think this is safe to have in `src/main`, and I don't see us using this in the future. If you agree, let's move this to `src/test`.



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -159,48 +159,63 @@ sealed trait PartitionState {
 }
 
 sealed trait PendingPartitionChange extends PartitionState {
+  def lastCommittedState: PartitionState

Review Comment:
   This is a great idea. I have been meaning to do this for a while.
   
   Is the type `PartitionState` too flexible? Should this always be `CommittedPartitionState`? Maybe the easiest way to do this with the current code structure is to move this to `PartitionState` with `CommittedPartitionState` implementing this method with `this`.
   
   Now that we have `lastCommittedState`, I think that we have redundant data in both `PendingPartitionChange` types. For example, `isr` is always set to `lastCommittedState.isr`.
   
   What do you think?



##########
metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java:
##########
@@ -17,18 +17,23 @@
 
 package org.apache.kafka.controller;
 
+import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.Time;
 
 import java.util.OptionalLong;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 
 public class ControllerRequestContext {
     public static final ControllerRequestContext ANONYMOUS_CONTEXT =

Review Comment:
   Is it true that this is the anonymous context now that we have the request header? Having this in `src/main` is not safe. It looks like we only use this in tests. Should we move this helper to `src/test`?



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -915,22 +918,31 @@ Set<TopicIdPartition> imbalancedPartitions() {
         return new HashSet<>(imbalancedPartitions);
     }
 
-    ControllerResult<AlterPartitionResponseData> alterPartition(AlterPartitionRequestData request) {
+    ControllerResult<AlterPartitionResponseData> alterPartition(
+        ControllerRequestContext context,
+        AlterPartitionRequestData request
+    ) {
+        short requestVersion = context.requestHeader().requestApiVersion();
         clusterControl.checkBrokerEpoch(request.brokerId(), request.brokerEpoch());
         AlterPartitionResponseData response = new AlterPartitionResponseData();
         List<ApiMessageAndVersion> records = new ArrayList<>();
         for (AlterPartitionRequestData.TopicData topicData : request.topics()) {
             AlterPartitionResponseData.TopicData responseTopicData =
-                new AlterPartitionResponseData.TopicData().setName(topicData.name());
+                new AlterPartitionResponseData.TopicData().
+                    setTopicName(topicData.topicName()).
+                    setTopicId(topicData.topicId());
             response.topics().add(responseTopicData);
-            Uuid topicId = topicsByName.get(topicData.name());
-            if (topicId == null || !topics.containsKey(topicId)) {
+
+            Uuid topicId = requestVersion > 1 ? topicData.topicId() : topicsByName.get(topicData.topicName());

Review Comment:
   Btw, since the default value for topicId is `ZERO_UUID` and the default value for topicName is `""`, you may be able to do this without needing the request version.



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -915,22 +918,31 @@ Set<TopicIdPartition> imbalancedPartitions() {
         return new HashSet<>(imbalancedPartitions);
     }
 
-    ControllerResult<AlterPartitionResponseData> alterPartition(AlterPartitionRequestData request) {
+    ControllerResult<AlterPartitionResponseData> alterPartition(
+        ControllerRequestContext context,
+        AlterPartitionRequestData request

Review Comment:
   Outside the scope of this PR but I wonder if we need a type for this? E.g.
   
   ```java
   class ControllerRequest<T extends ApiMessage> {
       ControllerRequestContext context;
       T request;
   }
   ```



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +225,51 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+  ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+    val metadataVersion = metadataVersionSupplier()
+    var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+    val topicNamesByIds = mutable.HashMap[Uuid, String]()
+
     val message = new AlterPartitionRequestData()
       .setBrokerId(brokerId)
-      .setBrokerEpoch(brokerEpochSupplier.apply())
+      .setBrokerEpoch(brokerEpochSupplier())
+
+    inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { case (topicName, items) =>
+      val topicId = items.head.topicIdPartition.topicId
+      // We use topic ids only if all the topics have one defined.
+      canUseTopicIds &= topicId != Uuid.ZERO_UUID
+      topicNamesByIds(topicId) = topicName
 
-      inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach { case (topic, items) =>
       val topicData = new AlterPartitionRequestData.TopicData()
-        .setName(topic)
+        .setTopicName(topicName)
+        .setTopicId(topicId)

Review Comment:
   Yeah, this is the reason why @dajac had to mark both `TopicName` and `TopicId` as `ignorable`. Ideally we wouldn't do that.
   
   It would be difficult and error prone to expose the ApiVersion to this layer as it would change when the active controller fails over.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org