Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/03 23:58:35 UTC

[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r644352228



##########
File path: metadata/src/main/java/org/apache/kafka/controller/RemovingAndAddingReplicas.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+
+class RemovingAndAddingReplicas {
+    private final Set<Integer> removing;
+    private final Set<Integer> adding;
+
+    RemovingAndAddingReplicas(int[] removing, int[] adding) {
+        this.removing = new HashSet<>();
+        for (int replica : removing) {
+            this.removing.add(replica);
+        }
+        this.adding = new HashSet<>();
+        for (int replica : adding) {
+            this.adding.add(replica);
+        }
+    }
+
+    RemovingAndAddingReplicas(List<Integer> removing, List<Integer> adding) {
+        this.removing = new HashSet<>(removing);
+        this.adding = new HashSet<>(adding);
+    }
+
+    /**
+     * Calculate what replicas need to be added and removed to reach a specific target
+     * replica list.
+     *
+     * @param currentReplicas   The current replica list.
+     * @param targetReplicas    The target replica list.
+     *
+     * @return                  An object containing the removing and adding replicas.
+     */
+    static RemovingAndAddingReplicas forTarget(List<Integer> currentReplicas,
+                                               List<Integer> targetReplicas) {
+        List<Integer> removingReplicas = new ArrayList<>();
+        List<Integer> addingReplicas = new ArrayList<>();
+        List<Integer> sortedCurrentReplicas = new ArrayList<>(currentReplicas);
+        sortedCurrentReplicas.sort(Integer::compareTo);
+        List<Integer> sortedTargetReplicas = new ArrayList<>(targetReplicas);
+        sortedTargetReplicas.sort(Integer::compareTo);
+        int i = 0, j = 0;
+        while (true) {
+            if (i < sortedCurrentReplicas.size()) {
+                int currentReplica = sortedCurrentReplicas.get(i);
+                if (j < sortedTargetReplicas.size()) {
+                    int targetReplica = sortedTargetReplicas.get(j);
+                    if (currentReplica < targetReplica) {
+                        removingReplicas.add(currentReplica);
+                        i++;
+                    } else if (currentReplica > targetReplica) {
+                        addingReplicas.add(targetReplica);
+                        j++;
+                    } else {
+                        i++;
+                        j++;
+                    }
+                } else {
+                    removingReplicas.add(currentReplica);
+                    i++;
+                }
+            } else if (j < sortedTargetReplicas.size()) {
+                int targetReplica = sortedTargetReplicas.get(j);
+                addingReplicas.add(targetReplica);
+                j++;
+            } else {
+                break;
+            }
+        }
+        return new RemovingAndAddingReplicas(removingReplicas, addingReplicas);
+    }
+
+    /**
+     * Calculate the merged replica list following a reassignment.
+     *
+     * The merged list will contain all of the target replicas, in the order they appear
+     * in the target list.  It will also contain existing replicas that are scheduled to
+     * be removed.
+     *
+     * If a removing replica was in position X in the original replica list, it will
+     * appear in the merged list following the appearance of X non-new replicas.

Review comment:
       The logic seems a bit complicated, and I am not sure how useful it is. For example, if current and target are (1, 3, 4) and (1, 2, 4), this method returns (1, 3, 2, 4). The old code calculates the merged replicas simply as (target ++ originReplicas).distinct, which would give (1, 2, 4, 3). Is there any benefit to the new way of merging replicas?
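To make the comparison in the comment above concrete, here is a minimal Java sketch. It computes the removing and adding replicas as ascending set differences, which is what the two-pointer loop in `forTarget` produces when the replica lists contain no duplicates, and then builds the merged list the way the reviewer describes the old code, i.e. `(target ++ originReplicas).distinct`. The class and method names (`ReplicaMergeSketch`, `diff`, `mergeTargetFirst`) are hypothetical and are not part of Kafka.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch for illustration; not the Kafka implementation.
public class ReplicaMergeSketch {

    // Replicas present in `from` but absent from `to`, in ascending order.
    // Assumes neither list contains duplicates (forTarget also emits ascending results).
    static List<Integer> diff(List<Integer> from, List<Integer> to) {
        Set<Integer> result = new TreeSet<>(from);
        result.removeAll(to);
        return new ArrayList<>(result);
    }

    // Old-style merge: every target replica in target order, followed by any
    // original replica not already present, i.e. (target ++ original).distinct.
    static List<Integer> mergeTargetFirst(List<Integer> original, List<Integer> target) {
        Set<Integer> merged = new LinkedHashSet<>(target); // preserves insertion order
        merged.addAll(original);                            // appends only unseen replicas
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        List<Integer> current = List.of(1, 3, 4);
        List<Integer> target = List.of(1, 2, 4);
        System.out.println("removing = " + diff(current, target));             // [3]
        System.out.println("adding   = " + diff(target, current));             // [2]
        System.out.println("merged   = " + mergeTargetFirst(current, target)); // [1, 2, 4, 3]
    }
}
```

The sketch reproduces only the target-first behavior the reviewer attributes to the old code; it does not attempt the position-preserving merge described in the truncated Javadoc, whose full rule is not visible here.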
   

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
         }
     }
 
+    PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+                                                     int partitionId,
+                                                     PartitionControlInfo partition,
+                                                     int[] newIsr,
+                                                     Function<Integer, Boolean> isAcceptableLeader) {
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topic.id).
+            setPartitionId(partitionId);
+        int[] newReplicas = partition.replicas;
+        if (partition.isrChangeCompletesReassignment(newIsr)) {
+            if (partition.addingReplicas.length > 0) {
+                record.setAddingReplicas(Collections.emptyList());
+            }
+            if (partition.removingReplicas.length > 0) {
+                record.setRemovingReplicas(Collections.emptyList());
+                newIsr = Replicas.copyWithout(newIsr, partition.removingReplicas);
+                newReplicas = Replicas.copyWithout(partition.replicas, partition.removingReplicas);
+            }
+        }
+        int newLeader;
+        if (Replicas.contains(newIsr, partition.leader)) {
+            // If the current leader is good, don't change.
+            newLeader = partition.leader;
+        } else {
+            // Choose a new leader.
+            boolean uncleanOk = configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+            newLeader = bestLeader(newReplicas, newIsr, uncleanOk, isAcceptableLeader);
+        }
+        if (!electionWasClean(newLeader, newIsr)) {
+            // After an unclean leader election, the ISR is reset to just the new leader.
+            newIsr = new int[] {newLeader};
+        } else if (newIsr.length == 0) {
+            // We never want to shrink the ISR to size 0.
+            newIsr = partition.isr;
+        }
+        if (newLeader != partition.leader) record.setLeader(newLeader);
+        if (!Arrays.equals(newIsr, partition.isr)) {
+            record.setIsr(Replicas.toList(newIsr));
+        }
+        if (!Arrays.equals(newReplicas, partition.replicas)) {
+            record.setReplicas(Replicas.toList(newReplicas));
+        }
+        return record;
+    }
+
+    ControllerResult<AlterPartitionReassignmentsResponseData>
+            alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        AlterPartitionReassignmentsResponseData result =
+                new AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+        int successfulAlterations = 0, totalAlterations = 0;
+        for (ReassignableTopic topic : request.topics()) {
+            ReassignableTopicResponse topicResponse = new ReassignableTopicResponse().
+                setName(topic.name());
+            for (ReassignablePartition partition : topic.partitions()) {
+                ApiError error = ApiError.NONE;
+                try {
+                    alterPartitionReassignment(topic.name(), partition, records);
+                    successfulAlterations++;
+                } catch (Throwable e) {
+                    log.info("Unable to alter partition reassignment for " +
+                        topic.name() + ":" + partition.partitionIndex() + " because " +
+                        "of an " + e.getClass().getSimpleName() + " error: " + e.getMessage());
+                    error = ApiError.fromThrowable(e);
+                }
+                totalAlterations++;
+                topicResponse.partitions().add(new ReassignablePartitionResponse().
+                    setPartitionIndex(partition.partitionIndex()).
+                    setErrorCode(error.error().code()).
+                    setErrorMessage(error.message()));
+            }
+            result.responses().add(topicResponse);
+        }
+        log.info("Successfully altered {} out of {} partition reassignment(s).",
+            successfulAlterations, totalAlterations);
+        return ControllerResult.atomicOf(records, result);
+    }
+
+    void alterPartitionReassignment(String topicName,
+                                    ReassignablePartition partition,
+                                    List<ApiMessageAndVersion> records) {
+        Uuid topicId = topicsByName.get(topicName);
+        if (topicId == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic " +
+                "named " + topicName + ".");
+        }
+        TopicControlInfo topicInfo = topics.get(topicId);
+        if (topicInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic " +
+                "with ID " + topicId + ".");
+        }
+        TopicIdPartition part = new TopicIdPartition(topicId, partition.partitionIndex());
+        PartitionControlInfo partitionInfo = topicInfo.parts.get(partition.partitionIndex());
+        if (partitionInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find partition " +
+                topicName + ":" + partition.partitionIndex() + ".");
+        }
+        if (partition.replicas() == null) {
+            cancelPartitionReassignment(topicName, part, partitionInfo, records);
+        } else {
+            changePartitionReassignment(topicName, part, partitionInfo, partition, records);
+        }
+    }
+
+    void cancelPartitionReassignment(String topicName,
+                                     TopicIdPartition topicIdPartition,
+                                     PartitionControlInfo partition,
+                                     List<ApiMessageAndVersion> records) {
+        if (!partition.isReassigning()) {
+            throw new NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
+        }
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topicIdPartition.topicId()).
+            setPartitionId(topicIdPartition.partitionId());
+        if (partition.removingReplicas.length > 0) {
+            record.setRemovingReplicas(Collections.emptyList());
+        }
+        if (partition.addingReplicas.length > 0) {
+            record.setAddingReplicas(Collections.emptyList());
+        }
+        RemovingAndAddingReplicas removingAndAddingReplicas =
+            new RemovingAndAddingReplicas(partition.removingReplicas, partition.addingReplicas);
+        List<Integer> currentReplicas = Replicas.toList(partition.replicas);
+        List<Integer> currentIsr = Replicas.toList(partition.isr);
+        List<Integer> revertedReplicas = removingAndAddingReplicas.
+            calculateRevertedReplicas(currentReplicas, currentIsr);
+        if (!revertedReplicas.equals(currentReplicas)) {
+            record.setReplicas(revertedReplicas);
+            List<Integer> newIsr = new ArrayList<>();
+            for (int replica : partition.isr) {
+                if (revertedReplicas.contains(replica)) {
+                    newIsr.add(replica);
+                }
+            }
+            if (!newIsr.equals(currentIsr)) {
+                if (!newIsr.contains(partition.leader)) {
+                    int newLeader = bestLeader(Replicas.toArray(revertedReplicas),
+                        Replicas.toArray(newIsr),
+                        configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+                        r -> clusterControl.unfenced(r));
+                    if (!electionWasClean(newLeader, Replicas.toArray(newIsr))) {
+                        newIsr = Collections.singletonList(newLeader);
+                    }
+                    record.setLeader(newLeader);
+                }
+                record.setIsr(newIsr);
+            }
+        }
+        records.add(new ApiMessageAndVersion(record, (short) 0));
+    }
+
+    void changePartitionReassignment(String topicName,
+                                     TopicIdPartition part,
+                                     PartitionControlInfo partitionInfo,
+                                     ReassignablePartition partition,
+                                     List<ApiMessageAndVersion> records) {
+        // Check that the requested partition assignment is valid.
+        validateManualPartitionAssignment(partition.replicas(), OptionalInt.empty());
+        // Calculate the replicas to add and remove.
+        List<Integer> currentReplicas = Replicas.toList(partitionInfo.replicas);
+        RemovingAndAddingReplicas removingAndAdding =
+            RemovingAndAddingReplicas.forTarget(currentReplicas, partition.replicas());
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(part.topicId()).
+            setPartitionId(part.partitionId());
+        List<Integer> removing = removingAndAdding.removingAsList();
+        if (!removing.isEmpty()) record.setRemovingReplicas(removing);
+        List<Integer> adding = removingAndAdding.addingAsList();
+        if (!adding.isEmpty()) record.setAddingReplicas(adding);
+
+        // Calculate the merged replica list. This may involve reordering existing
+        // replicas.
+        List<Integer> newReplicas = removingAndAdding.
+            calculateMergedReplicas(currentReplicas, partition.replicas());
+        PartitionControlInfo nextPartitionInfo = partitionInfo.merge(record);
+        if (nextPartitionInfo.isrChangeCompletesReassignment(nextPartitionInfo.isr)) {
+            // Handle partition assignments which must be completed immediately.
+            // These assignments don't add any replicas, and don't remove replicas critical
+            // to maintaining a non-empty ISR.
+            record.setRemovingReplicas(null);
+            record.setAddingReplicas(null);
+            int[] newReplicasArray = Replicas.copyWithout(nextPartitionInfo.replicas,
+                nextPartitionInfo.removingReplicas);
+            newReplicas = Replicas.toList(newReplicasArray);
+            int[] newIsr = Replicas.copyWithout(nextPartitionInfo.isr,
+                nextPartitionInfo.removingReplicas);
+            if (!Arrays.equals(nextPartitionInfo.isr, newIsr)) {
+                // Check if we need to elect a new leader.
+                if (!Replicas.contains(newIsr, partitionInfo.leader)) {
+                    int newLeader = bestLeader(newReplicasArray, newIsr,
+                        configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+                        r -> clusterControl.unfenced(r));
+                    if (!electionWasClean(newLeader, newIsr)) {
+                        newIsr = new int[] {newLeader};
+                    }
+                    record.setLeader(newLeader);
+                }
+                record.setIsr(Replicas.toList(newIsr));
+            }
+        }
+        if (!currentReplicas.equals(newReplicas)) {
+            record.setReplicas(newReplicas);
+        }
+        // Check if there are any partition changes resulting from the above. If there
+        // are, add the appropriate record.
+        if (recordContainsChanges(record)) {
+            records.add(new ApiMessageAndVersion(record, (short) 0));
+        }
+    }
+
+    /**
+     * Returns true if a partition change record actually changes anything about
+     * the partition.
+     */
+    static boolean recordContainsChanges(PartitionChangeRecord record) {
+        if (record.isr() != null) return true;
+        if (record.leader() != NO_LEADER_CHANGE) return true;
+        if (record.replicas() != null) return true;
+        if (record.removingReplicas() != null) return true;
+        if (record.addingReplicas() != null) return true;
+        return false;
+    }
+
+    ListPartitionReassignmentsResponseData listPartitionReassignments(
+            List<ListPartitionReassignmentsTopics> topicList) {
+        if (topicList == null) {
+            return listAllPartitionReassignments();
+        }
+        ListPartitionReassignmentsResponseData response =
+            new ListPartitionReassignmentsResponseData().setErrorMessage(null);
+        for (ListPartitionReassignmentsTopics topic : topicList) {
+            Uuid topicId = topicsByName.get(topic.name());
+            if (topicId != null) {
+                TopicControlInfo topicInfo = topics.get(topicId);
+                if (topicInfo == null) {
+                    throw new RuntimeException("No topic entry found for " + topicId);

Review comment:
       Should we set an error code in the response instead of throwing an exception?
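One possible reading of this suggestion: when the name-to-ID index and the topic map disagree, report the inconsistency through the response (which already carries an error message, see `setErrorMessage(null)` above) instead of throwing out of the handler. The sketch below illustrates that pattern with plain Java collections; `ListReassignmentsSketch`, `Response`, the two maps, and the numeric error codes are hypothetical stand-ins, not Kafka's generated response classes or its `Errors` values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: surface inconsistent state as a response error instead of throwing.
public class ListReassignmentsSketch {
    static final short NONE = 0;
    static final short UNKNOWN_SERVER_ERROR = -1; // placeholder code for this sketch

    static final class Response {
        short errorCode = NONE;
        String errorMessage = null;
        final List<String> topics = new ArrayList<>();
    }

    static Response list(List<String> requestedTopics,
                         Map<String, String> topicIdsByName,
                         Map<String, String> topicsById) {
        Response response = new Response();
        for (String name : requestedTopics) {
            String topicId = topicIdsByName.get(name);
            if (topicId == null) {
                continue; // unknown names are skipped, as the diff does with `if (topicId != null)`
            }
            if (!topicsById.containsKey(topicId)) {
                // Inconsistent controller state: report it in the response and
                // drop any partial results rather than throwing.
                response.errorCode = UNKNOWN_SERVER_ERROR;
                response.errorMessage = "No topic entry found for " + topicId;
                response.topics.clear();
                return response;
            }
            response.topics.add(name);
        }
        return response;
    }
}
```

Whether partial results should be cleared or returned alongside the error is a design choice; the sketch clears them so that a non-zero error code never accompanies a half-populated topic list.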

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
         }
     }
 
+    void changePartitionReassignment(String topicName,
+                                     TopicIdPartition part,
+                                     PartitionControlInfo partitionInfo,
+                                     ReassignablePartition partition,
+                                     List<ApiMessageAndVersion> records) {
+        // Check that the requested partition assignment is valid.
+        validateManualPartitionAssignment(partition.replicas(), OptionalInt.empty());

Review comment:
       Should we further verify that after the reassignment, all partitions for the same topic have the same number of replicas?
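A minimal sketch of the check suggested above, assuming the controller can see the current replica list of every partition in the topic. The class `ReplicationFactorCheck`, the method `checkUniformReplicationFactor`, and the use of `IllegalArgumentException` are hypothetical; a real controller would presumably map the failure to an API error such as an invalid replica assignment.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not part of Kafka.
public class ReplicationFactorCheck {

    /**
     * Throws if replacing partitionId's replicas with targetReplicas would leave
     * the topic with partitions whose replica counts differ.
     *
     * @param replicasByPartition current replica list for every partition of the topic
     * @param partitionId         the partition being reassigned
     * @param targetReplicas      the requested replica list for that partition
     */
    static void checkUniformReplicationFactor(Map<Integer, List<Integer>> replicasByPartition,
                                              int partitionId,
                                              List<Integer> targetReplicas) {
        for (Map.Entry<Integer, List<Integer>> entry : replicasByPartition.entrySet()) {
            if (entry.getKey() == partitionId) {
                continue; // this partition's replicas are being replaced
            }
            int existing = entry.getValue().size();
            if (existing != targetReplicas.size()) {
                throw new IllegalArgumentException("Partition " + partitionId +
                    " would have " + targetReplicas.size() + " replicas, but partition " +
                    entry.getKey() + " has " + existing + ".");
            }
        }
    }
}
```

Two caveats apply. A partition that is itself mid-reassignment has a merged replica list that includes its adding and removing replicas, so a real check would likely compare against its target size (replicas minus removing) instead. Also, a single request may reassign several partitions of the same topic at once, so the comparison should arguably use the post-request state rather than the current one.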

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
         }
     }
 
+    void cancelPartitionReassignment(String topicName,
+                                     TopicIdPartition topicIdPartition,
+                                     PartitionControlInfo partition,
+                                     List<ApiMessageAndVersion> records) {
+        if (!partition.isReassigning()) {
+            throw new NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
+        }
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topicIdPartition.topicId()).
+            setPartitionId(topicIdPartition.partitionId());
+        if (partition.removingReplicas.length > 0) {
+            record.setRemovingReplicas(Collections.emptyList());
+        }
+        if (partition.addingReplicas.length > 0) {
+            record.setAddingReplicas(Collections.emptyList());
+        }
+        RemovingAndAddingReplicas removingAndAddingReplicas =
+            new RemovingAndAddingReplicas(partition.removingReplicas, partition.addingReplicas);
+        List<Integer> currentReplicas = Replicas.toList(partition.replicas);
+        List<Integer> currentIsr = Replicas.toList(partition.isr);
+        List<Integer> revertedReplicas = removingAndAddingReplicas.
+            calculateRevertedReplicas(currentReplicas, currentIsr);
+        if (!revertedReplicas.equals(currentReplicas)) {
+            record.setReplicas(revertedReplicas);
+            List<Integer> newIsr = new ArrayList<>();
+            for (int replica : partition.isr) {
+                if (revertedReplicas.contains(replica)) {
+                    newIsr.add(replica);
+                }
+            }
+            if (!newIsr.equals(currentIsr)) {
+                if (!newIsr.contains(partition.leader)) {
+                    int newLeader = bestLeader(Replicas.toArray(revertedReplicas),
+                        Replicas.toArray(newIsr),
+                        configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+                        r -> clusterControl.unfenced(r));
+                    if (!electionWasClean(newLeader, Replicas.toArray(newIsr))) {

Review comment:
       We have this logic duplicated in multiple places, and it's easy to forget to reset the ISR after each unclean leader election. Could we factor that into bestLeader() so that it returns both the new leader and the new ISR?
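       A minimal sketch of what that refactoring could look like. ElectionResult and LeaderElection are hypothetical names, and the selection order (first acceptable in-sync replica in preferred order, then any acceptable replica if unclean election is allowed) is an assumption modeled on how bestLeader() is called in this diff:

```java
import java.util.Arrays;
import java.util.function.IntPredicate;

// Hypothetical holder for the outcome of a leader election (name is illustrative).
final class ElectionResult {
    final int leader;
    final int[] isr;
    final boolean unclean;

    ElectionResult(int leader, int[] isr, boolean unclean) {
        this.leader = leader;
        this.isr = isr;
        this.unclean = unclean;
    }
}

final class LeaderElection {
    static final int NO_LEADER = -1;

    // Picks a leader and computes the matching ISR in one step, so callers
    // cannot forget to reset the ISR after an unclean election.
    static ElectionResult bestLeader(int[] replicas, int[] isr,
                                     boolean uncleanOk, IntPredicate isAcceptable) {
        // Clean election: first acceptable replica (preferred order) that is in the ISR.
        for (int replica : replicas) {
            if (isAcceptable.test(replica) && contains(isr, replica)) {
                return new ElectionResult(replica, isr, false);
            }
        }
        if (uncleanOk) {
            // Unclean election: any acceptable replica; the ISR shrinks to just that leader.
            for (int replica : replicas) {
                if (isAcceptable.test(replica)) {
                    return new ElectionResult(replica, new int[] {replica}, true);
                }
            }
        }
        // No eligible leader: keep the ISR unchanged.
        return new ElectionResult(NO_LEADER, isr, false);
    }

    private static boolean contains(int[] array, int value) {
        return Arrays.stream(array).anyMatch(x -> x == value);
    }
}
```

       With this shape, call sites such as cancelPartitionReassignment() and generateLeaderAndIsrUpdate() would read the ISR from the result instead of re-checking electionWasClean() themselves.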

##########
File path: metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
##########
@@ -29,6 +29,15 @@
       "about": "null if the ISR didn't change; the new in-sync replicas otherwise." },
     { "name": "Leader", "type": "int32", "default": "-2", "entityType": "brokerId",
       "versions": "0+", "taggedVersions": "0+", "tag": 1,
-      "about": "-1 if there is now no leader; -2 if the leader didn't change; the new leader otherwise." }
+      "about": "-1 if there is now no leader; -2 if the leader didn't change; the new leader otherwise." },
+    { "name": "Replicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
+      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 2,

Review comment:
       Should versions start from 1+ for the new fields?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -235,6 +256,41 @@ int preferredReplica() {
             return replicas.length == 0 ? NO_LEADER : replicas[0];
         }
 
+        boolean isReassigning() {
+            return removingReplicas.length > 0 || addingReplicas.length > 0;
+        }
+
+        /**
+         * Check if an ISR change completes this partition's reassignment.
+         *
+         * @param newIsr    The new ISR.
+         * @return          True if the reassignment is complete.
+         */
+        boolean isrChangeCompletesReassignment(int[] newIsr) {

Review comment:
       The old controller disables auto leader balancing when a partition reassignment is in progress. Should we do the same thing here?
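       One possible approach, sketched below: skip any partition that is mid-reassignment when looking for preferred-leader imbalance. PartitionSnapshot and AutoLeaderBalancer are hypothetical names; isReassigning() and preferredReplica() mirror the methods in this diff. Whether the old controller skipped individual partitions or paused balancing globally is not shown here, so the per-partition granularity is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified view of a partition; names are illustrative, not the PR's.
final class PartitionSnapshot {
    final int leader;
    final int[] replicas;
    final int[] addingReplicas;
    final int[] removingReplicas;

    PartitionSnapshot(int leader, int[] replicas, int[] addingReplicas, int[] removingReplicas) {
        this.leader = leader;
        this.replicas = replicas;
        this.addingReplicas = addingReplicas;
        this.removingReplicas = removingReplicas;
    }

    boolean isReassigning() {
        return removingReplicas.length > 0 || addingReplicas.length > 0;
    }

    int preferredReplica() {
        return replicas.length == 0 ? -1 : replicas[0];
    }
}

final class AutoLeaderBalancer {
    // Returns the indices of partitions whose leader is not the preferred replica,
    // skipping any partition that is in the middle of a reassignment.
    static List<Integer> imbalancedPartitions(List<PartitionSnapshot> partitions) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            PartitionSnapshot p = partitions.get(i);
            if (p.isReassigning()) {
                continue; // leave the leader alone until the reassignment finishes
            }
            if (p.leader != p.preferredReplica()) {
                result.add(i);
            }
        }
        return result;
    }
}
```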

##########
File path: metadata/src/main/resources/common/metadata/PartitionRecord.json
##########
@@ -27,9 +27,9 @@
       "about": "The replicas of this partition, sorted by preferred order." },
     { "name": "Isr", "type":  "[]int32", "versions":  "0+",
       "about": "The in-sync replicas of this partition" },
-    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+", "entityType": "brokerId",
+    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", "entityType": "brokerId",

Review comment:
       Do we need to bump the version, since we are removing nullableVersions?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/RemovingAndAddingReplicas.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+
+class RemovingAndAddingReplicas {
+    private final Set<Integer> removing;
+    private final Set<Integer> adding;
+
+    RemovingAndAddingReplicas(int[] removing, int[] adding) {
+        this.removing = new HashSet<>();
+        for (int replica : removing) {
+            this.removing.add(replica);
+        }
+        this.adding = new HashSet<>();
+        for (int replica : adding) {
+            this.adding.add(replica);
+        }
+    }
+
+    RemovingAndAddingReplicas(List<Integer> removing, List<Integer> adding) {
+        this.removing = new HashSet<>(removing);
+        this.adding = new HashSet<>(adding);
+    }
+
+    /**
+     * Calculate what replicas need to be added and removed to reach a specific target
+     * replica list.
+     *
+     * @param currentReplicas   The current replica list.
+     * @param targetReplicas    The target replica list.
+     *
+     * @return                  An object containing the removing and adding replicas.
+     */
+    static RemovingAndAddingReplicas forTarget(List<Integer> currentReplicas,
+                                               List<Integer> targetReplicas) {
+        List<Integer> removingReplicas = new ArrayList<>();
+        List<Integer> addingReplicas = new ArrayList<>();
+        List<Integer> sortedCurrentReplicas = new ArrayList<>(currentReplicas);
+        sortedCurrentReplicas.sort(Integer::compareTo);
+        List<Integer> sortedTargetReplicas = new ArrayList<>(targetReplicas);
+        sortedTargetReplicas.sort(Integer::compareTo);
+        int i = 0, j = 0;
+        while (true) {
+            if (i < sortedCurrentReplicas.size()) {
+                int currentReplica = sortedCurrentReplicas.get(i);
+                if (j < sortedTargetReplicas.size()) {
+                    int targetReplica = sortedTargetReplicas.get(j);
+                    if (currentReplica < targetReplica) {
+                        removingReplicas.add(currentReplica);
+                        i++;
+                    } else if (currentReplica > targetReplica) {
+                        addingReplicas.add(targetReplica);
+                        j++;
+                    } else {
+                        i++;
+                        j++;
+                    }
+                } else {
+                    removingReplicas.add(currentReplica);
+                    i++;
+                }
+            } else if (j < sortedTargetReplicas.size()) {
+                int targetReplica = sortedTargetReplicas.get(j);
+                addingReplicas.add(targetReplica);
+                j++;
+            } else {
+                break;
+            }
+        }
+        return new RemovingAndAddingReplicas(removingReplicas, addingReplicas);
+    }
+
+    /**
+     * Calculate the merged replica list following a reassignment.
+     *
+     * The merged list will contain all of the target replicas, in the order they appear
+     * in the target list.  It will also contain existing replicas that are scheduled to
+     * be removed.
+     *
+     * If a removing replica was in position X in the original replica list, it will
+     * appear in the merged list following the appearance of X non-new replicas.
+     *
+     * @param currentReplicas       The current replica list.
+     * @param targetReplicas        The target replica list.
+     *
+     * @return                      The merged replica list.
+     */
+    List<Integer> calculateMergedReplicas(List<Integer> currentReplicas,

Review comment:
       The old ReplicaAssignment class also stores the current and target lists, so callers don't need to pass current and target again to compute the merged replicas, which rules out an inconsistency between them. Could we do the same here?
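       A minimal sketch of that idea, with the current and target lists captured once at construction and everything else derived from them. ReassignmentPlan is a hypothetical name. Note the merged order here is the simpler "target replicas first, then the replicas being removed", which differs from the positional interleaving described in the calculateMergedReplicas() javadoc above.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical class: keeps current and target together so merged/adding/removing
// are always derived from the same pair of lists.
final class ReassignmentPlan {
    final List<Integer> current;
    final List<Integer> target;

    ReassignmentPlan(List<Integer> current, List<Integer> target) {
        this.current = List.copyOf(current);
        this.target = List.copyOf(target);
    }

    // Target replicas in target order, followed by the current replicas being removed.
    List<Integer> merged() {
        LinkedHashSet<Integer> merged = new LinkedHashSet<>(target);
        merged.addAll(current); // duplicates are ignored, preserving first-seen order
        return new ArrayList<>(merged);
    }

    // Replicas in the target that are not yet in the current list.
    List<Integer> adding() {
        List<Integer> result = new ArrayList<>(target);
        result.removeAll(current);
        return result;
    }

    // Replicas in the current list that are not in the target.
    List<Integer> removing() {
        List<Integer> result = new ArrayList<>(current);
        result.removeAll(target);
        return result;
    }
}
```

       For example, current [1, 2, 3] and target [3, 4, 5] give merged [3, 4, 5, 1, 2], adding [4, 5] and removing [1, 2].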

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
         }
     }
 
+    PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+                                                     int partitionId,
+                                                     PartitionControlInfo partition,
+                                                     int[] newIsr,
+                                                     Function<Integer, Boolean> isAcceptableLeader) {
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topic.id).
+            setPartitionId(partitionId);
+        int[] newReplicas = partition.replicas;
+        if (partition.isrChangeCompletesReassignment(newIsr)) {
+            if (partition.addingReplicas.length > 0) {
+                record.setAddingReplicas(Collections.emptyList());
+            }
+            if (partition.removingReplicas.length > 0) {
+                record.setRemovingReplicas(Collections.emptyList());
+                newIsr = Replicas.copyWithout(newIsr, partition.removingReplicas);
+                newReplicas = Replicas.copyWithout(partition.replicas, partition.removingReplicas);
+            }
+        }
+        int newLeader;
+        if (Replicas.contains(newIsr, partition.leader)) {
+            // If the current leader is good, don't change.
+            newLeader = partition.leader;
+        } else {
+            // Choose a new leader.
+            boolean uncleanOk = configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+            newLeader = bestLeader(newReplicas, newIsr, uncleanOk, isAcceptableLeader);
+        }
+        if (!electionWasClean(newLeader, newIsr)) {
+            // After an unclean leader election, the ISR is reset to just the new leader.
+            newIsr = new int[] {newLeader};
+        } else if (newIsr.length == 0) {
+            // We never want to shrink the ISR to size 0.
+            newIsr = partition.isr;
+        }
+        if (newLeader != partition.leader) record.setLeader(newLeader);
+        if (!Arrays.equals(newIsr, partition.isr)) {
+            record.setIsr(Replicas.toList(newIsr));
+        }
+        if (!Arrays.equals(newReplicas, partition.replicas)) {
+            record.setReplicas(Replicas.toList(newReplicas));
+        }
+        return record;
+    }
+
+    ControllerResult<AlterPartitionReassignmentsResponseData>
+            alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        AlterPartitionReassignmentsResponseData result =
+                new AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+        int successfulAlterations = 0, totalAlterations = 0;
+        for (ReassignableTopic topic : request.topics()) {
+            ReassignableTopicResponse topicResponse = new ReassignableTopicResponse().
+                setName(topic.name());
+            for (ReassignablePartition partition : topic.partitions()) {
+                ApiError error = ApiError.NONE;
+                try {
+                    alterPartitionReassignment(topic.name(), partition, records);
+                    successfulAlterations++;
+                } catch (Throwable e) {
+                    log.info("Unable to alter partition reassignment for " +
+                        topic.name() + ":" + partition.partitionIndex() + " because " +
+                        "of an " + e.getClass().getSimpleName() + " error: " + e.getMessage());
+                    error = ApiError.fromThrowable(e);
+                }
+                totalAlterations++;
+                topicResponse.partitions().add(new ReassignablePartitionResponse().
+                    setPartitionIndex(partition.partitionIndex()).
+                    setErrorCode(error.error().code()).
+                    setErrorMessage(error.message()));
+            }
+            result.responses().add(topicResponse);
+        }
+        log.info("Successfully altered {} out of {} partition reassignment(s).",
+            successfulAlterations, totalAlterations);
+        return ControllerResult.atomicOf(records, result);
+    }
+
+    void alterPartitionReassignment(String topicName,
+                                    ReassignablePartition partition,
+                                    List<ApiMessageAndVersion> records) {
+        Uuid topicId = topicsByName.get(topicName);
+        if (topicId == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic " +
+                "named " + topicName + ".");
+        }
+        TopicControlInfo topicInfo = topics.get(topicId);
+        if (topicInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic " +
+                "with ID " + topicId + ".");
+        }
+        TopicIdPartition part = new TopicIdPartition(topicId, partition.partitionIndex());
+        PartitionControlInfo partitionInfo = topicInfo.parts.get(partition.partitionIndex());
+        if (partitionInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find partition " +
+                topicName + ":" + partition.partitionIndex() + ".");
+        }
+        if (partition.replicas() == null) {
+            cancelPartitionReassignment(topicName, part, partitionInfo, records);
+        } else {
+            changePartitionReassignment(topicName, part, partitionInfo, partition, records);
+        }
+    }
+
+    void cancelPartitionReassignment(String topicName,
+                                     TopicIdPartition topicIdPartition,
+                                     PartitionControlInfo partition,
+                                     List<ApiMessageAndVersion> records) {
+        if (!partition.isReassigning()) {
+            throw new NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
+        }
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topicIdPartition.topicId()).
+            setPartitionId(topicIdPartition.partitionId());
+        if (partition.removingReplicas.length > 0) {
+            record.setRemovingReplicas(Collections.emptyList());
+        }
+        if (partition.addingReplicas.length > 0) {
+            record.setAddingReplicas(Collections.emptyList());
+        }
+        RemovingAndAddingReplicas removingAndAddingReplicas =
+            new RemovingAndAddingReplicas(partition.removingReplicas, partition.addingReplicas);
+        List<Integer> currentReplicas = Replicas.toList(partition.replicas);
+        List<Integer> currentIsr = Replicas.toList(partition.isr);
+        List<Integer> revertedReplicas = removingAndAddingReplicas.
+            calculateRevertedReplicas(currentReplicas, currentIsr);
+        if (!revertedReplicas.equals(currentReplicas)) {
+            record.setReplicas(revertedReplicas);
+            List<Integer> newIsr = new ArrayList<>();
+            for (int replica : partition.isr) {
+                if (revertedReplicas.contains(replica)) {
+                    newIsr.add(replica);
+                }
+            }
+            if (!newIsr.equals(currentIsr)) {
+                if (!newIsr.contains(partition.leader)) {
+                    int newLeader = bestLeader(Replicas.toArray(revertedReplicas),
+                        Replicas.toArray(newIsr),
+                        configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+                        r -> clusterControl.unfenced(r));
+                    if (!electionWasClean(newLeader, Replicas.toArray(newIsr))) {
+                        newIsr = Collections.singletonList(newLeader);
+                    }
+                    record.setLeader(newLeader);
+                }
+                record.setIsr(newIsr);
+            }
+        }
+        records.add(new ApiMessageAndVersion(record, (short) 0));
+    }
+
+    void changePartitionReassignment(String topicName,

Review comment:
       Could we add some comments to document the partition reassignment flow? For example: what triggers the new replicas to be added, what triggers completion of the reassignment, and how the old replicas are deleted.
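       As a starting point for those comments, a self-contained model of the lifecycle as it appears in this diff. ReassigningPartition is hypothetical, leader election is omitted, and the completion rule (every adding replica is in the new ISR) is an assumption, since isrChangeCompletesReassignment() is not shown. How the removed replicas' data gets deleted is exactly the open question above, so the sketch stops at dropping them from the replica set.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one partition's reassignment lifecycle; not the PR's classes.
final class ReassigningPartition {
    List<Integer> replicas;
    List<Integer> isr;
    List<Integer> adding = new ArrayList<>();
    List<Integer> removing = new ArrayList<>();

    ReassigningPartition(List<Integer> replicas, List<Integer> isr) {
        this.replicas = new ArrayList<>(replicas);
        this.isr = new ArrayList<>(isr);
    }

    // Step 1: AlterPartitionReassignments with a target list records the merged
    // replica set plus the adding/removing lists; new replicas can start fetching.
    void startReassignment(List<Integer> target) {
        adding = new ArrayList<>(target);
        adding.removeAll(replicas);
        removing = new ArrayList<>(replicas);
        removing.removeAll(target);
        List<Integer> merged = new ArrayList<>(target);
        merged.addAll(removing);
        replicas = merged;
    }

    // Step 2: the leader reports an ISR change (AlterIsr).
    // Step 3: once every adding replica is in sync (assumed rule), the reassignment
    // completes: removing replicas leave the replica set and ISR, and both lists clear.
    boolean onIsrChange(List<Integer> newIsr) {
        isr = new ArrayList<>(newIsr);
        if (adding.isEmpty() && removing.isEmpty()) {
            return false; // no reassignment in progress
        }
        if (!isr.containsAll(adding)) {
            return false; // still waiting for new replicas to catch up
        }
        replicas.removeAll(removing);
        isr.removeAll(removing);
        adding.clear();
        removing.clear();
        return true;
    }
}
```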

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -711,56 +830,73 @@ BrokersToIsrs brokersToIsrs() {
             }
             TopicControlInfo topic = topics.get(topicId);
             for (AlterIsrRequestData.PartitionData partitionData : topicData.partitions()) {
-                PartitionControlInfo partition = topic.parts.get(partitionData.partitionIndex());
+                int partitionId = partitionData.partitionIndex();
+                PartitionControlInfo partition = topic.parts.get(partitionId);
                 if (partition == null) {
                     responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
-                        setPartitionIndex(partitionData.partitionIndex()).
+                        setPartitionIndex(partitionId).
                         setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()));
                     continue;
                 }
                 if (request.brokerId() != partition.leader) {
                     responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
-                        setPartitionIndex(partitionData.partitionIndex()).
+                        setPartitionIndex(partitionId).
                         setErrorCode(INVALID_REQUEST.code()));
                     continue;
                 }
                 if (partitionData.leaderEpoch() != partition.leaderEpoch) {
                     responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
-                        setPartitionIndex(partitionData.partitionIndex()).
-                        setErrorCode(Errors.FENCED_LEADER_EPOCH.code()));
+                        setPartitionIndex(partitionId).
+                        setErrorCode(FENCED_LEADER_EPOCH.code()));
                     continue;
                 }
                 if (partitionData.currentIsrVersion() != partition.partitionEpoch) {
                     responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
-                        setPartitionIndex(partitionData.partitionIndex()).
+                        setPartitionIndex(partitionId).
                         setErrorCode(Errors.INVALID_UPDATE_VERSION.code()));
                     continue;
                 }
                 int[] newIsr = Replicas.toArray(partitionData.newIsr());
                 if (!Replicas.validateIsr(partition.replicas, newIsr)) {
                     responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
-                        setPartitionIndex(partitionData.partitionIndex()).
+                        setPartitionIndex(partitionId).
                         setErrorCode(INVALID_REQUEST.code()));
                     continue;
                 }
                 if (!Replicas.contains(newIsr, partition.leader)) {
                     // An alterIsr request can't remove the current leader.
                     responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
-                        setPartitionIndex(partitionData.partitionIndex()).
+                        setPartitionIndex(partitionId).
                         setErrorCode(INVALID_REQUEST.code()));
                     continue;
                 }
-                records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
-                    setPartitionId(partitionData.partitionIndex()).
-                    setTopicId(topic.id).
-                    setIsr(partitionData.newIsr()), (short) 0));
-                responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
-                    setPartitionIndex(partitionData.partitionIndex()).
-                    setErrorCode(Errors.NONE.code()).
-                    setLeaderId(partition.leader).
-                    setLeaderEpoch(partition.leaderEpoch).
-                    setCurrentIsrVersion(partition.partitionEpoch + 1).
-                    setIsr(partitionData.newIsr()));
+                PartitionChangeRecord record = generateLeaderAndIsrUpdate(topic,

Review comment:
       In the old logic, when the reassignment completes, we bump the leader epoch even if the leader doesn't need to change, to prevent the removed replicas from being added back to the ISR unnecessarily. generateLeaderAndIsrUpdate() doesn't seem to do the same thing?
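       A minimal sketch of one way to get that behavior: re-state the unchanged leader in the change record when the reassignment completes. EpochBumpPolicy is hypothetical, NO_LEADER_CHANGE mirrors the -2 default of the Leader field in PartitionChangeRecord.json, and nextLeaderEpoch() encodes an assumed replay rule (any explicit leader value, even the same one, starts a new epoch) that should be checked against the actual replay code.

```java
// Hypothetical helper; NO_LEADER_CHANGE mirrors the -2 default of PartitionChangeRecord.Leader.
final class EpochBumpPolicy {
    static final int NO_LEADER_CHANGE = -2;

    // Chooses the value for a change record's leader field.
    static int leaderField(int currentLeader, int newLeader, boolean completesReassignment) {
        if (newLeader != currentLeader) {
            return newLeader;
        }
        // Re-state the unchanged leader on completion so that the epoch is bumped.
        return completesReassignment ? currentLeader : NO_LEADER_CHANGE;
    }

    // Assumed replay rule: any explicit leader value starts a new leader epoch.
    static int nextLeaderEpoch(int leaderEpoch, int leaderField) {
        return leaderField == NO_LEADER_CHANGE ? leaderEpoch : leaderEpoch + 1;
    }
}
```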

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -343,12 +409,21 @@ public String toString() {
         this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
         this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
         this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
+        this.reassigningTopics = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.reassigningPartitionCount = new TimelineInteger(snapshotRegistry);
     }
 
     public void replay(TopicRecord record) {
-        topicsByName.put(record.name(), record.topicId());
-        topics.put(record.topicId(),
+        Uuid prevTopicId = topicsByName.get(record.name());
+        if (prevTopicId != null) {
+            replay(new RemoveTopicRecord().setTopicId(prevTopicId));
+        }
+        TopicControlInfo prevTopic = topics.put(record.topicId(),
             new TopicControlInfo(record.name(), snapshotRegistry, record.topicId()));
+        if (prevTopic != null) {

Review comment:
       Hmm, in this case the topicId is the same, so by removing the topic we would be deleting the current topicId. Perhaps we should throw an IllegalStateException, since this is not expected?
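       A minimal sketch of the suggested check, with java.util.UUID standing in for Kafka's Uuid and plain maps standing in for the timeline maps. TopicRegistry is a hypothetical name. A record whose ID is already known is rejected, while a new ID for an existing name still removes the old topic first, as the diff does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified replay of topic records.
final class TopicRegistry {
    private final Map<String, UUID> topicsByName = new HashMap<>();
    private final Map<UUID, String> topics = new HashMap<>();

    void replayTopic(String name, UUID topicId) {
        if (topics.containsKey(topicId)) {
            // Seeing the same topic ID twice is unexpected: fail loudly.
            throw new IllegalStateException("Found duplicate TopicRecord for " + name +
                " with topic ID " + topicId);
        }
        UUID prevId = topicsByName.get(name);
        if (prevId != null) {
            // A different ID for an existing name: drop the old topic first.
            replayRemoveTopic(prevId);
        }
        topicsByName.put(name, topicId);
        topics.put(topicId, name);
    }

    void replayRemoveTopic(UUID topicId) {
        String name = topics.remove(topicId);
        if (name != null) {
            topicsByName.remove(name);
        }
    }

    UUID idOf(String name) {
        return topicsByName.get(name);
    }
}
```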

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
##########
@@ -90,6 +97,13 @@ public Integer value() {
                 return preferredReplicaImbalanceCount;
             }
         });
+        this.reassigningPartitionCountGauge = omitBrokerMetrics ? null :

Review comment:
       So in combined mode, we won't report REASSIGNING_PARTITION_COUNT? It seems useful in combined mode too.

##########
File path: metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
##########
@@ -29,6 +29,15 @@
       "about": "null if the ISR didn't change; the new in-sync replicas otherwise." },
     { "name": "Leader", "type": "int32", "default": "-2", "entityType": "brokerId",
       "versions": "0+", "taggedVersions": "0+", "tag": 1,
-      "about": "-1 if there is now no leader; -2 if the leader didn't change; the new leader otherwise." }
+      "about": "-1 if there is now no leader; -2 if the leader didn't change; the new leader otherwise." },
+    { "name": "Replicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
+      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 2,
+      "about": "null if the replicas didn't change; the new replicas otherwise." },
+    { "name": "RemovingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",

Review comment:
       Why is RemovingReplicas nullable here, but not in PartitionRecord.json?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1111,19 +1111,19 @@ private QuorumController(LogContext logContext,
         }
         return appendWriteEvent("alterPartitionReassignments",
             time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
-            () -> {
-                throw new UnsupportedOperationException();
-            });
+            () -> replicationControl.alterPartitionReassignments(request));
     }
 
     @Override
     public CompletableFuture<ListPartitionReassignmentsResponseData>
             listPartitionReassignments(ListPartitionReassignmentsRequestData request) {
+        if (request.topics() != null && request.topics().isEmpty()) {
+            return CompletableFuture.completedFuture(
+                new ListPartitionReassignmentsResponseData().setErrorMessage(null));

Review comment:
       Is there a need for setErrorMessage(null)? It seems the error message defaults to null.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
         }
     }
 
+    PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+                                                     int partitionId,
+                                                     PartitionControlInfo partition,
+                                                     int[] newIsr,
+                                                     Function<Integer, Boolean> isAcceptableLeader) {
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topic.id).
+            setPartitionId(partitionId);
+        int[] newReplicas = partition.replicas;
+        if (partition.isrChangeCompletesReassignment(newIsr)) {
+            if (partition.addingReplicas.length > 0) {
+                record.setAddingReplicas(Collections.emptyList());
+            }
+            if (partition.removingReplicas.length > 0) {
+                record.setRemovingReplicas(Collections.emptyList());
+                newIsr = Replicas.copyWithout(newIsr, partition.removingReplicas);
+                newReplicas = Replicas.copyWithout(partition.replicas, partition.removingReplicas);
+            }
+        }
+        int newLeader;
+        if (Replicas.contains(newIsr, partition.leader)) {
+            // If the current leader is good, don't change.
+            newLeader = partition.leader;
+        } else {
+            // Choose a new leader.
+            boolean uncleanOk = configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+            newLeader = bestLeader(newReplicas, newIsr, uncleanOk, isAcceptableLeader);
+        }
+        if (!electionWasClean(newLeader, newIsr)) {
+            // After an unclean leader election, the ISR is reset to just the new leader.
+            newIsr = new int[] {newLeader};
+        } else if (newIsr.length == 0) {
+            // We never want to shrink the ISR to size 0.
+            newIsr = partition.isr;
+        }
+        if (newLeader != partition.leader) record.setLeader(newLeader);
+        if (!Arrays.equals(newIsr, partition.isr)) {
+            record.setIsr(Replicas.toList(newIsr));
+        }
+        if (!Arrays.equals(newReplicas, partition.replicas)) {
+            record.setReplicas(Replicas.toList(newReplicas));
+        }
+        return record;
+    }
+
+    ControllerResult<AlterPartitionReassignmentsResponseData>
+            alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        AlterPartitionReassignmentsResponseData result =
+                new AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+        int successfulAlterations = 0, totalAlterations = 0;
+        for (ReassignableTopic topic : request.topics()) {
+            ReassignableTopicResponse topicResponse = new ReassignableTopicResponse().
+                setName(topic.name());
+            for (ReassignablePartition partition : topic.partitions()) {
+                ApiError error = ApiError.NONE;
+                try {
+                    alterPartitionReassignment(topic.name(), partition, records);
+                    successfulAlterations++;
+                } catch (Throwable e) {
+                    log.info("Unable to alter partition reassignment for " +
+                        topic.name() + ":" + partition.partitionIndex() + " because " +
+                        "of an " + e.getClass().getSimpleName() + " error: " + e.getMessage());
+                    error = ApiError.fromThrowable(e);
+                }
+                totalAlterations++;
+                topicResponse.partitions().add(new ReassignablePartitionResponse().
+                    setPartitionIndex(partition.partitionIndex()).
+                    setErrorCode(error.error().code()).
+                    setErrorMessage(error.message()));
+            }
+            result.responses().add(topicResponse);
+        }
+        log.info("Successfully altered {} out of {} partition reassignment(s).",
+            successfulAlterations, totalAlterations);
+        return ControllerResult.atomicOf(records, result);
+    }
+
+    void alterPartitionReassignment(String topicName,
+                                    ReassignablePartition partition,
+                                    List<ApiMessageAndVersion> records) {
+        Uuid topicId = topicsByName.get(topicName);
+        if (topicId == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic " +
+                "named " + topicName + ".");
+        }
+        TopicControlInfo topicInfo = topics.get(topicId);
+        if (topicInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic " +
+                "with ID " + topicId + ".");
+        }
+        TopicIdPartition part = new TopicIdPartition(topicId, partition.partitionIndex());
+        PartitionControlInfo partitionInfo = topicInfo.parts.get(partition.partitionIndex());
+        if (partitionInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find partition " +
+                topicName + ":" + partition.partitionIndex() + ".");
+        }
+        if (partition.replicas() == null) {
+            cancelPartitionReassignment(topicName, part, partitionInfo, records);
+        } else {
+            changePartitionReassignment(topicName, part, partitionInfo, partition, records);
+        }
+    }
+
+    void cancelPartitionReassignment(String topicName,
+                                     TopicIdPartition topicIdPartition,
+                                     PartitionControlInfo partition,
+                                     List<ApiMessageAndVersion> records) {
+        if (!partition.isReassigning()) {
+            throw new NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
+        }
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topicIdPartition.topicId()).
+            setPartitionId(topicIdPartition.partitionId());
+        if (partition.removingReplicas.length > 0) {
+            record.setRemovingReplicas(Collections.emptyList());
+        }
+        if (partition.addingReplicas.length > 0) {
+            record.setAddingReplicas(Collections.emptyList());
+        }
+        RemovingAndAddingReplicas removingAndAddingReplicas =
+            new RemovingAndAddingReplicas(partition.removingReplicas, partition.addingReplicas);
+        List<Integer> currentReplicas = Replicas.toList(partition.replicas);
+        List<Integer> currentIsr = Replicas.toList(partition.isr);
+        List<Integer> revertedReplicas = removingAndAddingReplicas.
+            calculateRevertedReplicas(currentReplicas, currentIsr);
+        if (!revertedReplicas.equals(currentReplicas)) {
+            record.setReplicas(revertedReplicas);
+            List<Integer> newIsr = new ArrayList<>();
+            for (int replica : partition.isr) {
+                if (revertedReplicas.contains(replica)) {
+                    newIsr.add(replica);
+                }
+            }
+            if (!newIsr.equals(currentIsr)) {
+                if (!newIsr.contains(partition.leader)) {
+                    int newLeader = bestLeader(Replicas.toArray(revertedReplicas),
+                        Replicas.toArray(newIsr),
+                        configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+                        r -> clusterControl.unfenced(r));
+                    if (!electionWasClean(newLeader, Replicas.toArray(newIsr))) {
+                        newIsr = Collections.singletonList(newLeader);
+                    }
+                    record.setLeader(newLeader);
+                }
+                record.setIsr(newIsr);
+            }
+        }
+        records.add(new ApiMessageAndVersion(record, (short) 0));
+    }
+
+    void changePartitionReassignment(String topicName,
+                                     TopicIdPartition part,
+                                     PartitionControlInfo partitionInfo,
+                                     ReassignablePartition partition,
+                                     List<ApiMessageAndVersion> records) {
+        // Check that the requested partition assignment is valid.
+        validateManualPartitionAssignment(partition.replicas(), OptionalInt.empty());
+        // Calculate the replicas to add and remove.
+        List<Integer> currentReplicas = Replicas.toList(partitionInfo.replicas);
+        RemovingAndAddingReplicas removingAndAdding =
+            RemovingAndAddingReplicas.forTarget(currentReplicas, partition.replicas());
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(part.topicId()).
+            setPartitionId(part.partitionId());
+        List<Integer> removing = removingAndAdding.removingAsList();
+        if (!removing.isEmpty()) record.setRemovingReplicas(removing);
+        List<Integer> adding = removingAndAdding.addingAsList();
+        if (!adding.isEmpty()) record.setAddingReplicas(adding);
+
+        // Calculate the merged replica list. This may involve reordering existing
+        // replicas.
+        List<Integer> newReplicas = removingAndAdding.
+            calculateMergedReplicas(currentReplicas, partition.replicas());
+        PartitionControlInfo nextPartitionInfo = partitionInfo.merge(record);
+        if (nextPartitionInfo.isrChangeCompletesReassignment(nextPartitionInfo.isr)) {
+            // Handle partition assignments which must be completed immediately.
+            // These assignments don't add any replicas, and don't remove replicas critical
+            // to maintaining a non-empty ISR.
+            record.setRemovingReplicas(null);
+            record.setAddingReplicas(null);
+            int[] newReplicasArray = Replicas.copyWithout(nextPartitionInfo.replicas,
+                nextPartitionInfo.removingReplicas);
+            newReplicas = Replicas.toList(newReplicasArray);
+            int[] newIsr = Replicas.copyWithout(nextPartitionInfo.isr,
+                nextPartitionInfo.removingReplicas);
+            if (!Arrays.equals(nextPartitionInfo.isr, newIsr)) {
+                // Check if we need to elect a new leader.
+                if (!Replicas.contains(newIsr, partitionInfo.leader)) {
+                    int newLeader = bestLeader(newReplicasArray, newIsr,
+                        configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+                        r -> clusterControl.unfenced(r));
+                    if (!electionWasClean(newLeader, newIsr)) {
+                        newIsr = new int[] {newLeader};
+                    }
+                    record.setLeader(newLeader);
+                }
+                record.setIsr(Replicas.toList(newIsr));
+            }
+        }
+        if (!currentReplicas.equals(newReplicas)) {
+            record.setReplicas(newReplicas);
+        }
+        // Check if there are any partition changes resulting from the above. If there
+        // are, add the appropriate record.
+        if (recordContainsChanges(record)) {
+            records.add(new ApiMessageAndVersion(record, (short) 0));
+        }
+    }
+
+    /**
+     * Returns true if a partition change record actually changes something about
+     * the partition.
+     */
+    static boolean recordContainsChanges(PartitionChangeRecord record) {
+        if (record.isr() != null) return true;
+        if (record.leader() != NO_LEADER_CHANGE) return true;
+        if (record.replicas() != null) return true;
+        if (record.removingReplicas() != null) return true;
+        if (record.addingReplicas() != null) return true;
+        return false;
+    }
+
+    ListPartitionReassignmentsResponseData listPartitionReassignments(
+            List<ListPartitionReassignmentsTopics> topicList) {
+        if (topicList == null) {
+            return listAllPartitionReassignments();
+        }
+        ListPartitionReassignmentsResponseData response =
+            new ListPartitionReassignmentsResponseData().setErrorMessage(null);
+        for (ListPartitionReassignmentsTopics topic : topicList) {
+            Uuid topicId = topicsByName.get(topic.name());
+            if (topicId != null) {
+                TopicControlInfo topicInfo = topics.get(topicId);
+                if (topicInfo == null) {
+                    throw new RuntimeException("No topic entry found for " + topicId);
+                }
+                OngoingTopicReassignment ongoingTopic = new OngoingTopicReassignment().
+                    setName(topic.name());
+                for (int partitionId : topic.partitionIndexes()) {
+                    Optional<OngoingPartitionReassignment> ongoing =
+                        getOngoingPartitionReassignment(topicInfo, partitionId);
+                    if (ongoing.isPresent()) {
+                        ongoingTopic.partitions().add(ongoing.get());
+                    }
+                }
+                if (!ongoingTopic.partitions().isEmpty()) {
+                    response.topics().add(ongoingTopic);
+                }
+            }
+        }
+        return response;
+    }
+
+    ListPartitionReassignmentsResponseData listAllPartitionReassignments() {

Review comment:
       This method is almost the same as listPartitionReassignments(). Could we reuse the logic somehow?
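       One possible way to share the logic, sketched under simplified assumptions. The body of listAllPartitionReassignments() is cut off above, so the sketch assumes it walks every topic and partition and keeps the ones with a reassignment in progress. The names ReassignmentLister, PartitionState, put(), list() and addOngoing() are hypothetical. Plain maps stand in for the controller's topicsByName/topics state and for the ListPartitionReassignments request/response types. In the sketch, a null request means "all partitions", and both cases call a single per-topic helper:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the controller state; these are not
// Kafka's real TopicControlInfo / PartitionControlInfo classes.
final class ReassignmentLister {
    /** Simplified partition state: reassigning if it has adding or removing replicas. */
    static final class PartitionState {
        final int[] addingReplicas;
        final int[] removingReplicas;

        PartitionState(int[] addingReplicas, int[] removingReplicas) {
            this.addingReplicas = addingReplicas;
            this.removingReplicas = removingReplicas;
        }

        boolean isReassigning() {
            return addingReplicas.length > 0 || removingReplicas.length > 0;
        }
    }

    // topic name -> (partition index -> state), kept in insertion order.
    private final Map<String, Map<Integer, PartitionState>> topics = new LinkedHashMap<>();

    void put(String topic, int partition, PartitionState state) {
        topics.computeIfAbsent(topic, t -> new LinkedHashMap<>()).put(partition, state);
    }

    /**
     * Single entry point for both cases: a null request means "every topic and
     * partition"; otherwise only the requested partitions are examined, and
     * unknown topics are skipped. Returns topic name -> indexes of partitions
     * that have a reassignment in progress.
     */
    Map<String, List<Integer>> list(Map<String, List<Integer>> requested) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (requested == null) {
            for (Map.Entry<String, Map<Integer, PartitionState>> e : topics.entrySet()) {
                addOngoing(result, e.getKey(), e.getValue(), e.getValue().keySet());
            }
        } else {
            for (Map.Entry<String, List<Integer>> e : requested.entrySet()) {
                Map<Integer, PartitionState> parts = topics.get(e.getKey());
                if (parts != null) {
                    addOngoing(result, e.getKey(), parts, e.getValue());
                }
            }
        }
        return result;
    }

    // The per-topic loop that both code paths share.
    private static void addOngoing(Map<String, List<Integer>> result, String topic,
                                   Map<Integer, PartitionState> parts,
                                   Iterable<Integer> partitionIds) {
        List<Integer> ongoing = new ArrayList<>();
        for (int id : partitionIds) {
            PartitionState state = parts.get(id);
            if (state != null && state.isReassigning()) {
                ongoing.add(id);
            }
        }
        if (!ongoing.isEmpty()) {
            result.put(topic, ongoing);
        }
    }
}
```

       With this shape, the two cases differ only in where the (topic, partitions) pairs come from, so the early return into a separate method would no longer be needed.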




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org