You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/08/14 16:25:45 UTC
[kafka] branch trunk updated: KAFKA-8345: KIP-455: Admin API
changes (Part 2) (#7120)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5129ab5 KAFKA-8345: KIP-455: Admin API changes (Part 2) (#7120)
5129ab5 is described below
commit 5129ab53ee9a2e46a22a13b1d5300ee139c4999f
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Wed Aug 14 17:25:17 2019 +0100
KAFKA-8345: KIP-455: Admin API changes (Part 2) (#7120)
Add the AlterPartitionReassignments and ListPartitionReassignments APIs. Also remove an unused methodlength suppression for KafkaAdminClient.
Reviewers: Colin P. McCabe <cm...@apache.org>, Viktor Somogyi <vi...@gmail.com>
---
checkstyle/suppressions.xml | 2 +-
.../java/org/apache/kafka/clients/admin/Admin.java | 115 ++++++++++
.../admin/AlterPartitionReassignmentsOptions.java | 31 +++
.../admin/AlterPartitionReassignmentsResult.java | 59 +++++
.../kafka/clients/admin/KafkaAdminClient.java | 250 +++++++++++++++++++++
.../admin/ListPartitionReassignmentsOptions.java | 29 +++
.../admin/ListPartitionReassignmentsResult.java | 43 ++++
.../clients/admin/NewPartitionReassignment.java | 44 ++++
.../kafka/clients/admin/PartitionReassignment.java | 60 +++++
.../AlterPartitionReassignmentsResponse.java | 2 +-
.../ListPartitionReassignmentsRequest.java | 21 +-
.../ListPartitionReassignmentsResponse.java | 2 +-
.../message/ListPartitionReassignmentsRequest.json | 2 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 215 ++++++++++++++++++
.../kafka/clients/admin/MockAdminClient.java | 12 +
.../kafka/common/requests/RequestResponseTest.java | 6 +-
16 files changed, 878 insertions(+), 15 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0bb2193..475b9b0 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -52,7 +52,7 @@
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|SchemaGenerator|AbstractCoordinator).java"/>
<suppress checks="JavaNCSS"
- files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java|SenderTest.java"/>
+ files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java|SenderTest.java|KafkaAdminClient.java"/>
<suppress checks="NPathComplexity"
files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index c1be7a7..8fdd21b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -21,6 +21,7 @@ import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -930,6 +931,120 @@ public interface Admin extends AutoCloseable {
Set<TopicPartition> partitions,
ElectLeadersOptions options);
+
+ /**
+ * Change the reassignments for one or more partitions.
+ * Providing an empty Optional (e.g. via {@link Optional#empty()}) will <b>revert</b> the reassignment for the associated partition.
+ *
+ * This is a convenience method for {@link #alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}
+ * with default options. See the overload for more details.
+ */
+ default AlterPartitionReassignmentsResult alterPartitionReassignments(
+ Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
+ return alterPartitionReassignments(reassignments, new AlterPartitionReassignmentsOptions());
+ }
+
+ /**
+ * Change the reassignments for one or more partitions.
+ * Providing an empty Optional (e.g. via {@link Optional#empty()}) will <b>revert</b> the reassignment for the associated partition.
+ *
+ * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+ * the returned {@code AlterPartitionReassignmentsResult}:</p>
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * If the authenticated user didn't have alter access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+ * If the topic or partition does not exist within the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * if the request timed out before the controller could record the new assignments.</li>
+ * <li>{@link org.apache.kafka.common.errors.InvalidReplicaAssignmentException}
+ * If the specified assignment was not valid.</li>
+ * <li>{@link org.apache.kafka.common.errors.NoReassignmentInProgressException}
+ * If there was an attempt to cancel a reassignment for a partition which was not being reassigned.</li>
+ * </ul>
+ *
+ * @param reassignments The reassignments to add, modify, or remove.
+ * @param options The options to use.
+ * @return The result.
+ */
+ AlterPartitionReassignmentsResult alterPartitionReassignments(
+ Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
+ AlterPartitionReassignmentsOptions options);
+
+
+ /**
+ * List all of the current partition reassignments
+ *
+ * This is a convenience method for {@link #listPartitionReassignments(ListPartitionReassignmentsOptions)}
+ * with default options. See the overload for more details.
+ */
+ default ListPartitionReassignmentsResult listPartitionReassignments() {
+ return listPartitionReassignments(new ListPartitionReassignmentsOptions());
+ }
+
+ /**
+ * List the current reassignments for the given partitions
+ *
+ * This is a convenience method for {@link #listPartitionReassignments(Set, ListPartitionReassignmentsOptions)}
+ * with default options. See the overload for more details.
+ */
+ default ListPartitionReassignmentsResult listPartitionReassignments(Set<TopicPartition> partitions) {
+ return listPartitionReassignments(partitions, new ListPartitionReassignmentsOptions());
+ }
+
+ /**
+ * List the current reassignments for the given partitions
+ *
+ * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+ * the returned {@code ListPartitionReassignmentsResult}:</p>
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * If the authenticated user doesn't have alter access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+ * If a given topic or partition does not exist.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * If the request timed out before the controller could list the current reassignments.</li>
+ * </ul>
+ *
+ * @param partitions The topic partitions to list reassignments for.
+ * @param options The options to use.
+ * @return The result.
+ */
+ default ListPartitionReassignmentsResult listPartitionReassignments(
+ Set<TopicPartition> partitions,
+ ListPartitionReassignmentsOptions options) {
+ return listPartitionReassignments(Optional.of(partitions), options);
+ }
+
+ /**
+ * List all of the current partition reassignments
+ *
+ * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+ * the returned {@code ListPartitionReassignmentsResult}:</p>
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * If the authenticated user doesn't have alter access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+ * If a given topic or partition does not exist.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * If the request timed out before the controller could list the current reassignments.</li>
+ * </ul>
+ *
+ * @param options The options to use.
+ * @return The result.
+ */
+ default ListPartitionReassignmentsResult listPartitionReassignments(ListPartitionReassignmentsOptions options) {
+ return listPartitionReassignments(Optional.empty(), options);
+ }
+
+ /**
+ * @param partitions the partitions we want to get reassignment for, or an empty optional if we want to get the reassignments for all partitions in the cluster
+ * @param options The options to use.
+ * @return The result.
+ */
+ ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> partitions,
+ ListPartitionReassignmentsOptions options);
+
/**
* Get the metrics kept by the adminClient
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java
new file mode 100644
index 0000000..bee9c70
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * Options for {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}
+ *
+ * The API of this class is evolving. See {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class AlterPartitionReassignmentsOptions extends AbstractOptions<AlterPartitionReassignmentsOptions> {
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsResult.java
new file mode 100644
index 0000000..2009ab5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsResult.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * The result of {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}.
+ *
+ * The API of this class is evolving. See {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class AlterPartitionReassignmentsResult {
+ private final Map<TopicPartition, KafkaFuture<Void>> futures;
+
+ AlterPartitionReassignmentsResult(Map<TopicPartition, KafkaFuture<Void>> futures) {
+ this.futures = futures;
+ }
+
+ /**
+ * Return a map from partitions to futures which can be used to check the status of the reassignment.
+ *
+ * Possible error codes:
+ * <ul>
+ * <li>INVALID_REPLICA_ASSIGNMENT (39) - if the specified replica assignment was not valid -- for example, if it included negative numbers, repeated numbers, or specified a broker ID that the controller was not aware of.</li>
+ * <li>NO_REASSIGNMENT_IN_PROGRESS (85) - if the request wants to cancel reassignments but none exist.</li>
+ * <li>UNKNOWN (-1)</li>
+ * </ul>
+ */
+ public Map<TopicPartition, KafkaFuture<Void>> values() {
+ return futures;
+ }
+
+ /**
+ * Return a future which succeeds only if all the reassignments were successfully initiated.
+ */
+ public KafkaFuture<Void> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 7bcad41..936dc65 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -61,6 +61,8 @@ import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
@@ -81,6 +83,7 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.Altera
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.metrics.JmxReporter;
@@ -95,6 +98,8 @@ import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AlterConfigsRequest;
import org.apache.kafka.common.requests.AlterConfigsResponse;
+import org.apache.kafka.common.requests.AlterPartitionReassignmentsRequest;
+import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
import org.apache.kafka.common.requests.ApiError;
@@ -140,6 +145,8 @@ import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest;
+import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
@@ -170,7 +177,9 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.TreeMap;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -179,6 +188,12 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import static org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
+import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
+import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
+import static org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
+import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
+import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import static org.apache.kafka.common.requests.MetadataRequest.convertToMetadataRequestTopic;
import static org.apache.kafka.common.utils.Utils.closeQuietly;
@@ -3082,6 +3097,241 @@ public class KafkaAdminClient extends AdminClient {
return new ElectLeadersResult(electionFuture);
}
+ @Override
+ public AlterPartitionReassignmentsResult alterPartitionReassignments(
+ Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
+ AlterPartitionReassignmentsOptions options) {
+ final Map<TopicPartition, KafkaFutureImpl<Void>> futures = new HashMap<>();
+ final Map<String, Map<Integer, Optional<NewPartitionReassignment>>> topicsToReassignments = new TreeMap<>();
+ for (Map.Entry<TopicPartition, Optional<NewPartitionReassignment>> entry : reassignments.entrySet()) {
+ String topic = entry.getKey().topic();
+ int partition = entry.getKey().partition();
+ TopicPartition topicPartition = new TopicPartition(topic, partition);
+ Optional<NewPartitionReassignment> reassignment = entry.getValue();
+ KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+ futures.put(topicPartition, future);
+
+ if (topicNameIsUnrepresentable(topic)) {
+ future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+ topic + "' cannot be represented in a request."));
+ } else if (topicPartition.partition() < 0) {
+ future.completeExceptionally(new InvalidTopicException("The given partition index " +
+ topicPartition.partition() + " is not valid."));
+ } else {
+ Map<Integer, Optional<NewPartitionReassignment>> partitionReassignments =
+ topicsToReassignments.get(topicPartition.topic());
+ if (partitionReassignments == null) {
+ partitionReassignments = new TreeMap<>();
+ topicsToReassignments.put(topic, partitionReassignments);
+ }
+
+ partitionReassignments.put(partition, reassignment);
+ }
+ }
+
+ final long now = time.milliseconds();
+ Call call = new Call("alterPartitionReassignments", calcDeadlineMs(now, options.timeoutMs()),
+ new ControllerNodeProvider()) {
+
+ @Override
+ public AbstractRequest.Builder createRequest(int timeoutMs) {
+ AlterPartitionReassignmentsRequestData data =
+ new AlterPartitionReassignmentsRequestData();
+ for (Map.Entry<String, Map<Integer, Optional<NewPartitionReassignment>>> entry :
+ topicsToReassignments.entrySet()) {
+ String topicName = entry.getKey();
+ Map<Integer, Optional<NewPartitionReassignment>> partitionsToReassignments = entry.getValue();
+
+ List<ReassignablePartition> reassignablePartitions = new ArrayList<>();
+ for (Map.Entry<Integer, Optional<NewPartitionReassignment>> partitionEntry :
+ partitionsToReassignments.entrySet()) {
+ int partitionIndex = partitionEntry.getKey();
+ Optional<NewPartitionReassignment> reassignment = partitionEntry.getValue();
+
+ ReassignablePartition reassignablePartition = new ReassignablePartition()
+ .setPartitionIndex(partitionIndex)
+ .setReplicas(reassignment.map(NewPartitionReassignment::targetBrokers).orElse(null));
+ reassignablePartitions.add(reassignablePartition);
+ }
+
+ ReassignableTopic reassignableTopic = new ReassignableTopic()
+ .setName(topicName)
+ .setPartitions(reassignablePartitions);
+ data.topics().add(reassignableTopic);
+ }
+ data.setTimeoutMs(timeoutMs);
+ return new AlterPartitionReassignmentsRequest.Builder(data);
+ }
+
+ @Override
+ public void handleResponse(AbstractResponse abstractResponse) {
+ AlterPartitionReassignmentsResponse response = (AlterPartitionReassignmentsResponse) abstractResponse;
+ Map<TopicPartition, ApiException> errors = new HashMap<>();
+ int receivedResponsesCount = 0;
+
+ Errors topLevelError = Errors.forCode(response.data().errorCode());
+ switch (topLevelError) {
+ case NONE:
+ receivedResponsesCount += validateTopicResponses(response.data().responses(), errors);
+ break;
+ case NOT_CONTROLLER:
+ handleNotControllerError(topLevelError);
+ break;
+ default:
+ for (ReassignableTopicResponse topicResponse : response.data().responses()) {
+ String topicName = topicResponse.name();
+ for (ReassignablePartitionResponse partition : topicResponse.partitions()) {
+ errors.put(
+ new TopicPartition(topicName, partition.partitionIndex()),
+ new ApiError(topLevelError, topLevelError.message()).exception()
+ );
+ receivedResponsesCount += 1;
+ }
+ }
+ break;
+ }
+
+ assertResponseCountMatch(errors, receivedResponsesCount);
+ for (Map.Entry<TopicPartition, ApiException> entry : errors.entrySet()) {
+ ApiException exception = entry.getValue();
+ if (exception == null)
+ futures.get(entry.getKey()).complete(null);
+ else
+ futures.get(entry.getKey()).completeExceptionally(exception);
+ }
+ }
+
+ /**
+  * Verify that the response covers exactly as many partitions as were sent in the request.
+  * The check is only enforced when every collected result is a success (a {@code null}
+  * value in {@code errors}); if any partition carries an error, a count mismatch is ignored.
+  *
+  * @param errors per-partition outcome gathered from the response; {@code null} values mean success
+  * @param receivedResponsesCount number of partition results found in the response
+  * @throws UnknownServerException if all results succeeded but their count differs from the request
+  */
+ private void assertResponseCountMatch(Map<TopicPartition, ApiException> errors, int receivedResponsesCount) {
+     int expectedResponsesCount = topicsToReassignments.values().stream().mapToInt(Map::size).sum();
+     boolean anyPartitionFailed = errors.values().stream().anyMatch(Objects::nonNull);
+     if (!anyPartitionFailed && receivedResponsesCount != expectedResponsesCount) {
+         String quantifier = receivedResponsesCount > expectedResponsesCount ? "many" : "few";
+         // Keep a separating space after the first sentence so the message reads correctly.
+         throw new UnknownServerException("The server returned too " + quantifier + " results. " +
+             "Expected " + expectedResponsesCount + " but received " + receivedResponsesCount);
+     }
+ }
+
+ private int validateTopicResponses(List<ReassignableTopicResponse> topicResponses,
+ Map<TopicPartition, ApiException> errors) {
+ int receivedResponsesCount = 0;
+
+ for (ReassignableTopicResponse topicResponse : topicResponses) {
+ String topicName = topicResponse.name();
+ for (ReassignablePartitionResponse partResponse : topicResponse.partitions()) {
+ Errors partitionError = Errors.forCode(partResponse.errorCode());
+
+ TopicPartition tp = new TopicPartition(topicName, partResponse.partitionIndex());
+ if (partitionError == Errors.NONE) {
+ errors.put(tp, null);
+ } else {
+ errors.put(tp, new ApiError(partitionError, partResponse.errorMessage()).exception());
+ }
+ receivedResponsesCount += 1;
+ }
+ }
+
+ return receivedResponsesCount;
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ for (KafkaFutureImpl<Void> future : futures.values()) {
+ future.completeExceptionally(throwable);
+ }
+ }
+ };
+ if (!topicsToReassignments.isEmpty()) {
+ runnable.call(call, now);
+ }
+ return new AlterPartitionReassignmentsResult(new HashMap<>(futures));
+ }
+
+ @Override
+ public ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> partitions,
+ ListPartitionReassignmentsOptions options) {
+ final KafkaFutureImpl<Map<TopicPartition, PartitionReassignment>> partitionReassignmentsFuture = new KafkaFutureImpl<>();
+ if (partitions.isPresent()) {
+ for (TopicPartition tp : partitions.get()) {
+ String topic = tp.topic();
+ int partition = tp.partition();
+ if (topicNameIsUnrepresentable(topic)) {
+ partitionReassignmentsFuture.completeExceptionally(new InvalidTopicException("The given topic name '"
+ + topic + "' cannot be represented in a request."));
+ } else if (partition < 0) {
+ partitionReassignmentsFuture.completeExceptionally(new InvalidTopicException("The given partition index " +
+ partition + " is not valid."));
+ }
+ if (partitionReassignmentsFuture.isCompletedExceptionally())
+ return new ListPartitionReassignmentsResult(partitionReassignmentsFuture);
+ }
+ }
+ final long now = time.milliseconds();
+ runnable.call(new Call("listPartitionReassignments", calcDeadlineMs(now, options.timeoutMs()),
+ new ControllerNodeProvider()) {
+
+ @Override
+ AbstractRequest.Builder createRequest(int timeoutMs) {
+ ListPartitionReassignmentsRequestData listData = new ListPartitionReassignmentsRequestData();
+ listData.setTimeoutMs(timeoutMs);
+
+ if (partitions.isPresent()) {
+ Map<String, ListPartitionReassignmentsTopics> reassignmentTopicByTopicName = new HashMap<>();
+
+ for (TopicPartition tp : partitions.get()) {
+ if (!reassignmentTopicByTopicName.containsKey(tp.topic()))
+ reassignmentTopicByTopicName.put(tp.topic(), new ListPartitionReassignmentsTopics().setName(tp.topic()));
+
+ reassignmentTopicByTopicName.get(tp.topic()).partitionIndexes().add(tp.partition());
+ }
+
+ listData.setTopics(new ArrayList<>(reassignmentTopicByTopicName.values()));
+ }
+ return new ListPartitionReassignmentsRequest.Builder(listData);
+ }
+
+ /**
+  * Translate the controller's response into the result future.
+  * A {@code NOT_CONTROLLER} error is delegated to {@code handleNotControllerError}, which throws.
+  * Any other top-level error fails the future and stops processing. On success, every ongoing
+  * reassignment in the response is collected into a map keyed by topic partition.
+  *
+  * @param abstractResponse the response, expected to be a {@link ListPartitionReassignmentsResponse}
+  */
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+     ListPartitionReassignmentsResponse response = (ListPartitionReassignmentsResponse) abstractResponse;
+     Errors error = Errors.forCode(response.data().errorCode());
+     switch (error) {
+         case NONE:
+             break;
+         case NOT_CONTROLLER:
+             handleNotControllerError(error);
+             break;
+         default:
+             partitionReassignmentsFuture.completeExceptionally(
+                 new ApiError(error, response.data().errorMessage()).exception());
+             // The future has already failed; do not build a result out of an error response.
+             return;
+     }
+
+     Map<TopicPartition, PartitionReassignment> reassignmentMap = new HashMap<>();
+     for (OngoingTopicReassignment topicReassignment : response.data().topics()) {
+         String topicName = topicReassignment.name();
+         for (OngoingPartitionReassignment partitionReassignment : topicReassignment.partitions()) {
+             reassignmentMap.put(
+                 new TopicPartition(topicName, partitionReassignment.partitionIndex()),
+                 new PartitionReassignment(partitionReassignment.replicas(), partitionReassignment.addingReplicas(), partitionReassignment.removingReplicas())
+             );
+         }
+     }
+
+     partitionReassignmentsFuture.complete(reassignmentMap);
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ partitionReassignmentsFuture.completeExceptionally(throwable);
+ }
+ }, now);
+
+ return new ListPartitionReassignmentsResult(partitionReassignmentsFuture);
+ }
+
+ /**
+  * Shared handling for a response whose top-level error is {@code NOT_CONTROLLER}:
+  * calls {@code metadataManager.clearController()} and {@code metadataManager.requestUpdate()},
+  * then throws the exception mapped to {@code error}. This method never returns normally.
+  *
+  * @param error the error taken from the response
+  * @throws ApiException always; the exception produced by {@code error.exception()}
+  */
+ private void handleNotControllerError(Errors error) throws ApiException {
+ metadataManager.clearController();
+ metadataManager.requestUpdate();
+ throw error.exception();
+ }
+
/**
* Returns a boolean indicating whether the resource needs to go to a specific node
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsOptions.java
new file mode 100644
index 0000000..7dcc7a6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#listPartitionReassignments(ListPartitionReassignmentsOptions)}
+ *
+ * The API of this class is evolving. See {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ListPartitionReassignmentsOptions extends AbstractOptions<ListPartitionReassignmentsOptions> {
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsResult.java
new file mode 100644
index 0000000..3d7b14c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListPartitionReassignmentsResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
+/**
+ * The result of {@link AdminClient#listPartitionReassignments(ListPartitionReassignmentsOptions)}.
+ *
+ * The API of this class is evolving. See {@link AdminClient} for details.
+ */
+public class ListPartitionReassignmentsResult {
+ private final KafkaFuture<Map<TopicPartition, PartitionReassignment>> future;
+
+ public ListPartitionReassignmentsResult(KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments) {
+ this.future = reassignments;
+ }
+
+ /**
+ * Return a future which yields a map containing each partition's reassignments
+ */
+ public KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments() {
+ return future;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
new file mode 100644
index 0000000..8856470
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A new partition reassignment, which can be applied via {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}.
+ */
+public class NewPartitionReassignment {
+    private final List<Integer> targetBrokers;
+
+    /**
+     * Convenience factory: builds a reassignment from the given broker ids and returns it
+     * wrapped in a non-empty {@link Optional}.
+     */
+    public static Optional<NewPartitionReassignment> of(Integer... brokers) {
+        List<Integer> brokerIds = Arrays.asList(brokers);
+        NewPartitionReassignment reassignment = new NewPartitionReassignment(brokerIds);
+        return Optional.of(reassignment);
+    }
+
+    /**
+     * @param targetBrokers the target broker ids; the list is copied, so later changes to the
+     *                      caller's list do not affect this object
+     */
+    public NewPartitionReassignment(List<Integer> targetBrokers) {
+        List<Integer> snapshot = new ArrayList<>(targetBrokers);
+        this.targetBrokers = Collections.unmodifiableList(snapshot);
+    }
+
+    /**
+     * Return the target broker ids as an unmodifiable list.
+     */
+    public List<Integer> targetBrokers() {
+        return targetBrokers;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/PartitionReassignment.java b/clients/src/main/java/org/apache/kafka/clients/admin/PartitionReassignment.java
new file mode 100644
index 0000000..cc3306e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/PartitionReassignment.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A partition reassignment, which has been listed via {@link AdminClient#listPartitionReassignments()}.
+ */
+public class PartitionReassignment {
+
+    private final List<Integer> replicas;
+    private final List<Integer> addingReplicas;
+    private final List<Integer> removingReplicas;
+
+    /**
+     * Create a reassignment description. Each list is copied before being wrapped as
+     * unmodifiable, so later changes to the caller's lists are not visible through this object.
+     *
+     * @param replicas the brokers which the partition currently resides on
+     * @param addingReplicas the brokers being added as part of the reassignment
+     * @param removingReplicas the brokers being removed as part of the reassignment
+     * @throws NullPointerException if any argument is null
+     */
+    public PartitionReassignment(List<Integer> replicas, List<Integer> addingReplicas, List<Integer> removingReplicas) {
+        // Wrapping alone would only produce a read-only view that still tracks the caller's list.
+        this.replicas = Collections.unmodifiableList(new ArrayList<>(replicas));
+        this.addingReplicas = Collections.unmodifiableList(new ArrayList<>(addingReplicas));
+        this.removingReplicas = Collections.unmodifiableList(new ArrayList<>(removingReplicas));
+    }
+
+    /**
+     * The brokers which this partition currently resides on.
+     *
+     * @return an unmodifiable list of broker ids
+     */
+    public List<Integer> replicas() {
+        return replicas;
+    }
+
+    /**
+     * The brokers that we are adding this partition to as part of a reassignment.
+     * A subset of replicas.
+     *
+     * @return an unmodifiable list of broker ids
+     */
+    public List<Integer> addingReplicas() {
+        return addingReplicas;
+    }
+
+    /**
+     * The brokers that we are removing this partition from as part of a reassignment.
+     * A subset of replicas.
+     *
+     * @return an unmodifiable list of broker ids
+     */
+    public List<Integer> removingReplicas() {
+        return removingReplicas;
+    }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
index db1cfab..ef235cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
@@ -36,7 +36,7 @@ public class AlterPartitionReassignmentsResponse extends AbstractResponse {
this(struct, ApiKeys.ALTER_PARTITION_REASSIGNMENTS.latestVersion());
}
- AlterPartitionReassignmentsResponse(AlterPartitionReassignmentsResponseData data) {
+ public AlterPartitionReassignmentsResponse(AlterPartitionReassignmentsResponseData data) {
this.data = data;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java
index 471147b..0aa5e55 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsRequest.java
@@ -24,9 +24,11 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
public class ListPartitionReassignmentsRequest extends AbstractRequest {
@@ -86,14 +88,17 @@ public class ListPartitionReassignmentsRequest extends AbstractRequest {
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
- List<OngoingTopicReassignment> ongoingTopicReassignments = data.topics().stream().map(topic ->
- new OngoingTopicReassignment()
- .setName(topic.name())
- .setPartitions(topic.partitionIndexes().stream().map(partitionIndex ->
- new OngoingPartitionReassignment().setPartitionIndex(partitionIndex)).collect(Collectors.toList())
- )
- ).collect(Collectors.toList());
-
+ List<OngoingTopicReassignment> ongoingTopicReassignments = new ArrayList<>();
+ if (data.topics() != null) {
+ for (ListPartitionReassignmentsTopics topic : data.topics()) {
+ ongoingTopicReassignments.add(
+ new OngoingTopicReassignment()
+ .setName(topic.name())
+ .setPartitions(topic.partitionIndexes().stream().map(partitionIndex ->
+ new OngoingPartitionReassignment().setPartitionIndex(partitionIndex)).collect(Collectors.toList()))
+ );
+ }
+ }
ListPartitionReassignmentsResponseData responseData = new ListPartitionReassignmentsResponseData()
.setTopics(ongoingTopicReassignments)
.setErrorCode(apiError.error().code())
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java
index 9513e88..0d53e4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java
@@ -33,7 +33,7 @@ public class ListPartitionReassignmentsResponse extends AbstractResponse {
this(struct, ApiKeys.LIST_PARTITION_REASSIGNMENTS.latestVersion());
}
- ListPartitionReassignmentsResponse(ListPartitionReassignmentsResponseData responseData) {
+ public ListPartitionReassignmentsResponse(ListPartitionReassignmentsResponseData responseData) {
this.data = responseData;
}
diff --git a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
index d0ebf8b..ac871b2 100644
--- a/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
+++ b/clients/src/main/resources/common/message/ListPartitionReassignmentsRequest.json
@@ -21,7 +21,7 @@
"fields": [
{ "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
"about": "The time in ms to wait for the request to complete." },
- { "name": "Topics", "type": "[]ListPartitionReassignmentsTopics", "versions": "0+", "nullableVersions": "0+",
+ { "name": "Topics", "type": "[]ListPartitionReassignmentsTopics", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The topics to list partition reassignments for, or null to list everything.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name" },
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 7ec7c24..ff9e272 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -52,6 +52,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
@@ -66,7 +67,9 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionR
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
import org.apache.kafka.common.requests.CreateAclsResponse;
@@ -87,6 +90,7 @@ import org.apache.kafka.common.requests.ElectLeadersResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -119,11 +123,16 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
+import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
+import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
+import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -1561,6 +1570,212 @@ public class KafkaAdminClientTest {
}
}
+    /**
+     * Exercises {@code alterPartitionReassignments} against canned responses, in order: a response
+     * that omits a requested partition, a NOT_CONTROLLER response followed by a metadata response
+     * and a normal response, a partition-level error, a top-level error, a request containing
+     * unrepresentable topic partitions, and a fully successful call.
+     */
+    @Test
+    public void testAlterPartitionReassignments() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // tp1 maps to an empty Optional, tp2 to a concrete target replica list.
+            TopicPartition tp1 = new TopicPartition("A", 0);
+            TopicPartition tp2 = new TopicPartition("B", 0);
+            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
+            reassignments.put(tp1, Optional.empty());
+            reassignments.put(tp2, NewPartitionReassignment.of(1, 2, 3));
+
+            // 1. server returns fewer responses than the number of partitions we sent
+            AlterPartitionReassignmentsResponseData responseData1 = new AlterPartitionReassignmentsResponseData();
+            ReassignablePartitionResponse normalPartitionResponse = new ReassignablePartitionResponse().setPartitionIndex(0);
+            responseData1.setResponses(Collections.singletonList(
+                    new ReassignableTopicResponse()
+                            .setName("A")
+                            .setPartitions(Collections.singletonList(normalPartitionResponse))));
+            env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(responseData1));
+            AlterPartitionReassignmentsResult result1 = env.adminClient().alterPartitionReassignments(reassignments);
+            // Wildcard-typed rather than raw, so no unchecked warnings are introduced.
+            Future<?> future1 = result1.all();
+            Future<?> future2 = result1.values().get(tp1);
+            TestUtils.assertFutureError(future1, UnknownServerException.class);
+            TestUtils.assertFutureError(future2, UnknownServerException.class);
+
+            // 2. NOT_CONTROLLER error handling
+            AlterPartitionReassignmentsResponseData controllerErrResponseData =
+                    new AlterPartitionReassignmentsResponseData()
+                            .setErrorCode(Errors.NOT_CONTROLLER.code())
+                            .setErrorMessage(Errors.NOT_CONTROLLER.message())
+                            .setResponses(Arrays.asList(
+                                    new ReassignableTopicResponse()
+                                            .setName("A")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)),
+                                    new ReassignableTopicResponse()
+                                            .setName("B")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)))
+                            );
+            MetadataResponse controllerNodeResponse = MetadataResponse.prepareResponse(env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(), 1, Collections.emptyList());
+            AlterPartitionReassignmentsResponseData normalResponse =
+                    new AlterPartitionReassignmentsResponseData()
+                            .setResponses(Arrays.asList(
+                                    new ReassignableTopicResponse()
+                                            .setName("A")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)),
+                                    new ReassignableTopicResponse()
+                                            .setName("B")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)))
+                            );
+            // Queue order matters: error response, then metadata, then the normal response.
+            env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(controllerErrResponseData));
+            env.kafkaClient().prepareResponse(controllerNodeResponse);
+            env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(normalResponse));
+            AlterPartitionReassignmentsResult controllerErrResult = env.adminClient().alterPartitionReassignments(reassignments);
+            controllerErrResult.all().get();
+            controllerErrResult.values().get(tp1).get();
+            controllerErrResult.values().get(tp2).get();
+
+            // 3. partition-level error
+            AlterPartitionReassignmentsResponseData partitionLevelErrData =
+                    new AlterPartitionReassignmentsResponseData()
+                            .setResponses(Arrays.asList(
+                                    new ReassignableTopicResponse()
+                                            .setName("A")
+                                            .setPartitions(Collections.singletonList(new ReassignablePartitionResponse()
+                                                    .setPartitionIndex(0).setErrorMessage(Errors.INVALID_REPLICA_ASSIGNMENT.message())
+                                                    .setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code())
+                                            )),
+                                    new ReassignableTopicResponse()
+                                            .setName("B")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)))
+                            );
+            env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(partitionLevelErrData));
+            AlterPartitionReassignmentsResult partitionLevelErrResult = env.adminClient().alterPartitionReassignments(reassignments);
+            // Only the partition carrying the error fails; the other partition's future completes.
+            TestUtils.assertFutureError(partitionLevelErrResult.values().get(tp1), Errors.INVALID_REPLICA_ASSIGNMENT.exception().getClass());
+            partitionLevelErrResult.values().get(tp2).get();
+
+            // 4. top-level error
+            AlterPartitionReassignmentsResponseData topLevelErrResponseData =
+                    new AlterPartitionReassignmentsResponseData()
+                            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())
+                            .setErrorMessage(Errors.CLUSTER_AUTHORIZATION_FAILED.message())
+                            .setResponses(Arrays.asList(
+                                    new ReassignableTopicResponse()
+                                            .setName("A")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)),
+                                    new ReassignableTopicResponse()
+                                            .setName("B")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)))
+                            );
+            env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(topLevelErrResponseData));
+            AlterPartitionReassignmentsResult topLevelErrResult = env.adminClient().alterPartitionReassignments(reassignments);
+            TestUtils.assertFutureError(topLevelErrResult.all(), Errors.CLUSTER_AUTHORIZATION_FAILED.exception().getClass());
+            TestUtils.assertFutureError(topLevelErrResult.values().get(tp1), Errors.CLUSTER_AUTHORIZATION_FAILED.exception().getClass());
+            TestUtils.assertFutureError(topLevelErrResult.values().get(tp2), Errors.CLUSTER_AUTHORIZATION_FAILED.exception().getClass());
+
+            // 5. unrepresentable topic name or partition index: those futures fail with
+            //    InvalidTopicException while the valid partition still succeeds
+            TopicPartition invalidTopicTP = new TopicPartition("", 0);
+            TopicPartition invalidPartitionTP = new TopicPartition("ABC", -1);
+            Map<TopicPartition, Optional<NewPartitionReassignment>> invalidTopicReassignments = new HashMap<>();
+            invalidTopicReassignments.put(invalidPartitionTP, NewPartitionReassignment.of(1, 2, 3));
+            invalidTopicReassignments.put(invalidTopicTP, NewPartitionReassignment.of(1, 2, 3));
+            invalidTopicReassignments.put(tp1, NewPartitionReassignment.of(1, 2, 3));
+
+            AlterPartitionReassignmentsResponseData singlePartResponseData =
+                    new AlterPartitionReassignmentsResponseData()
+                            .setResponses(Collections.singletonList(
+                                    new ReassignableTopicResponse()
+                                            .setName("A")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)))
+                            );
+            env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(singlePartResponseData));
+            AlterPartitionReassignmentsResult unrepresentableTopicResult = env.adminClient().alterPartitionReassignments(invalidTopicReassignments);
+            TestUtils.assertFutureError(unrepresentableTopicResult.values().get(invalidTopicTP), InvalidTopicException.class);
+            TestUtils.assertFutureError(unrepresentableTopicResult.values().get(invalidPartitionTP), InvalidTopicException.class);
+            unrepresentableTopicResult.values().get(tp1).get();
+
+            // 6. success scenario
+            AlterPartitionReassignmentsResponseData noErrResponseData =
+                    new AlterPartitionReassignmentsResponseData()
+                            .setErrorCode(Errors.NONE.code())
+                            .setErrorMessage(Errors.NONE.message())
+                            .setResponses(Arrays.asList(
+                                    new ReassignableTopicResponse()
+                                            .setName("A")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)),
+                                    new ReassignableTopicResponse()
+                                            .setName("B")
+                                            .setPartitions(Collections.singletonList(normalPartitionResponse)))
+                            );
+            env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(noErrResponseData));
+            AlterPartitionReassignmentsResult noErrResult = env.adminClient().alterPartitionReassignments(reassignments);
+            noErrResult.all().get();
+            noErrResult.values().get(tp1).get();
+            noErrResult.values().get(tp2).get();
+        }
+    }
+
+    /**
+     * Exercises {@code listPartitionReassignments} against canned responses, in order: a
+     * NOT_CONTROLLER response followed by a metadata response and a normal response, a top-level
+     * UNKNOWN_TOPIC_OR_PARTITION error, and a successful listing whose contents are verified.
+     */
+    @Test
+    public void testListPartitionReassignments() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Canned reassignment for partition A-0.
+            TopicPartition tp1 = new TopicPartition("A", 0);
+            OngoingPartitionReassignment tp1PartitionReassignment = new OngoingPartitionReassignment()
+                    .setPartitionIndex(0)
+                    .setRemovingReplicas(Arrays.asList(1, 2, 3))
+                    .setAddingReplicas(Arrays.asList(4, 5, 6))
+                    .setReplicas(Arrays.asList(1, 2, 3, 4, 5, 6));
+            OngoingTopicReassignment tp1Reassignment = new OngoingTopicReassignment().setName("A")
+                    .setPartitions(Collections.singletonList(tp1PartitionReassignment));
+
+            // Canned reassignment for partition B-0.
+            TopicPartition tp2 = new TopicPartition("B", 0);
+            OngoingPartitionReassignment tp2PartitionReassignment = new OngoingPartitionReassignment()
+                    .setPartitionIndex(0)
+                    .setRemovingReplicas(Arrays.asList(1, 2, 3))
+                    .setAddingReplicas(Arrays.asList(4, 5, 6))
+                    .setReplicas(Arrays.asList(1, 2, 3, 4, 5, 6));
+            OngoingTopicReassignment tp2Reassignment = new OngoingTopicReassignment().setName("B")
+                    .setPartitions(Collections.singletonList(tp2PartitionReassignment));
+
+            // 1. NOT_CONTROLLER error handling
+            ListPartitionReassignmentsResponseData notControllerData = new ListPartitionReassignmentsResponseData()
+                    .setErrorCode(Errors.NOT_CONTROLLER.code())
+                    .setErrorMessage(Errors.NOT_CONTROLLER.message());
+            MetadataResponse controllerNodeResponse = MetadataResponse.prepareResponse(env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(), 1, Collections.emptyList());
+            ListPartitionReassignmentsResponseData reassignmentsData = new ListPartitionReassignmentsResponseData()
+                    .setTopics(Arrays.asList(tp1Reassignment, tp2Reassignment));
+            // Queue order matters: error response, then metadata, then the normal response.
+            env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(notControllerData));
+            env.kafkaClient().prepareResponse(controllerNodeResponse);
+            env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(reassignmentsData));
+
+            ListPartitionReassignmentsResult noControllerResult = env.adminClient().listPartitionReassignments();
+            noControllerResult.reassignments().get(); // no error
+
+            // 2. UNKNOWN_TOPIC_OR_PARTITION error
+            ListPartitionReassignmentsResponseData unknownTpData = new ListPartitionReassignmentsResponseData()
+                    .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                    .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
+            env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(unknownTpData));
+
+            ListPartitionReassignmentsResult unknownTpResult = env.adminClient().listPartitionReassignments(new HashSet<>(Arrays.asList(tp1, tp2)));
+            TestUtils.assertFutureError(unknownTpResult.reassignments(), UnknownTopicOrPartitionException.class);
+
+            // 3. Success
+            ListPartitionReassignmentsResponseData responseData = new ListPartitionReassignmentsResponseData()
+                    .setTopics(Arrays.asList(tp1Reassignment, tp2Reassignment));
+            env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(responseData));
+            ListPartitionReassignmentsResult responseResult = env.adminClient().listPartitionReassignments();
+
+            Map<TopicPartition, PartitionReassignment> reassignments = responseResult.reassignments().get();
+
+            // Fail with an assertion, not a NullPointerException, if a partition is missing.
+            PartitionReassignment tp1Result = reassignments.get(tp1);
+            assertNotNull(tp1Result);
+            assertEquals(tp1PartitionReassignment.addingReplicas(), tp1Result.addingReplicas());
+            assertEquals(tp1PartitionReassignment.removingReplicas(), tp1Result.removingReplicas());
+            assertEquals(tp1PartitionReassignment.replicas(), tp1Result.replicas());
+            PartitionReassignment tp2Result = reassignments.get(tp2);
+            assertNotNull(tp2Result);
+            assertEquals(tp2PartitionReassignment.addingReplicas(), tp2Result.addingReplicas());
+            assertEquals(tp2PartitionReassignment.removingReplicas(), tp2Result.removingReplicas());
+            assertEquals(tp2PartitionReassignment.replicas(), tp2Result.replicas());
+        }
+    }
+
private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member,
MemberAssignment assignment) {
return new MemberDescription(member.memberId(),
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 7ca9ce4..baaf661 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -40,6 +40,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
public class MockAdminClient extends AdminClient {
@@ -421,6 +422,17 @@ public class MockAdminClient extends AdminClient {
}
@Override
+ public AlterPartitionReassignmentsResult alterPartitionReassignments(Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
+ AlterPartitionReassignmentsOptions options) {
+ // Placeholder: this mock does not implement the operation; both arguments are ignored.
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
+ public ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> partitions, ListPartitionReassignmentsOptions options) {
+ // Placeholder: this mock does not implement the operation; both arguments are ignored.
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
public void close(Duration timeout) {}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a979d4c..8645d79 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1702,8 +1702,8 @@ public class RequestResponseTest {
private ListPartitionReassignmentsResponse createListPartitionReassignmentsResponse() {
ListPartitionReassignmentsResponseData data = new ListPartitionReassignmentsResponseData();
- data.topics().add(
- new ListPartitionReassignmentsResponseData.OngoingTopicReassignment()
+ data.setTopics(Collections.singletonList(
+ new ListPartitionReassignmentsResponseData.OngoingTopicReassignment()
.setName("topic")
.setPartitions(Collections.singletonList(
new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment()
@@ -1713,7 +1713,7 @@ public class RequestResponseTest {
.setRemovingReplicas(Collections.singletonList(1))
)
)
- );
+ ));
return new ListPartitionReassignmentsResponse(data);
}
}