You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/08/14 16:25:45 UTC

[kafka] branch trunk updated: KAFKA-8345: KIP-455: Admin API changes (Part 2) (#7120)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5129ab5  KAFKA-8345: KIP-455: Admin API changes (Part 2) (#7120)
5129ab5 is described below

commit 5129ab53ee9a2e46a22a13b1d5300ee139c4999f
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Wed Aug 14 17:25:17 2019 +0100

    KAFKA-8345: KIP-455: Admin API changes (Part 2) (#7120)
    
    Add the AlterPartitionReassignments and ListPartitionReassignments APIs.  Also remove an unused methodlength suppression for KafkaAdminClient.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, Viktor Somogyi <vi...@gmail.com>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../java/org/apache/kafka/clients/admin/Admin.java | 115 ++++++++++
 .../admin/AlterPartitionReassignmentsOptions.java  |  31 +++
 .../admin/AlterPartitionReassignmentsResult.java   |  59 +++++
 .../kafka/clients/admin/KafkaAdminClient.java      | 250 +++++++++++++++++++++
 .../admin/ListPartitionReassignmentsOptions.java   |  29 +++
 .../admin/ListPartitionReassignmentsResult.java    |  43 ++++
 .../clients/admin/NewPartitionReassignment.java    |  44 ++++
 .../kafka/clients/admin/PartitionReassignment.java |  60 +++++
 .../AlterPartitionReassignmentsResponse.java       |   2 +-
 .../ListPartitionReassignmentsRequest.java         |  21 +-
 .../ListPartitionReassignmentsResponse.java        |   2 +-
 .../message/ListPartitionReassignmentsRequest.json |   2 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 215 ++++++++++++++++++
 .../kafka/clients/admin/MockAdminClient.java       |  12 +
 .../kafka/common/requests/RequestResponseTest.java |   6 +-
 16 files changed, 878 insertions(+), 15 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0bb2193..475b9b0 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -52,7 +52,7 @@
               files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|SchemaGenerator|AbstractCoordinator).java"/>
 
     <suppress checks="JavaNCSS"
-              files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java|SenderTest.java"/>
+              files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java|SenderTest.java|KafkaAdminClient.java"/>
 
     <suppress checks="NPathComplexity"
               files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index c1be7a7..8fdd21b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -21,6 +21,7 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -930,6 +931,120 @@ public interface Admin extends AutoCloseable {
         Set<TopicPartition> partitions,
         ElectLeadersOptions options);
 
+
+    /**
+     * Change the reassignments for one or more partitions.
+     * Providing an empty Optional (e.g via {@link Optional#empty()}) will <bold>revert</bold> the reassignment for the associated partition.
+     *
+     * This is a convenience method for {@link #alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}
+     * with default options.  See the overload for more details.
+     */
+    default AlterPartitionReassignmentsResult alterPartitionReassignments(
+        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
+        return alterPartitionReassignments(reassignments, new AlterPartitionReassignmentsOptions());
+    }
+
+    /**
+     * Change the reassignments for one or more partitions.
+     * Providing an empty Optional (e.g via {@link Optional#empty()}) will <bold>revert</bold> the reassignment for the associated partition.
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+     * the returned {@code AlterPartitionReassignmentsResult}:</p>
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+     *   If the topic or partition does not exist within the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   if the request timed out before the controller could record the new assignments.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidReplicaAssignmentException}
+     *   If the specified assignment was not valid.</li>
+     *   <li>{@link org.apache.kafka.common.errors.NoReassignmentInProgressException}
+     *   If there was an attempt to cancel a reassignment for a partition which was not being reassigned.</li>
+     * </ul>
+     *
+     * @param reassignments   The reassignments to add, modify, or remove.
+     * @param options         The options to use.
+     * @return                The result.
+     */
+    AlterPartitionReassignmentsResult alterPartitionReassignments(
+        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
+        AlterPartitionReassignmentsOptions options);
+
+
+    /**
+     * List all of the current partition reassignments
+     *
+     * This is a convenience method for {@link #listPartitionReassignments(ListPartitionReassignmentsOptions)}
+     * with default options. See the overload for more details.
+     */
+    default ListPartitionReassignmentsResult listPartitionReassignments() {
+        return listPartitionReassignments(new ListPartitionReassignmentsOptions());
+    }
+
+    /**
+     * List the current reassignments for the given partitions
+     *
+     * This is a convenience method for {@link #listPartitionReassignments(Set, ListPartitionReassignmentsOptions)}
+     * with default options. See the overload for more details.
+     */
+    default ListPartitionReassignmentsResult listPartitionReassignments(Set<TopicPartition> partitions) {
+        return listPartitionReassignments(partitions, new ListPartitionReassignmentsOptions());
+    }
+
+    /**
+     * List the current reassignments for the given partitions
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+     * the returned {@code ListPartitionReassignmentsResult}:</p>
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user doesn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+     *   If a given topic or partition does not exist.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the controller could list the current reassignments.</li>
+     * </ul>
+     *
+     * @param partitions      The topic partitions to list reassignments for.
+     * @param options         The options to use.
+     * @return                The result.
+     */
+    default ListPartitionReassignmentsResult listPartitionReassignments(
+        Set<TopicPartition> partitions,
+        ListPartitionReassignmentsOptions options) {
+        return listPartitionReassignments(Optional.of(partitions), options);
+    }
+
+    /**
+     * List all of the current partition reassignments
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+     * the returned {@code ListPartitionReassignmentsResult}:</p>
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user doesn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+     *   If a given topic or partition does not exist.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the controller could list the current reassignments.</li>
+     * </ul>
+     *
+     * @param options         The options to use.
+     * @return                The result.
+     */
+    default ListPartitionReassignmentsResult listPartitionReassignments(ListPartitionReassignmentsOptions options) {
+        return listPartitionReassignments(Optional.empty(), options);
+    }
+
+    /**
+     * @param partitions the partitions we want to get reassignment for, or an empty optional if we want to get the reassignments for all partitions in the cluster
+     * @param options         The options to use.
+     * @return                The result.
+     */
+    ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> partitions,
+                                                                ListPartitionReassignmentsOptions options);
+
     /**
      * Get the metrics kept by the adminClient
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java
new file mode 100644
index 0000000..bee9c70
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * Options for {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}
+ *
+ * The API of this class is evolving. See {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class AlterPartitionReassignmentsOptions extends AbstractOptions<AlterPartitionReassignmentsOptions> {
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsResult.java
new file mode 100644
index 0000000..2009ab5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsResult.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * The result of {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}.
+ *
+ * The API of this class is evolving. See {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class AlterPartitionReassignmentsResult {
+    private final Map<TopicPartition, KafkaFuture<Void>> futures;
+
+    AlterPartitionReassignmentsResult(Map<TopicPartition, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from partitions to futures which can be used to check the status of the reassignment.
+     *
+     * Possible error codes:
+     *
+     * INVALID_REPLICA_ASSIGNMENT (39) -  if the specified replica assignment was not valid -- for example, if it included negative numbers, repeated numbers, or specified a broker ID that the controller was not aware of.
+     * NO_REASSIGNMENT_IN_PROGRESS (85) - if the request wants to cancel reassignments but none exist
+     * UNKNOWN (-1)
+     *
+     */
+    public Map<TopicPartition, KafkaFuture<Void>> values() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds only if all the reassignments were successfully initiated.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 7bcad41..936dc65 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -61,6 +61,8 @@ import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
 import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
 import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
 import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
@@ -81,6 +83,7 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.Altera
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
 import org.apache.kafka.common.message.ListGroupsRequestData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
 import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -95,6 +98,8 @@ import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.AlterConfigsRequest;
 import org.apache.kafka.common.requests.AlterConfigsResponse;
+import org.apache.kafka.common.requests.AlterPartitionReassignmentsRequest;
+import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
 import org.apache.kafka.common.requests.ApiError;
@@ -140,6 +145,8 @@ import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
 import org.apache.kafka.common.requests.ListGroupsRequest;
 import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest;
+import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
@@ -170,7 +177,9 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.TreeMap;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -179,6 +188,12 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
+import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
+import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
+import static org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
+import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
+import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
 import static org.apache.kafka.common.requests.MetadataRequest.convertToMetadataRequestTopic;
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 
@@ -3082,6 +3097,241 @@ public class KafkaAdminClient extends AdminClient {
         return new ElectLeadersResult(electionFuture);
     }
 
+    @Override
+    public AlterPartitionReassignmentsResult alterPartitionReassignments(
+            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
+            AlterPartitionReassignmentsOptions options) {
+        final Map<TopicPartition, KafkaFutureImpl<Void>> futures = new HashMap<>();
+        final Map<String, Map<Integer, Optional<NewPartitionReassignment>>> topicsToReassignments = new TreeMap<>();
+        for (Map.Entry<TopicPartition, Optional<NewPartitionReassignment>> entry : reassignments.entrySet()) {
+            String topic = entry.getKey().topic();
+            int partition = entry.getKey().partition();
+            TopicPartition topicPartition = new TopicPartition(topic, partition);
+            Optional<NewPartitionReassignment> reassignment = entry.getValue();
+            KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+            futures.put(topicPartition, future);
+
+            if (topicNameIsUnrepresentable(topic)) {
+                future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+                        topic + "' cannot be represented in a request."));
+            } else if (topicPartition.partition() < 0) {
+                future.completeExceptionally(new InvalidTopicException("The given partition index " +
+                        topicPartition.partition() + " is not valid."));
+            } else {
+                Map<Integer, Optional<NewPartitionReassignment>> partitionReassignments =
+                        topicsToReassignments.get(topicPartition.topic());
+                if (partitionReassignments == null) {
+                    partitionReassignments = new TreeMap<>();
+                    topicsToReassignments.put(topic, partitionReassignments);
+                }
+
+                partitionReassignments.put(partition, reassignment);
+            }
+        }
+
+        final long now = time.milliseconds();
+        Call call = new Call("alterPartitionReassignments", calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {
+
+            @Override
+            public AbstractRequest.Builder createRequest(int timeoutMs) {
+                AlterPartitionReassignmentsRequestData data =
+                        new AlterPartitionReassignmentsRequestData();
+                for (Map.Entry<String, Map<Integer, Optional<NewPartitionReassignment>>> entry :
+                        topicsToReassignments.entrySet()) {
+                    String topicName = entry.getKey();
+                    Map<Integer, Optional<NewPartitionReassignment>> partitionsToReassignments = entry.getValue();
+
+                    List<ReassignablePartition> reassignablePartitions = new ArrayList<>();
+                    for (Map.Entry<Integer, Optional<NewPartitionReassignment>> partitionEntry :
+                            partitionsToReassignments.entrySet()) {
+                        int partitionIndex = partitionEntry.getKey();
+                        Optional<NewPartitionReassignment> reassignment = partitionEntry.getValue();
+
+                        ReassignablePartition reassignablePartition = new ReassignablePartition()
+                                .setPartitionIndex(partitionIndex)
+                                .setReplicas(reassignment.map(NewPartitionReassignment::targetBrokers).orElse(null));
+                        reassignablePartitions.add(reassignablePartition);
+                    }
+
+                    ReassignableTopic reassignableTopic = new ReassignableTopic()
+                            .setName(topicName)
+                            .setPartitions(reassignablePartitions);
+                    data.topics().add(reassignableTopic);
+                }
+                data.setTimeoutMs(timeoutMs);
+                return new AlterPartitionReassignmentsRequest.Builder(data);
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                AlterPartitionReassignmentsResponse response = (AlterPartitionReassignmentsResponse) abstractResponse;
+                Map<TopicPartition, ApiException> errors = new HashMap<>();
+                int receivedResponsesCount = 0;
+
+                Errors topLevelError = Errors.forCode(response.data().errorCode());
+                switch (topLevelError) {
+                    case NONE:
+                        receivedResponsesCount += validateTopicResponses(response.data().responses(), errors);
+                        break;
+                    case NOT_CONTROLLER:
+                        handleNotControllerError(topLevelError);
+                        break;
+                    default:
+                        for (ReassignableTopicResponse topicResponse : response.data().responses()) {
+                            String topicName = topicResponse.name();
+                            for (ReassignablePartitionResponse partition : topicResponse.partitions()) {
+                                errors.put(
+                                        new TopicPartition(topicName, partition.partitionIndex()),
+                                        new ApiError(topLevelError, topLevelError.message()).exception()
+                                );
+                                receivedResponsesCount += 1;
+                            }
+                        }
+                        break;
+                }
+
+                assertResponseCountMatch(errors, receivedResponsesCount);
+                for (Map.Entry<TopicPartition, ApiException> entry : errors.entrySet()) {
+                    ApiException exception = entry.getValue();
+                    if (exception == null)
+                        futures.get(entry.getKey()).complete(null);
+                    else
+                        futures.get(entry.getKey()).completeExceptionally(exception);
+                }
+            }
+
+            private void assertResponseCountMatch(Map<TopicPartition, ApiException> errors, int receivedResponsesCount) {
+                int expectedResponsesCount = topicsToReassignments.values().stream().mapToInt(Map::size).sum();
+                if (errors.values().stream().noneMatch(Objects::nonNull) && receivedResponsesCount != expectedResponsesCount) {
+                    String quantifier = receivedResponsesCount > expectedResponsesCount ? "many" : "less";
+                    throw new UnknownServerException("The server returned too " + quantifier + " results." +
+                        "Expected " + expectedResponsesCount + " but received " + receivedResponsesCount);
+                }
+            }
+
+            private int validateTopicResponses(List<ReassignableTopicResponse> topicResponses,
+                                               Map<TopicPartition, ApiException> errors) {
+                int receivedResponsesCount = 0;
+
+                for (ReassignableTopicResponse topicResponse : topicResponses) {
+                    String topicName = topicResponse.name();
+                    for (ReassignablePartitionResponse partResponse : topicResponse.partitions()) {
+                        Errors partitionError = Errors.forCode(partResponse.errorCode());
+
+                        TopicPartition tp = new TopicPartition(topicName, partResponse.partitionIndex());
+                        if (partitionError == Errors.NONE) {
+                            errors.put(tp, null);
+                        } else {
+                            errors.put(tp, new ApiError(partitionError, partResponse.errorMessage()).exception());
+                        }
+                        receivedResponsesCount += 1;
+                    }
+                }
+
+                return receivedResponsesCount;
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                for (KafkaFutureImpl<Void> future : futures.values()) {
+                    future.completeExceptionally(throwable);
+                }
+            }
+        };
+        if (!topicsToReassignments.isEmpty()) {
+            runnable.call(call, now);
+        }
+        return new AlterPartitionReassignmentsResult(new HashMap<>(futures));
+    }
+
+    @Override
+    public ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> partitions,
+                                                                       ListPartitionReassignmentsOptions options) {
+        final KafkaFutureImpl<Map<TopicPartition, PartitionReassignment>> partitionReassignmentsFuture = new KafkaFutureImpl<>();
+        if (partitions.isPresent()) {
+            for (TopicPartition tp : partitions.get()) {
+                String topic = tp.topic();
+                int partition = tp.partition();
+                if (topicNameIsUnrepresentable(topic)) {
+                    partitionReassignmentsFuture.completeExceptionally(new InvalidTopicException("The given topic name '"
+                            + topic + "' cannot be represented in a request."));
+                } else if (partition < 0) {
+                    partitionReassignmentsFuture.completeExceptionally(new InvalidTopicException("The given partition index " +
+                            partition + " is not valid."));
+                }
+                if (partitionReassignmentsFuture.isCompletedExceptionally())
+                    return new ListPartitionReassignmentsResult(partitionReassignmentsFuture);
+            }
+        }
+        final long now = time.milliseconds();
+        runnable.call(new Call("listPartitionReassignments", calcDeadlineMs(now, options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                ListPartitionReassignmentsRequestData listData = new ListPartitionReassignmentsRequestData();
+                listData.setTimeoutMs(timeoutMs);
+
+                if (partitions.isPresent()) {
+                    Map<String, ListPartitionReassignmentsTopics> reassignmentTopicByTopicName = new HashMap<>();
+
+                    for (TopicPartition tp : partitions.get()) {
+                        if (!reassignmentTopicByTopicName.containsKey(tp.topic()))
+                            reassignmentTopicByTopicName.put(tp.topic(), new ListPartitionReassignmentsTopics().setName(tp.topic()));
+
+                        reassignmentTopicByTopicName.get(tp.topic()).partitionIndexes().add(tp.partition());
+                    }
+
+                    listData.setTopics(new ArrayList<>(reassignmentTopicByTopicName.values()));
+                }
+                return new ListPartitionReassignmentsRequest.Builder(listData);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                ListPartitionReassignmentsResponse response = (ListPartitionReassignmentsResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().errorCode());
+                switch (error) {
+                    case NONE:
+                        break;
+                    case NOT_CONTROLLER:
+                        handleNotControllerError(error);
+                        break;
+                    default:
+                        partitionReassignmentsFuture.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception());
+                        break;
+                }
+                Map<TopicPartition, PartitionReassignment> reassignmentMap = new HashMap<>();
+
+                for (OngoingTopicReassignment topicReassignment : response.data().topics()) {
+                    String topicName = topicReassignment.name();
+                    for (OngoingPartitionReassignment partitionReassignment : topicReassignment.partitions()) {
+                        reassignmentMap.put(
+                            new TopicPartition(topicName, partitionReassignment.partitionIndex()),
+                            new PartitionReassignment(partitionReassignment.replicas(), partitionReassignment.addingReplicas(), partitionReassignment.removingReplicas())
+                        );
+                    }
+                }
+
+                partitionReassignmentsFuture.complete(reassignmentMap);
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                partitionReassignmentsFuture.completeExceptionally(throwable);
+            }
+        }, now);
+
+        return new ListPartitionReassignmentsResult(partitionReassignmentsFuture);
+    }
+
+    private void handleNotControllerError(Errors error) throws ApiException {
+        metadataManager.clearController();
+        metadataManager.requestUpdate();
+        throw error.exception();
+    }
+
     /**
      * Returns a boolean indicating whether the resource needs to go to a specific node
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsOptions.java
new file mode 100644
index 0000000..7dcc7a6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#listPartitionReassignments(ListPartitionReassignmentsOptions)}
+ *
+ * The API of this class is evolving. See {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ListPartitionReassignmentsOptions extends AbstractOptions<ListPartitionReassignmentsOptions> {
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsResult.java
new file mode 100644
index 0000000..3d7b14c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
+/**
+ * The result of {@link AdminClient#listPartitionReassignments(ListPartitionReassignmentsOptions)}.
+ *
+ * The API of this class is evolving. See {@link AdminClient} for details.
+ */
+public class ListPartitionReassignmentsResult {
+    private final KafkaFuture<Map<TopicPartition, PartitionReassignment>> future;
+
+    public ListPartitionReassignmentsResult(KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments) {
+        this.future = reassignments;
+    }
+
+    /**
+     * Return a future which yields a map containing each partition's reassignments
+     */
+    public KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments() {
+        return future;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
new file mode 100644
index 0000000..8856470
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A new partition reassignment, which can be applied via {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}.
+ */
+public class NewPartitionReassignment {
+    private final List<Integer> targetBrokers;
+
+    public static Optional<NewPartitionReassignment> of(Integer... brokers) {
+        return Optional.of(new NewPartitionReassignment(Arrays.asList(brokers)));
+    }
+
+    public NewPartitionReassignment(List<Integer> targetBrokers) {
+        this.targetBrokers = Collections.unmodifiableList(new ArrayList<>(targetBrokers));
+    }
+
+    public List<Integer> targetBrokers() {
+        return targetBrokers;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/PartitionReassignment.java b/clients/src/main/java/org/apache/kafka/clients/admin/PartitionReassignment.java
new file mode 100644
index 0000000..cc3306e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/PartitionReassignment.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A partition reassignment, which has been listed via {@link AdminClient#listPartitionReassignments()}.
+ */
+public class PartitionReassignment {
+
+    private final List<Integer> replicas;
+    private final List<Integer> addingReplicas;
+    private final List<Integer> removingReplicas;
+
+    public PartitionReassignment(List<Integer> replicas, List<Integer> addingReplicas, List<Integer> removingReplicas) {
+        this.replicas = Collections.unmodifiableList(replicas);
+        this.addingReplicas = Collections.unmodifiableList(addingReplicas);
+        this.removingReplicas = Collections.unmodifiableList(removingReplicas);
+    }
+
+    /**
+     * The brokers which this partition currently resides on.
+     */
+    public List<Integer> replicas() {
+        return replicas;
+    }
+
+    /**
+     * The brokers that we are adding this partition to as part of a reassignment.
+     * A subset of replicas.
+     */
+    public List<Integer> addingReplicas() {
+        return addingReplicas;
+    }
+
+    /**
+     * The brokers that we are removing this partition from as part of a reassignment.
+     * A subset of replicas.
+     */
+    public List<Integer> removingReplicas() {
+        return removingReplicas;
+    }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
index db1cfab..ef235cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
@@ -36,7 +36,7 @@ public class AlterPartitionReassignmentsResponse extends AbstractResponse {
         this(struct, ApiKeys.ALTER_PARTITION_REASSIGNMENTS.latestVersion());
     }
 
-    AlterPartitionReassignmentsResponse(AlterPartitionReassignmentsResponseData data) {
+    public AlterPartitionReassignmentsResponse(AlterPartitionReassignmentsResponseData data) {
         this.data = data;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java
index 471147b..0aa5e55 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java
@@ -24,9 +24,11 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
 
 public class ListPartitionReassignmentsRequest extends AbstractRequest {
 
@@ -86,14 +88,17 @@ public class ListPartitionReassignmentsRequest extends AbstractRequest {
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         ApiError apiError = ApiError.fromThrowable(e);
 
-        List<OngoingTopicReassignment> ongoingTopicReassignments = data.topics().stream().map(topic ->
-                new OngoingTopicReassignment()
-                        .setName(topic.name())
-                        .setPartitions(topic.partitionIndexes().stream().map(partitionIndex ->
-                                new OngoingPartitionReassignment().setPartitionIndex(partitionIndex)).collect(Collectors.toList())
-                        )
-        ).collect(Collectors.toList());
-
+        List<OngoingTopicReassignment> ongoingTopicReassignments = new ArrayList<>();
+        if (data.topics() != null) {
+            for (ListPartitionReassignmentsTopics topic : data.topics()) {
+                ongoingTopicReassignments.add(
+                        new OngoingTopicReassignment()
+                                .setName(topic.name())
+                                .setPartitions(topic.partitionIndexes().stream().map(partitionIndex ->
+                                        new OngoingPartitionReassignment().setPartitionIndex(partitionIndex)).collect(Collectors.toList()))
+                );
+            }
+        }
         ListPartitionReassignmentsResponseData responseData = new ListPartitionReassignmentsResponseData()
                 .setTopics(ongoingTopicReassignments)
                 .setErrorCode(apiError.error().code())
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java
index 9513e88..0d53e4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java
@@ -33,7 +33,7 @@ public class ListPartitionReassignmentsResponse extends AbstractResponse {
         this(struct, ApiKeys.LIST_PARTITION_REASSIGNMENTS.latestVersion());
     }
 
-    ListPartitionReassignmentsResponse(ListPartitionReassignmentsResponseData responseData) {
+    public ListPartitionReassignmentsResponse(ListPartitionReassignmentsResponseData responseData) {
         this.data = responseData;
     }
 
diff --git a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
index d0ebf8b..ac871b2 100644
--- a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
+++ b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
@@ -21,7 +21,7 @@
   "fields": [
     { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
       "about": "The time in ms to wait for the request to complete." },
-    { "name": "Topics", "type": "[]ListPartitionReassignmentsTopics", "versions": "0+", "nullableVersions": "0+",
+    { "name": "Topics", "type": "[]ListPartitionReassignmentsTopics", "versions": "0+", "nullableVersions": "0+", "default": "null",
       "about": "The topics to list partition reassignments for, or null to list everything.", "fields": [
       { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name" },
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 7ec7c24..ff9e272 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -52,6 +52,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicDeletionDisabledException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
@@ -66,7 +67,9 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionR
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreateAclsResponse;
@@ -87,6 +90,7 @@ import org.apache.kafka.common.requests.ElectLeadersResponse;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
 import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -119,11 +123,16 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
+import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
+import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
+import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -1561,6 +1570,212 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testAlterPartitionReassignments() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            TopicPartition tp1 = new TopicPartition("A", 0);
+            TopicPartition tp2 = new TopicPartition("B", 0);
+            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
+            reassignments.put(tp1, Optional.empty());
+            reassignments.put(tp2, NewPartitionReassignment.of(1, 2, 3));
+
+            // 1. server returns less responses than number of partitions we sent
+            AlterPartitionReassignmentsResponseData responseData1 = new AlterPartitionReassignmentsResponseData();
+            ReassignablePartitionResponse normalPartitionResponse = new ReassignablePartitionResponse().setPartitionIndex(0);
+            responseData1.setResponses(Collections.singletonList(
+                    new ReassignableTopicResponse()
+                            .setName("A")
+                            .setPartitions(Collections.singletonList(normalPartitionResponse))));
+            env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(responseData1));
+            AlterPartitionReassignmentsResult result1 = env.adminClient().alterPartitionReassignments(reassignments);
+            Future future1 = result1.all();
+            Future future2 = result1.values().get(tp1);
+            TestUtils.assertFutureError(future1, UnknownServerException.class);
+            TestUtils.assertFutureError(future2, UnknownServerException.class);
+
+            // 2. NOT_CONTROLLER error handling
+            AlterPartitionReassignmentsResponseData controllerErrResponseData =
+                    new AlterPartitionReassignmentsResponseData()
+                            .setErrorCode(Errors.NOT_CONTROLLER.code())
+                            .setErrorMessage(Errors.NOT_CONTROLLER.message())
+                            .setResponses(Arrays.asList(
+                                new ReassignableTopicResponse()
+                                        .setName("A")
+                                        .setPartitions(Collections.singletonList(normalPartitionResponse)),
+                                new ReassignableTopicResponse()
+                                        .setName("B")
+                                        .setPartitions(Collections.singletonList(normalPartitionResponse)))
+                            );
+            MetadataResponse controllerNodeResponse = MetadataResponse.prepareResponse(env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(), 1, Collections.emptyList());
+            AlterPartitionReassignmentsResponseData normalResponse =
+                    new AlterPartitionReassignmentsResponseData()
+                            .setResponses(Arrays.asList(
+                                    new ReassignableTopicResponse()
+                                            .setName("A")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)),
+                                    new ReassignableTopicResponse()
+                                            .setName("B")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)))
+                            );
+            env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(controllerErrResponseData));
+            env.kafkaClient().prepareResponse(controllerNodeResponse);
+            env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(normalResponse));
+            AlterPartitionReassignmentsResult controllerErrResult = env.adminClient().alterPartitionReassignments(reassignments);
+            controllerErrResult.all().get();
+            controllerErrResult.values().get(tp1).get();
+            controllerErrResult.values().get(tp2).get();
+
+            // 3. partition-level error
+            AlterPartitionReassignmentsResponseData partitionLevelErrData =
+                    new AlterPartitionReassignmentsResponseData()
+                            .setResponses(Arrays.asList(
+                                    new ReassignableTopicResponse()
+                                            .setName("A")
+                                            .setPartitions(Collections.singletonList(new ReassignablePartitionResponse()
+                                                .setPartitionIndex(0).setErrorMessage(Errors.INVALID_REPLICA_ASSIGNMENT.message())
+                                                .setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code())
+                                            )),
+                                    new ReassignableTopicResponse()
+                                            .setName("B")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)))
+                            );
+            env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(partitionLevelErrData));
+            AlterPartitionReassignmentsResult partitionLevelErrResult = env.adminClient().alterPartitionReassignments(reassignments);
+            TestUtils.assertFutureError(partitionLevelErrResult.values().get(tp1), Errors.INVALID_REPLICA_ASSIGNMENT.exception().getClass());
+            partitionLevelErrResult.values().get(tp2).get();
+
+            // 4. top-level error
+            AlterPartitionReassignmentsResponseData topLevelErrResponseData =
+                    new AlterPartitionReassignmentsResponseData()
+                            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())
+                            .setErrorMessage(Errors.CLUSTER_AUTHORIZATION_FAILED.message())
+                            .setResponses(Arrays.asList(
+                                    new ReassignableTopicResponse()
+                                            .setName("A")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)),
+                                    new ReassignableTopicResponse()
+                                            .setName("B")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)))
+                            );
+            env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(topLevelErrResponseData));
+            AlterPartitionReassignmentsResult topLevelErrResult = env.adminClient().alterPartitionReassignments(reassignments);
+            TestUtils.assertFutureError(topLevelErrResult.all(), Errors.CLUSTER_AUTHORIZATION_FAILED.exception().getClass());
+            TestUtils.assertFutureError(topLevelErrResult.values().get(tp1), Errors.CLUSTER_AUTHORIZATION_FAILED.exception().getClass());
+            TestUtils.assertFutureError(topLevelErrResult.values().get(tp2), Errors.CLUSTER_AUTHORIZATION_FAILED.exception().getClass());
+
+            // 5. unrepresentable topic name error
+            TopicPartition invalidTopicTP = new TopicPartition("", 0);
+            TopicPartition invalidPartitionTP = new TopicPartition("ABC", -1);
+            Map<TopicPartition, Optional<NewPartitionReassignment>> invalidTopicReassignments = new HashMap<>();
+            invalidTopicReassignments.put(invalidPartitionTP, NewPartitionReassignment.of(1, 2, 3));
+            invalidTopicReassignments.put(invalidTopicTP, NewPartitionReassignment.of(1, 2, 3));
+            invalidTopicReassignments.put(tp1, NewPartitionReassignment.of(1, 2, 3));
+
+            AlterPartitionReassignmentsResponseData singlePartResponseData =
+                    new AlterPartitionReassignmentsResponseData()
+                            .setResponses(Collections.singletonList(
+                                    new ReassignableTopicResponse()
+                                            .setName("A")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)))
+                            );
+            env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(singlePartResponseData));
+            AlterPartitionReassignmentsResult unrepresentableTopicResult = env.adminClient().alterPartitionReassignments(invalidTopicReassignments);
+            TestUtils.assertFutureError(unrepresentableTopicResult.values().get(invalidTopicTP), InvalidTopicException.class);
+            TestUtils.assertFutureError(unrepresentableTopicResult.values().get(invalidPartitionTP), InvalidTopicException.class);
+            unrepresentableTopicResult.values().get(tp1).get();
+
+            // Test success scenario
+            AlterPartitionReassignmentsResponseData noErrResponseData =
+                    new AlterPartitionReassignmentsResponseData()
+                            .setErrorCode(Errors.NONE.code())
+                            .setErrorMessage(Errors.NONE.message())
+                            .setResponses(Arrays.asList(
+                                    new ReassignableTopicResponse()
+                                            .setName("A")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)),
+                                    new ReassignableTopicResponse()
+                                            .setName("B")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)))
+                            );
+            env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(noErrResponseData));
+            AlterPartitionReassignmentsResult noErrResult = env.adminClient().alterPartitionReassignments(reassignments);
+            noErrResult.all().get();
+            noErrResult.values().get(tp1).get();
+            noErrResult.values().get(tp2).get();
+        }
+    }
+
+    @Test
+    public void testListPartitionReassignments() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            TopicPartition tp1 = new TopicPartition("A", 0);
+            OngoingPartitionReassignment tp1PartitionReassignment = new OngoingPartitionReassignment()
+                    .setPartitionIndex(0)
+                    .setRemovingReplicas(Arrays.asList(1, 2, 3))
+                    .setAddingReplicas(Arrays.asList(4, 5, 6))
+                    .setReplicas(Arrays.asList(1, 2, 3, 4, 5, 6));
+            OngoingTopicReassignment tp1Reassignment = new OngoingTopicReassignment().setName("A")
+                    .setPartitions(Collections.singletonList(tp1PartitionReassignment));
+
+            TopicPartition tp2 = new TopicPartition("B", 0);
+            OngoingPartitionReassignment tp2PartitionReassignment = new OngoingPartitionReassignment()
+                    .setPartitionIndex(0)
+                    .setRemovingReplicas(Arrays.asList(1, 2, 3))
+                    .setAddingReplicas(Arrays.asList(4, 5, 6))
+                    .setReplicas(Arrays.asList(1, 2, 3, 4, 5, 6));
+            OngoingTopicReassignment tp2Reassignment = new OngoingTopicReassignment().setName("B")
+                    .setPartitions(Collections.singletonList(tp2PartitionReassignment));
+
+            // 1. NOT_CONTROLLER error handling
+            ListPartitionReassignmentsResponseData notControllerData = new ListPartitionReassignmentsResponseData()
+                    .setErrorCode(Errors.NOT_CONTROLLER.code())
+                    .setErrorMessage(Errors.NOT_CONTROLLER.message());
+            MetadataResponse controllerNodeResponse = MetadataResponse.prepareResponse(env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(), 1, Collections.emptyList());
+            ListPartitionReassignmentsResponseData reassignmentsData = new ListPartitionReassignmentsResponseData()
+                    .setTopics(Arrays.asList(tp1Reassignment, tp2Reassignment));
+            env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(notControllerData));
+            env.kafkaClient().prepareResponse(controllerNodeResponse);
+            env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(reassignmentsData));
+
+            ListPartitionReassignmentsResult noControllerResult = env.adminClient().listPartitionReassignments();
+            noControllerResult.reassignments().get(); // no error
+
+            // 2. UNKNOWN_TOPIC_OR_EXCEPTION_ERROR
+            ListPartitionReassignmentsResponseData unknownTpData = new ListPartitionReassignmentsResponseData()
+                    .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                    .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
+            env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(unknownTpData));
+
+            ListPartitionReassignmentsResult unknownTpResult = env.adminClient().listPartitionReassignments(new HashSet<>(Arrays.asList(tp1, tp2)));
+            TestUtils.assertFutureError(unknownTpResult.reassignments(), UnknownTopicOrPartitionException.class);
+
+            // 3. Success
+            ListPartitionReassignmentsResponseData responseData = new ListPartitionReassignmentsResponseData()
+                    .setTopics(Arrays.asList(tp1Reassignment, tp2Reassignment));
+            env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(responseData));
+            ListPartitionReassignmentsResult responseResult = env.adminClient().listPartitionReassignments();
+
+            Map<TopicPartition, PartitionReassignment> reassignments = responseResult.reassignments().get();
+
+            PartitionReassignment tp1Result = reassignments.get(tp1);
+            assertEquals(tp1PartitionReassignment.addingReplicas(), tp1Result.addingReplicas());
+            assertEquals(tp1PartitionReassignment.removingReplicas(), tp1Result.removingReplicas());
+            assertEquals(tp1PartitionReassignment.replicas(), tp1Result.replicas());
+            assertEquals(tp1PartitionReassignment.replicas(), tp1Result.replicas());
+            PartitionReassignment tp2Result = reassignments.get(tp2);
+            assertEquals(tp2PartitionReassignment.addingReplicas(), tp2Result.addingReplicas());
+            assertEquals(tp2PartitionReassignment.removingReplicas(), tp2Result.removingReplicas());
+            assertEquals(tp2PartitionReassignment.replicas(), tp2Result.replicas());
+            assertEquals(tp2PartitionReassignment.replicas(), tp2Result.replicas());
+        }
+    }
+
     private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member,
                                                                  MemberAssignment assignment) {
         return new MemberDescription(member.memberId(),
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 7ca9ce4..baaf661 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -40,6 +40,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 public class MockAdminClient extends AdminClient {
@@ -421,6 +422,17 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
+    public AlterPartitionReassignmentsResult alterPartitionReassignments(Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
+                                                                         AlterPartitionReassignmentsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> partitions, ListPartitionReassignmentsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
     public void close(Duration timeout) {}
 
 
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a979d4c..8645d79 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1702,8 +1702,8 @@ public class RequestResponseTest {
 
     private ListPartitionReassignmentsResponse createListPartitionReassignmentsResponse() {
         ListPartitionReassignmentsResponseData data = new ListPartitionReassignmentsResponseData();
-        data.topics().add(
-                new ListPartitionReassignmentsResponseData.OngoingTopicReassignment()
+        data.setTopics(Collections.singletonList(
+            new ListPartitionReassignmentsResponseData.OngoingTopicReassignment()
                         .setName("topic")
                         .setPartitions(Collections.singletonList(
                                 new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment()
@@ -1713,7 +1713,7 @@ public class RequestResponseTest {
                                         .setRemovingReplicas(Collections.singletonList(1))
                                 )
                         )
-        );
+        ));
         return new ListPartitionReassignmentsResponse(data);
     }
 }