Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/02 13:38:48 UTC

[GitHub] [kafka] dajac opened a new pull request, #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

dajac opened a new pull request, #12240:
URL: https://github.com/apache/kafka/pull/12240

   This PR implements the first part of [KIP-841](https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft). Specifically, it does the following:
   * Adds a new metadata version.
   * Adds the InControlledShutdown field to the BrokerRegistrationRecord and BrokerRegistrationChangeRecord and bumps their versions. The new versions are only used if the new metadata version is enabled.
   * Writes a BrokerRegistrationChangeRecord with InControlledShutdown set when a broker requests a controlled shutdown.
   * Ensures that replicas that are fenced or in controlled shutdown are neither picked as leaders nor included in the ISR.
   * Adds or extends unit tests.
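
For illustration only, the eligibility rule in the list above (a replica on a fenced or in-controlled-shutdown broker must not enter the ISR) could be sketched as follows. `IsrEligibilitySketch`, `BrokerState`, `active`, and `eligible` are hypothetical names chosen for this sketch; they are not the classes or methods touched by the PR.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of the eligibility rule; none of these names come from the PR.
final class IsrEligibilitySketch {
    // Minimal stand-in for the per-broker state the controller tracks.
    static final class BrokerState {
        final boolean fenced;
        final boolean inControlledShutdown;

        BrokerState(boolean fenced, boolean inControlledShutdown) {
            this.fenced = fenced;
            this.inControlledShutdown = inControlledShutdown;
        }
    }

    // A broker is active only if it is registered, unfenced, and not shutting down.
    static boolean active(Map<Integer, BrokerState> brokers, int brokerId) {
        BrokerState state = brokers.get(brokerId);
        return state != null && !state.fenced && !state.inControlledShutdown;
    }

    // Keeps only the replicas that may appear in the ISR or be elected leader.
    static List<Integer> eligible(Map<Integer, BrokerState> brokers, List<Integer> replicas) {
        return replicas.stream()
            .filter(id -> active(brokers, id))
            .collect(Collectors.toList());
    }
}
```

Unknown broker ids are treated as inactive here, which is an assumption of the sketch.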
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r889854431


##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -491,6 +540,18 @@ public boolean unfenced(int brokerId) {
         return !registration.fenced();
     }
 
+    public boolean inControlledShutdown(int brokerId) {

Review Comment:
   It is actually now used in `src/main` so it is better to keep the current signature.





[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r890922496


##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -132,15 +143,27 @@ ClusterControlManager build() {
                 replicaPlacer = new StripedReplicaPlacer(new Random());
             }
             if (controllerMetrics == null) {
-                throw new RuntimeException("You must specify controllerMetrics");
+                throw new RuntimeException("You must specify ControllerMetrics");
+            }
+            if (featureControl == null) {
+                featureControl = new FeatureControlManager.Builder().
+                    setLogContext(logContext).
+                    setSnapshotRegistry(snapshotRegistry).
+                    setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                        QuorumFeatures.defaultFeatureMap(),
+                        singletonList(0))).
+                    setMetadataVersion(MetadataVersion.latest()).
+                    build();

Review Comment:
   Yeah, I agree. Let me change that.





[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r889877186


##########
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##########
@@ -152,13 +156,22 @@ public boolean fenced() {
         return fenced;
     }
 
-    public ApiMessageAndVersion toRecord() {
+    public boolean inControlledShutdown() {
+        return inControlledShutdown;
+    }
+
+    public ApiMessageAndVersion toRecord(MetadataVersion metadataVersion) {
         RegisterBrokerRecord registrationRecord = new RegisterBrokerRecord().
             setBrokerId(id).
             setRack(rack.orElse(null)).
             setBrokerEpoch(epoch).
             setIncarnationId(incarnationId).
             setFenced(fenced);
+
+        if (metadataVersion.isInControlledShutdownStateSupported()) {
+            registrationRecord.setInControlledShutdown(inControlledShutdown);
+        }

Review Comment:
   Hmm. I am not sure about this. In the case of an "unclean" downgrade, we need to write the state with an older version, and `inControlledShutdown` could be set in the `BrokerRegistration` if we briefly used a metadata version that supports it. It seems better to me to be defensive here.
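
A minimal sketch of the defensive behaviour described above, under stated assumptions: `Version` is a hypothetical two-value stand-in for `MetadataVersion`, and `Record` is a stand-in for the generated record class. The point is only that the in-memory flag may be `true` while the record written for an older version leaves the field at its default.

```java
// Hypothetical sketch; Version stands in for MetadataVersion and Record for the generated record.
final class VersionGatedWriteSketch {
    enum Version {
        BEFORE_FLAG(false),
        WITH_FLAG(true);

        private final boolean supportsInControlledShutdown;

        Version(boolean supportsInControlledShutdown) {
            this.supportsInControlledShutdown = supportsInControlledShutdown;
        }

        boolean isInControlledShutdownStateSupported() {
            return supportsInControlledShutdown;
        }
    }

    static final class Record {
        boolean fenced;
        boolean inControlledShutdown; // stays false when never set
    }

    // Only writes the new field when the target version can represent it.
    static Record toRecord(boolean fenced, boolean inControlledShutdown, Version version) {
        Record record = new Record();
        record.fenced = fenced;
        if (version.isInControlledShutdownStateSupported()) {
            record.inControlledShutdown = inControlledShutdown;
        }
        return record;
    }
}
```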





[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r889876056


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -682,25 +683,35 @@ private ApiError createTopic(CreatableTopic topic,
             short replicationFactor = topic.replicationFactor() == -1 ?
                 defaultReplicationFactor : topic.replicationFactor();
             try {
-                List<List<Integer>> replicas = clusterControl.replicaPlacer().place(new PlacementSpec(
+                List<List<Integer>> partitions = clusterControl.replicaPlacer().place(new PlacementSpec(
                     0,
                     numPartitions,
                     replicationFactor
                 ), clusterDescriber);
-                for (int partitionId = 0; partitionId < replicas.size(); partitionId++) {
-                    int[] r = Replicas.toArray(replicas.get(partitionId));
+                for (int partitionId = 0; partitionId < partitions.size(); partitionId++) {
+                    List<Integer> replicas = partitions.get(partitionId);
+                    List<Integer> isr = replicas.stream().
+                        filter(clusterControl::active).collect(Collectors.toList());
+                    // We need to have at least one replica in the ISR.
+                    if (isr.isEmpty()) isr.add(replicas.get(0));

Review Comment:
   That's a good point. Thinking more about this, I think that we should just fail the creation in this case. At the moment, the placer fails the assignment if all the brokers are fenced so extending this to include in-controlled-shutdown brokers seems appropriate.
   
   I still handle the case here. However, I think that we should update the placer to directly handle this. I would like to do this as a follow-up if you don't mind. That's a big change. 
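
As a rough sketch of the alternative discussed above (failing the creation instead of falling back to the first replica), assuming a hypothetical helper `InitialIsrSketch.compute` and a plain `IllegalStateException` in place of whatever error the controller would actually return:

```java
import java.util.List;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;

// Hypothetical sketch; the exception type and names are illustrative assumptions.
final class InitialIsrSketch {
    // Builds the initial ISR from the active replicas and rejects an empty result.
    static List<Integer> compute(List<Integer> replicas, IntPredicate active) {
        List<Integer> isr = replicas.stream()
            .filter(id -> active.test(id))
            .collect(Collectors.toList());
        if (isr.isEmpty()) {
            throw new IllegalStateException(
                "No active replica available for the initial ISR: " + replicas);
        }
        return isr;
    }
}
```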





[GitHub] [kafka] jsancio commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r890548627


##########
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##########
@@ -152,13 +156,22 @@ public boolean fenced() {
         return fenced;
     }
 
-    public ApiMessageAndVersion toRecord() {
+    public boolean inControlledShutdown() {
+        return inControlledShutdown;
+    }
+
+    public ApiMessageAndVersion toRecord(MetadataVersion metadataVersion) {
         RegisterBrokerRecord registrationRecord = new RegisterBrokerRecord().
             setBrokerId(id).
             setRack(rack.orElse(null)).
             setBrokerEpoch(epoch).
             setIncarnationId(incarnationId).
             setFenced(fenced);
+
+        if (metadataVersion.isInControlledShutdownStateSupported()) {
+            registrationRecord.setInControlledShutdown(inControlledShutdown);
+        }

Review Comment:
   Yeah. This is a question I have had in the back of my mind for a while with respect to downgrades. [KIP-778](https://wiki.apache.org/confluence/pages/viewpage.action?pageId=188746840#KIP778:KRaftUpgrades-Downgrades) talks about generating and loading a new snapshot when the metadata.version is downgraded. The active controller is not allowed to load a snapshot, which means that it will have to resign after generating the snapshot but before loading it. cc @mumrah
   
   In other words, I think you are correct. We need to be able to generate a snapshot for a previous metadata version.





[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r889851588


##########
metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json:
##########
@@ -17,14 +17,16 @@
   "apiKey": 17,
   "type": "metadata",
   "name": "BrokerRegistrationChangeRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
   I actually followed @cmccabe's [suggestion](https://lists.apache.org/thread/lgqzoww99gmjzr0v3y23cdgq5fctfrvw).





[GitHub] [kafka] jsancio commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r890538379


##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -132,15 +143,27 @@ ClusterControlManager build() {
                 replicaPlacer = new StripedReplicaPlacer(new Random());
             }
             if (controllerMetrics == null) {
-                throw new RuntimeException("You must specify controllerMetrics");
+                throw new RuntimeException("You must specify ControllerMetrics");
+            }
+            if (featureControl == null) {
+                featureControl = new FeatureControlManager.Builder().
+                    setLogContext(logContext).
+                    setSnapshotRegistry(snapshotRegistry).
+                    setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                        QuorumFeatures.defaultFeatureMap(),
+                        singletonList(0))).
+                    setMetadataVersion(MetadataVersion.latest()).
+                    build();

Review Comment:
   I see that, and I reviewed that PR too. I guess I missed it. :smile: 
   
   I think it is risky to allow the controller managers to be instantiated without a shared feature control manager. Allowing them to be built with a `null` feature control manager makes that more likely.
   
   I can submit a PR to fix the changes to `ReplicationControlManager`. 
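
One way to picture the stricter alternative is a builder that refuses to invent its own dependency. This is only a sketch: `FeatureControl`, `Manager`, and `Builder` are hypothetical stand-ins, not the Kafka classes, and the error message is made up.

```java
// Hypothetical sketch of a fail-fast builder; not the actual ClusterControlManager.Builder.
final class SharedDependencyBuilderSketch {
    static final class FeatureControl { }

    static final class Manager {
        final FeatureControl featureControl;

        Manager(FeatureControl featureControl) {
            this.featureControl = featureControl;
        }
    }

    static final class Builder {
        private FeatureControl featureControl;

        Builder setFeatureControl(FeatureControl featureControl) {
            this.featureControl = featureControl;
            return this;
        }

        // Fails instead of silently creating a private, unshared instance.
        Manager build() {
            if (featureControl == null) {
                throw new IllegalStateException("You must specify a feature control manager");
            }
            return new Manager(featureControl);
        }
    }
}
```

With this shape, two managers can only observe the same feature state if the caller passes the same instance to both builders, which makes the sharing explicit.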





[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r889832039


##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -132,15 +143,27 @@ ClusterControlManager build() {
                 replicaPlacer = new StripedReplicaPlacer(new Random());
             }
             if (controllerMetrics == null) {
-                throw new RuntimeException("You must specify controllerMetrics");
+                throw new RuntimeException("You must specify ControllerMetrics");
+            }
+            if (featureControl == null) {
+                featureControl = new FeatureControlManager.Builder().
+                    setLogContext(logContext).
+                    setSnapshotRegistry(snapshotRegistry).
+                    setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                        QuorumFeatures.defaultFeatureMap(),
+                        singletonList(0))).
+                    setMetadataVersion(MetadataVersion.latest()).
+                    build();

Review Comment:
   I followed what we did in the `ReplicationControlManager` as part of https://github.com/apache/kafka/commit/65b4374203608dc4d68544bde16d4acd2e31efd1. I am fine either way, but I think that we should be consistent in both places.





[GitHub] [kafka] jsancio commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r890549624


##########
metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json:
##########
@@ -17,14 +17,16 @@
   "apiKey": 17,
   "type": "metadata",
   "name": "BrokerRegistrationChangeRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
   Okay. I am not sure where or how to document this decision but it would be good to be consistent across change type records.





[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r890868519


##########
metadata/src/main/java/org/apache/kafka/image/MetadataImage.java:
##########
@@ -120,10 +121,16 @@ public AclsImage acls() {
     }
 
     public void write(Consumer<List<ApiMessageAndVersion>> out) {
+        // We use the minimum KRaft metadata version if this image does
+        // not have a specific version set.
+        MetadataVersion metadataVersion = features.metadataVersion();
+        if (metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
+            metadataVersion = MetadataVersion.IBP_3_0_IV1;
+        }

Review Comment:
   @mumrah is removing `UNINITIALIZED` in https://github.com/apache/kafka/pull/12250. We can remove this logic afterwards.





[GitHub] [kafka] mumrah commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r891371390


##########
metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json:
##########
@@ -17,14 +17,16 @@
   "apiKey": 17,
   "type": "metadata",
   "name": "BrokerRegistrationChangeRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
   I actually think we should not increase the record version unless there is an incompatible change. To my knowledge, that is what the record version has conveyed historically. However, I don't think there's any _harm_ in increasing it, and I also don't think we need to solve it in this PR. Let's go with Colin's suggestion here, and we can continue this discussion on the mailing list or offline.





[GitHub] [kafka] dajac commented on pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12240:
URL: https://github.com/apache/kafka/pull/12240#issuecomment-1147170400

   @jsancio Thanks for your review. I have addressed or replied to your comments.




[GitHub] [kafka] dengziming commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
dengziming commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r890725858


##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -394,31 +432,49 @@ public void replay(UnregisterBrokerRecord record) {
     }
 
     public void replay(FenceBrokerRecord record) {
-        replayRegistrationChange(record, record.id(), record.epoch(),
-            BrokerRegistrationFencingChange.UNFENCE);
+        replayRegistrationChange(
+            record,
+            record.id(),
+            record.epoch(),
+            BrokerRegistrationFencingChange.UNFENCE.asBoolean(),

Review Comment:
   Similar to what Jose mentioned, this will need attention if #12236 is merged first.



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -279,6 +309,13 @@ Set<Integer> fencedBrokerIds() {
             .collect(Collectors.toSet());
     }
 
+    private short registerBrokerRecordVersion() {

Review Comment:
   How about moving this method to `MetadataVersion`? This logic is also used in `BrokerRegistration.toRecord`.
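
A sketch of what centralizing that choice might look like, assuming a hypothetical `Version` enum in place of `MetadataVersion` and an assumed numbering of 0 and 1 for the record versions:

```java
// Hypothetical sketch; Version stands in for MetadataVersion, and the version numbers are assumed.
final class RecordVersionSketch {
    enum Version {
        BEFORE_FLAG(false),
        WITH_FLAG(true);

        private final boolean supportsInControlledShutdown;

        Version(boolean supportsInControlledShutdown) {
            this.supportsInControlledShutdown = supportsInControlledShutdown;
        }

        boolean isInControlledShutdownStateSupported() {
            return supportsInControlledShutdown;
        }

        // Single place that maps the metadata version to a record version,
        // so callers do not each repeat the feature check.
        short registerBrokerRecordVersion() {
            return isInControlledShutdownStateSupported() ? (short) 1 : (short) 0;
        }
    }
}
```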





[GitHub] [kafka] mumrah commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r891357405


##########
metadata/src/main/java/org/apache/kafka/image/MetadataImage.java:
##########
@@ -120,10 +121,16 @@ public AclsImage acls() {
     }
 
     public void write(Consumer<List<ApiMessageAndVersion>> out) {
+        // We use the minimum KRaft metadata version if this image does
+        // not have a specific version set.
+        MetadataVersion metadataVersion = features.metadataVersion();
+        if (metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
+            metadataVersion = MetadataVersion.IBP_3_0_IV1;
+        }

Review Comment:
   Right, once that PR is merged, KRaft will implicitly be at metadata version IBP_3_0_IV1 until the controller finishes bootstrapping. This will be true on both the controller and the broker side.
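
The fallback being discussed amounts to the following small rule. The enum here is a hypothetical three-value stand-in (the `NEWER` constant is invented for the sketch), and `effective` is not a method from the PR.

```java
// Hypothetical sketch of the "use the minimum KRaft version when unset" rule.
final class EffectiveVersionSketch {
    enum Version { UNINITIALIZED, IBP_3_0_IV1, NEWER }

    // Returns the minimum KRaft version when no version has been set yet.
    static Version effective(Version configured) {
        return configured == Version.UNINITIALIZED ? Version.IBP_3_0_IV1 : configured;
    }
}
```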





[GitHub] [kafka] jsancio commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r890595884


##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -485,12 +541,36 @@ Iterator<UsableBroker> usableBrokers() {
             id -> brokerRegistrations.get(id).rack());
     }
 
+    /**
+     * Returns true if the broker is in fenced state; Returns false if it is
+     * not or if it does not exist.
+     */
     public boolean unfenced(int brokerId) {

Review Comment:
   Minor but I am pretty sure that we don't use this method anymore in `src/main`. If so, I say that we just remove it.



##########
metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java:
##########
@@ -93,46 +93,35 @@ private BrokerRegistration getBrokerOrThrow(int brokerId, long epoch, String act
 
     public void replay(FenceBrokerRecord record) {
         BrokerRegistration broker = getBrokerOrThrow(record.id(), record.epoch(), "fence");
-        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(true)));
+        changedBrokers.put(record.id(), broker.maybeCloneWith(
+            BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
+            Optional.empty()
+        ));
     }
 
     public void replay(UnfenceBrokerRecord record) {
         BrokerRegistration broker = getBrokerOrThrow(record.id(), record.epoch(), "unfence");
-        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(false)));
+        changedBrokers.put(record.id(), broker.maybeCloneWith(
+            BrokerRegistrationFencingChange.FENCE.asBoolean(),
+            Optional.empty()
+        ));

Review Comment:
   We have an in-flight change that swaps these enum values around. We need to be careful about the order in which we merge these PRs: https://github.com/apache/kafka/pull/12236#issuecomment-1147912524



##########
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##########
@@ -213,12 +230,30 @@ public String toString() {
         bld.append("}");
         bld.append(", rack=").append(rack);
         bld.append(", fenced=").append(fenced);
+        bld.append(", inControlledShutdown=").append(inControlledShutdown);
         bld.append(")");
         return bld.toString();
     }
 
-    public BrokerRegistration cloneWithFencing(boolean fencing) {
-        return new BrokerRegistration(id, epoch, incarnationId, listeners,
-            supportedFeatures, rack, fencing);
+    public Optional<BrokerRegistration> maybeCloneWith(
+        Optional<Boolean> fencingChange,
+        Optional<Boolean> inControlledShutdownChange
+    ) {
+        boolean newFenced = fencingChange.orElse(fenced);
+        boolean newInControlledShutdownChange = inControlledShutdownChange.orElse(inControlledShutdown);
+
+        if (newFenced == fenced && newInControlledShutdownChange == inControlledShutdown)
+            return Optional.empty();

Review Comment:
   Okay, but we could just return `this`. Outside of `supportedFeatures`, `BrokerRegistration` is immutable, and it overrides `equals` and `hashCode`, so this should be safe.
   
   I'll submit a minor PR to make `supportedFeatures` immutable.
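
A minimal sketch of the "return `this`" variant, assuming a cut-down hypothetical `RegistrationSketch` class with only three fields rather than the real `BrokerRegistration`:

```java
import java.util.Optional;

// Hypothetical, cut-down stand-in for an immutable registration object.
final class RegistrationSketch {
    final int id;
    final boolean fenced;
    final boolean inControlledShutdown;

    RegistrationSketch(int id, boolean fenced, boolean inControlledShutdown) {
        this.id = id;
        this.fenced = fenced;
        this.inControlledShutdown = inControlledShutdown;
    }

    // Returns this instance when nothing changes; otherwise returns a modified copy.
    RegistrationSketch cloneWith(Optional<Boolean> fencingChange,
                                 Optional<Boolean> inControlledShutdownChange) {
        boolean newFenced = fencingChange.orElse(fenced);
        boolean newInControlledShutdown = inControlledShutdownChange.orElse(inControlledShutdown);
        if (newFenced == fenced && newInControlledShutdown == inControlledShutdown) {
            return this;
        }
        return new RegistrationSketch(id, newFenced, newInControlledShutdown);
    }
}
```

Because the object is immutable, handing back the same instance is indistinguishable from handing back an equal copy, and callers no longer need to unwrap an `Optional`.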



##########
metadata/src/main/java/org/apache/kafka/image/MetadataImage.java:
##########
@@ -120,10 +121,16 @@ public AclsImage acls() {
     }
 
     public void write(Consumer<List<ApiMessageAndVersion>> out) {
+        // We use the minimum KRaft metadata version if this image does
+        // not have a specific version set.
+        MetadataVersion metadataVersion = features.metadataVersion();
+        if (metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
+            metadataVersion = MetadataVersion.IBP_3_0_IV1;
+        }

Review Comment:
   I think we need a similar change for the `ClusterControlManager` iterator. If I am not mistaken it is possible for the `FeatureControlManager` to return `UNINITIALIZED` in both active controllers and inactive controllers.
   
   Should we document this default version in `MetadataVersion` instead?
   
   cc @mumrah 







[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r890963791


##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -485,12 +541,36 @@ Iterator<UsableBroker> usableBrokers() {
             id -> brokerRegistrations.get(id).rack());
     }
 
+    /**
+     * Returns true if the broker is in fenced state; Returns false if it is
+     * not or if it does not exist.
+     */
     public boolean unfenced(int brokerId) {

Review Comment:
   That's right. However, it is used in many places in the tests. I haven't found a good way to replace it in tests that is as convenient as this predicate. I would keep it.





[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r891098455


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -682,25 +683,35 @@ private ApiError createTopic(CreatableTopic topic,
             short replicationFactor = topic.replicationFactor() == -1 ?
                 defaultReplicationFactor : topic.replicationFactor();
             try {
-                List<List<Integer>> replicas = clusterControl.replicaPlacer().place(new PlacementSpec(
+                List<List<Integer>> partitions = clusterControl.replicaPlacer().place(new PlacementSpec(
                     0,
                     numPartitions,
                     replicationFactor
                 ), clusterDescriber);
-                for (int partitionId = 0; partitionId < replicas.size(); partitionId++) {
-                    int[] r = Replicas.toArray(replicas.get(partitionId));
+                for (int partitionId = 0; partitionId < partitions.size(); partitionId++) {
+                    List<Integer> replicas = partitions.get(partitionId);
+                    List<Integer> isr = replicas.stream().
+                        filter(clusterControl::active).collect(Collectors.toList());
+                    // We need to have at least one replica in the ISR.
+                    if (isr.isEmpty()) isr.add(replicas.get(0));

Review Comment:
   Sure. https://issues.apache.org/jira/browse/KAFKA-13962. I will do this right away.





[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r890873379


##########
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##########
@@ -213,12 +230,30 @@ public String toString() {
         bld.append("}");
         bld.append(", rack=").append(rack);
         bld.append(", fenced=").append(fenced);
+        bld.append(", inControlledShutdown=").append(inControlledShutdown);
         bld.append(")");
         return bld.toString();
     }
 
-    public BrokerRegistration cloneWithFencing(boolean fencing) {
-        return new BrokerRegistration(id, epoch, incarnationId, listeners,
-            supportedFeatures, rack, fencing);
+    public Optional<BrokerRegistration> maybeCloneWith(
+        Optional<Boolean> fencingChange,
+        Optional<Boolean> inControlledShutdownChange
+    ) {
+        boolean newFenced = fencingChange.orElse(fenced);
+        boolean newInControlledShutdownChange = inControlledShutdownChange.orElse(inControlledShutdown);
+
+        if (newFenced == fenced && newInControlledShutdownChange == inControlledShutdown)
+            return Optional.empty();

Review Comment:
   Yeah, you're right. I am not sure why I used this Optional here.





[GitHub] [kafka] jsancio commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r889716710


##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -444,6 +481,46 @@ public void testCreateTopics() throws Exception {
             ctx.replicationControl.iterator(Long.MAX_VALUE));
     }
 
+    @Test
+    public void testCreateTopicsInvariants() throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
+
+        CreateTopicsRequestData request = new CreateTopicsRequestData();
+        request.topics().add(new CreatableTopic().setName("foo").
+            setNumPartitions(-1).setReplicationFactor((short) -1));
+
+        ctx.registerBrokers(0, 1, 2);
+        ctx.unfenceBrokers(0, 1);
+        ctx.inControlledShutdownBrokers(1);
+
+        ControllerResult<CreateTopicsResponseData> result =
+            replicationControl.createTopics(request, Collections.singleton("foo"));
+
+        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
+        expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
+            setNumPartitions(1).setReplicationFactor((short) 3).
+            setErrorMessage(null).setErrorCode((short) 0).
+            setTopicId(result.response().topics().find("foo").topicId()));
+        assertEquals(expectedResponse, result.response());
+
+        ctx.replay(result.records());
+
+        // Broker 2 cannot be in the ISR because it is fenced and broker 1
+        // cannot be in the ISR because it is in controlled shutdown.

Review Comment:
   Can we add a test that confirms automatic replica assignment when all of the brokers are fenced and/or shutting down?



##########
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##########
@@ -213,12 +230,29 @@ public String toString() {
         bld.append("}");
         bld.append(", rack=").append(rack);
         bld.append(", fenced=").append(fenced);
+        bld.append(", inControlledShutdown=").append(inControlledShutdown);
         bld.append(")");
         return bld.toString();
     }
 
     public BrokerRegistration cloneWithFencing(boolean fencing) {
         return new BrokerRegistration(id, epoch, incarnationId, listeners,
-            supportedFeatures, rack, fencing);
+            supportedFeatures, rack, fencing, inControlledShutdown);
+    }

Review Comment:
   What do you think about removing this method and replacing all of the calls with `cloneWith`?



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -491,6 +540,18 @@ public boolean unfenced(int brokerId) {
         return !registration.fenced();
     }
 
+    public boolean inControlledShutdown(int brokerId) {

Review Comment:
   Let's add a Javadoc explaining that a broker that doesn't exist is not shutting down. It looks like this is only used by tests. Should we document that, to avoid it being used in `src/main`? Since this is only used in tests, how about this signature instead: `Optional<Boolean> inControlledShutdown(int)`?
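
   A minimal sketch of the `Optional<Boolean>` signature suggested above, assuming a simplified registration map. The class name and the `register` helper are hypothetical and are not part of the PR; the point is only that an unknown broker yields an empty `Optional` instead of `false`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class InControlledShutdownLookupSketch {
    // Hypothetical stand-in for the controller's registrations:
    // broker id -> whether that broker is in controlled shutdown.
    private final Map<Integer, Boolean> registrations = new HashMap<>();

    void register(int brokerId, boolean inControlledShutdown) {
        registrations.put(brokerId, inControlledShutdown);
    }

    /**
     * Returns the controlled shutdown state of the broker, or an empty
     * Optional if no broker with this id is registered.
     */
    Optional<Boolean> inControlledShutdown(int brokerId) {
        return Optional.ofNullable(registrations.get(brokerId));
    }

    public static void main(String[] args) {
        InControlledShutdownLookupSketch sketch = new InControlledShutdownLookupSketch();
        sketch.register(1, true);
        System.out.println(sketch.inControlledShutdown(1));
        System.out.println(sketch.inControlledShutdown(42));
    }
}
```

   With this shape a test can distinguish "registered and not shutting down" from "not registered", which a plain `boolean` cannot express.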



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -132,15 +143,27 @@ ClusterControlManager build() {
                 replicaPlacer = new StripedReplicaPlacer(new Random());
             }
             if (controllerMetrics == null) {
-                throw new RuntimeException("You must specify controllerMetrics");
+                throw new RuntimeException("You must specify ControllerMetrics");
+            }
+            if (featureControl == null) {
+                featureControl = new FeatureControlManager.Builder().
+                    setLogContext(logContext).
+                    setSnapshotRegistry(snapshotRegistry).
+                    setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                        QuorumFeatures.defaultFeatureMap(),
+                        singletonList(0))).
+                    setMetadataVersion(MetadataVersion.latest()).
+                    build();

Review Comment:
   Should `build` throw an `IllegalStateException` instead? I am worried that this runs the risk of having multiple `FeatureControlManager` instances in a controller. This feature control manager won't get updated when the metadata version changes.



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -410,15 +450,22 @@ public void replay(BrokerRegistrationChangeRecord record) {
             throw new RuntimeException(String.format("Unable to replay %s: unknown " +
                 "value for fenced field: %d", record.toString(), record.fenced()));
         }
+        Optional<BrokerRegistrationInControlledShutdownChange> inControlledShutdownChange =
+            BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown());
+        if (!inControlledShutdownChange.isPresent()) {
+            throw new RuntimeException(String.format("Unable to replay %s: unknown " +
+                "value for inControlledShutdown field: %d", record.toString(), record.inControlledShutdown()));

Review Comment:
   How about `IllegalStateException`?
   
   `toString()` in `record.toString()` shouldn't be needed. That is implicit since format string uses `%s`.
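
   A small sketch of the point about `%s`, using a hypothetical `FakeRecord` class in place of the real record type: `String.format` calls `toString()` on the argument itself, so passing the object directly produces the same message.

```java
final class FormatSketch {
    // Hypothetical stand-in for BrokerRegistrationChangeRecord.
    static final class FakeRecord {
        @Override
        public String toString() {
            return "FakeRecord(brokerId=1)";
        }
    }

    // Passes the record object directly; %s invokes toString() on it.
    static String message(Object record, int value) {
        return String.format("Unable to replay %s: unknown " +
            "value for inControlledShutdown field: %d", record, value);
    }

    public static void main(String[] args) {
        System.out.println(message(new FakeRecord(), 7));
    }
}
```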



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -682,25 +683,35 @@ private ApiError createTopic(CreatableTopic topic,
             short replicationFactor = topic.replicationFactor() == -1 ?
                 defaultReplicationFactor : topic.replicationFactor();
             try {
-                List<List<Integer>> replicas = clusterControl.replicaPlacer().place(new PlacementSpec(
+                List<List<Integer>> partitions = clusterControl.replicaPlacer().place(new PlacementSpec(
                     0,
                     numPartitions,
                     replicationFactor
                 ), clusterDescriber);
-                for (int partitionId = 0; partitionId < replicas.size(); partitionId++) {
-                    int[] r = Replicas.toArray(replicas.get(partitionId));
+                for (int partitionId = 0; partitionId < partitions.size(); partitionId++) {
+                    List<Integer> replicas = partitions.get(partitionId);
+                    List<Integer> isr = replicas.stream().
+                        filter(clusterControl::active).collect(Collectors.toList());
+                    // We need to have at least one replica in the ISR.
+                    if (isr.isEmpty()) isr.add(replicas.get(0));

Review Comment:
   Since all of the replicas are inactive, why not add them all to the ISR? The controller doesn't know which replica will become online first. With this implementation the controller is saying that the first replica needs to come online first. I think this is unnecessarily strict.
   
   This comment also applies to the create partition RPC.
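
   A rough sketch of the alternative described above, assuming a plain predicate in place of `clusterControl::active`. The class and method names are hypothetical, and this illustrates the suggestion only, not the code in the PR: when no replica is active, every replica goes into the ISR instead of only the first one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntPredicate;

final class InitialIsrSketch {
    // Keeps the active replicas; falls back to all replicas when none is active.
    static List<Integer> initialIsr(List<Integer> replicas, IntPredicate active) {
        List<Integer> isr = new ArrayList<>();
        for (int replica : replicas) {
            if (active.test(replica)) {
                isr.add(replica);
            }
        }
        if (isr.isEmpty()) {
            // No replica is active: do not favor any particular broker.
            isr.addAll(replicas);
        }
        return isr;
    }

    public static void main(String[] args) {
        List<Integer> replicas = Arrays.asList(0, 1, 2);
        System.out.println(initialIsr(replicas, id -> id == 0));
        System.out.println(initialIsr(replicas, id -> false));
    }
}
```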



##########
metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java:
##########
@@ -102,10 +104,34 @@ public void replay(UnfenceBrokerRecord record) {
     public void replay(BrokerRegistrationChangeRecord record) {
         BrokerRegistration broker =
             getBrokerOrThrow(record.brokerId(), record.brokerEpoch(), "change");
-        if (record.fenced() < 0) {
-            changedBrokers.put(record.brokerId(), Optional.of(broker.cloneWithFencing(false)));
-        } else if (record.fenced() > 0) {
-            changedBrokers.put(record.brokerId(), Optional.of(broker.cloneWithFencing(true)));
+        Optional<BrokerRegistrationFencingChange> fencingChange =
+            BrokerRegistrationFencingChange.fromValue(record.fenced());
+        if (!fencingChange.isPresent()) {
+            throw new IllegalStateException(String.format("Unable to replay %s: unknown " +
+                "value for fenced field: %d", record.toString(), record.fenced()));
+        }
+        Optional<BrokerRegistrationInControlledShutdownChange> inControlledShutdownChange =
+            BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown());
+        if (!inControlledShutdownChange.isPresent()) {
+            throw new IllegalStateException(String.format("Unable to replay %s: unknown " +
+                "value for inControlledShutdown field: %d", record.toString(), record.inControlledShutdown()));
+        }

Review Comment:
   I think you can simplify this handling by using `Optional#orElseThrow`.
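
   For illustration, a minimal sketch of the `Optional#orElseThrow` form. The `Change` enum and `fromValue` here are simplified, hypothetical stand-ins for the types in the diff; only the shape of the call matters.

```java
import java.util.Optional;

final class OrElseThrowSketch {
    enum Change { NONE, IN_CONTROLLED_SHUTDOWN }

    // Simplified stand-in for the fromValue lookup in the diff.
    static Optional<Change> fromValue(int value) {
        switch (value) {
            case 0: return Optional.of(Change.NONE);
            case 1: return Optional.of(Change.IN_CONTROLLED_SHUTDOWN);
            default: return Optional.empty();
        }
    }

    // Replaces the isPresent() check, the explicit throw and the later get().
    static Change parse(int value) {
        return fromValue(value).orElseThrow(() -> new IllegalStateException(
            String.format("unknown value for inControlledShutdown field: %d", value)));
    }

    public static void main(String[] args) {
        System.out.println(parse(1));
    }
}
```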



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1160,6 +1171,28 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVers
             brokersToIsrs.partitionsWithNoLeader());
     }
 
+    /**
+     * Generate the appropriate records to handle a broker starting a controlled shutdown.
+     *
+     * First, we create an BrokerRegistrationChangeRecord. Then, we remove this broker
+     * from any non-singleton ISR and elect new leaders for partitions led by this
+     * broker.
+     *
+     * @param brokerId      The broker id.
+     * @param brokerEpoch   The broker epoch.
+     * @param records       The record list to append to.
+     */
+    void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
+        if (featureControl.metadataVersion().isInControlledShutdownStateSupported()) {
+            records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+                setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
+                setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
+                (short) 1));

Review Comment:
   Is it possible for the broker to send multiple heartbeats with a shutdown request? Do we want to handle that case and avoid writing another record?
   
   Looking at the implementation for `generateLeaderAndIsrUpdates`, it looks like it already avoids generating multiple partition change records since the broker was removed from the ISR. 
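
   One way the duplicate record could be avoided, sketched under the assumption that the controller can see whether the broker is already marked as being in controlled shutdown. All names below are hypothetical and records are modelled as strings; this is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ControlledShutdownDedupSketch {
    // Hypothetical stand-in for replayed controller state.
    private final Set<Integer> brokersInControlledShutdown = new HashSet<>();

    // Appends a change record only if the broker is not already marked.
    void maybeGenerateRecord(int brokerId, List<String> records) {
        if (brokersInControlledShutdown.contains(brokerId)) {
            return;
        }
        records.add("BrokerRegistrationChangeRecord(brokerId=" + brokerId +
            ", inControlledShutdown=1)");
    }

    // Applies a record to the state, as replaying it would.
    void replay(int brokerId) {
        brokersInControlledShutdown.add(brokerId);
    }

    public static void main(String[] args) {
        ControlledShutdownDedupSketch sketch = new ControlledShutdownDedupSketch();
        List<String> records = new ArrayList<>();
        sketch.maybeGenerateRecord(1, records); // first heartbeat with a shutdown request
        sketch.replay(1);
        sketch.maybeGenerateRecord(1, records); // repeated heartbeat
        System.out.println(records.size()); // prints 1
    }
}
```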



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -429,8 +476,10 @@ private void replayRegistrationChange(
                 "registration with that epoch found", record.toString()));
         } else {
             BrokerRegistration nextRegistration = curRegistration;
-            if (fencingChange != BrokerRegistrationFencingChange.NONE) {
-                nextRegistration = nextRegistration.cloneWithFencing(fencingChange.asBoolean().get());
+            if (fencingChange != BrokerRegistrationFencingChange.NONE
+                || inControlledShutdownChange != BrokerRegistrationInControlledShutdownChange.NONE) {

Review Comment:
   This check is not needed since `cloneWith` works for all the enum values for these two types. If you would like to avoid an object allocation then maybe you can move this check to `cloneWith`.
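
   A sketch of what moving the check into `cloneWith` could look like, using a hypothetical, reduced `Registration` class with only the two changeable fields. When nothing changes the method returns `this`, so no object is allocated and callers can detect a change with a reference comparison.

```java
import java.util.Optional;

final class CloneWithSketch {
    // Hypothetical reduced form of BrokerRegistration.
    static final class Registration {
        final int id;
        final boolean fenced;
        final boolean inControlledShutdown;

        Registration(int id, boolean fenced, boolean inControlledShutdown) {
            this.id = id;
            this.fenced = fenced;
            this.inControlledShutdown = inControlledShutdown;
        }

        // Returns this instance when neither field would change.
        Registration cloneWith(
            Optional<Boolean> fencingChange,
            Optional<Boolean> inControlledShutdownChange
        ) {
            boolean newFenced = fencingChange.orElse(fenced);
            boolean newInControlledShutdown =
                inControlledShutdownChange.orElse(inControlledShutdown);
            if (newFenced == fenced && newInControlledShutdown == inControlledShutdown) {
                return this;
            }
            return new Registration(id, newFenced, newInControlledShutdown);
        }
    }

    public static void main(String[] args) {
        Registration current = new Registration(1, true, false);
        Registration next = current.cloneWith(Optional.empty(), Optional.of(true));
        System.out.println(next != current); // prints true
    }
}
```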



##########
metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json:
##########
@@ -17,14 +17,16 @@
   "apiKey": 17,
   "type": "metadata",
   "name": "BrokerRegistrationChangeRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
   Hmm. This version bump is not strictly required since we are protecting the tagged field with the metadata version.
   
   For example, in `PartitionChangeRecord` we added the tagged field `LeaderRecoveryState` without changing the version.
   
   @mumrah @cmccabe Do you have an opinion on this?



##########
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##########
@@ -152,13 +156,22 @@ public boolean fenced() {
         return fenced;
     }
 
-    public ApiMessageAndVersion toRecord() {
+    public boolean inControlledShutdown() {
+        return inControlledShutdown;
+    }
+
+    public ApiMessageAndVersion toRecord(MetadataVersion metadataVersion) {
         RegisterBrokerRecord registrationRecord = new RegisterBrokerRecord().
             setBrokerId(id).
             setRack(rack.orElse(null)).
             setBrokerEpoch(epoch).
             setIncarnationId(incarnationId).
             setFenced(fenced);
+
+        if (metadataVersion.isInControlledShutdownStateSupported()) {
+            registrationRecord.setInControlledShutdown(inControlledShutdown);
+        }

Review Comment:
   Isn't it an error if `inControlledShutdown` is true and `metadataVersion` doesn't support this field? In other words, I think we can just set this field if `inControlledShutdown` is `true`.
   
   I suspect that we would get an `UnsupportedVersionException` when writing this record to the log or snapshot.



##########
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationInControlledShutdownChange.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum BrokerRegistrationInControlledShutdownChange {

Review Comment:
   What do you think about documenting why `Optional.of(false)` is not a valid change?
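
   A hedged sketch of how such documentation might sit on the enum. The wrapper class name, the constant values and the wording of the comment are assumptions for illustration; in particular, the stated reason (a broker only leaves controlled shutdown by registering again) is my reading of KIP-841 rather than text from this PR.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

final class InControlledShutdownChangeSketch {
    enum Change {
        // Optional.of(false) is deliberately not representable. Assumption:
        // a broker leaves controlled shutdown only by registering again,
        // never through a change record.
        NONE((byte) 0, Optional.empty()),
        IN_CONTROLLED_SHUTDOWN((byte) 1, Optional.of(true));

        private static final Map<Byte, Change> VALUE_TO_ENUM =
            Arrays.stream(values()).collect(
                Collectors.toMap(Change::value, Function.identity()));

        private final byte value;
        private final Optional<Boolean> asBoolean;

        Change(byte value, Optional<Boolean> asBoolean) {
            this.value = value;
            this.asBoolean = asBoolean;
        }

        byte value() {
            return value;
        }

        Optional<Boolean> asBoolean() {
            return asBoolean;
        }

        // Unknown values, including -1, map to an empty Optional.
        static Optional<Change> fromValue(byte value) {
            return Optional.ofNullable(VALUE_TO_ENUM.get(value));
        }
    }

    public static void main(String[] args) {
        System.out.println(Change.fromValue((byte) 1));
        System.out.println(Change.fromValue((byte) -1));
    }
}
```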



##########
metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java:
##########
@@ -102,10 +104,34 @@ public void replay(UnfenceBrokerRecord record) {
     public void replay(BrokerRegistrationChangeRecord record) {
         BrokerRegistration broker =
             getBrokerOrThrow(record.brokerId(), record.brokerEpoch(), "change");
-        if (record.fenced() < 0) {
-            changedBrokers.put(record.brokerId(), Optional.of(broker.cloneWithFencing(false)));
-        } else if (record.fenced() > 0) {
-            changedBrokers.put(record.brokerId(), Optional.of(broker.cloneWithFencing(true)));
+        Optional<BrokerRegistrationFencingChange> fencingChange =
+            BrokerRegistrationFencingChange.fromValue(record.fenced());
+        if (!fencingChange.isPresent()) {
+            throw new IllegalStateException(String.format("Unable to replay %s: unknown " +
+                "value for fenced field: %d", record.toString(), record.fenced()));
+        }
+        Optional<BrokerRegistrationInControlledShutdownChange> inControlledShutdownChange =
+            BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown());
+        if (!inControlledShutdownChange.isPresent()) {
+            throw new IllegalStateException(String.format("Unable to replay %s: unknown " +
+                "value for inControlledShutdown field: %d", record.toString(), record.inControlledShutdown()));
+        }
+
+        replayRegistration(record.brokerId(), broker, fencingChange.get(), inControlledShutdownChange.get());
+    }
+
+    private void replayRegistration(
+        int brokerId,
+        BrokerRegistration broker,
+        BrokerRegistrationFencingChange fencingChange,
+        BrokerRegistrationInControlledShutdownChange inControlledShutdownChange
+    ) {
+        if (fencingChange != BrokerRegistrationFencingChange.NONE
+            || inControlledShutdownChange != BrokerRegistrationInControlledShutdownChange.NONE) {

Review Comment:
   If you follow my other comment regarding `cloneWith`, you may be able to replace this check with a check against the `BrokerRegistration` changing.
   
   As we add more changeable registration fields, developers will otherwise need to repeat this kind of check everywhere.





[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r889832832


##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -410,15 +450,22 @@ public void replay(BrokerRegistrationChangeRecord record) {
             throw new RuntimeException(String.format("Unable to replay %s: unknown " +
                 "value for fenced field: %d", record.toString(), record.fenced()));
         }
+        Optional<BrokerRegistrationInControlledShutdownChange> inControlledShutdownChange =
+            BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown());
+        if (!inControlledShutdownChange.isPresent()) {
+            throw new RuntimeException(String.format("Unable to replay %s: unknown " +
+                "value for inControlledShutdown field: %d", record.toString(), record.inControlledShutdown()));

Review Comment:
   Yeah, `IllegalStateException` seems appropriate. I followed the exception that we used in https://github.com/apache/kafka/commit/65b4374203608dc4d68544bde16d4acd2e31efd1. Let me change both.





[GitHub] [kafka] jsancio commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r890540775


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -682,25 +683,35 @@ private ApiError createTopic(CreatableTopic topic,
             short replicationFactor = topic.replicationFactor() == -1 ?
                 defaultReplicationFactor : topic.replicationFactor();
             try {
-                List<List<Integer>> replicas = clusterControl.replicaPlacer().place(new PlacementSpec(
+                List<List<Integer>> partitions = clusterControl.replicaPlacer().place(new PlacementSpec(
                     0,
                     numPartitions,
                     replicationFactor
                 ), clusterDescriber);
-                for (int partitionId = 0; partitionId < replicas.size(); partitionId++) {
-                    int[] r = Replicas.toArray(replicas.get(partitionId));
+                for (int partitionId = 0; partitionId < partitions.size(); partitionId++) {
+                    List<Integer> replicas = partitions.get(partitionId);
+                    List<Integer> isr = replicas.stream().
+                        filter(clusterControl::active).collect(Collectors.toList());
+                    // We need to have at least one replica in the ISR.
+                    if (isr.isEmpty()) isr.add(replicas.get(0));

Review Comment:
   > I would like to do this as a follow-up if you don't mind. That's a big change.
   
   Sounds good. Do you mind filing an issue for this?





[GitHub] [kafka] jsancio merged pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)

Posted by GitBox <gi...@apache.org>.
jsancio merged PR #12240:
URL: https://github.com/apache/kafka/pull/12240

