You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/05/25 00:20:39 UTC

[kafka] branch trunk updated: KAFKA-8341. Retry Consumer group operation for NOT_COORDINATOR error (#6723)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 46a02f3  KAFKA-8341. Retry Consumer group operation for NOT_COORDINATOR error (#6723)
46a02f3 is described below

commit 46a02f3231cd6d340c622636159b9f59b4b3cb6e
Author: soondenana <50...@users.noreply.github.com>
AuthorDate: Fri May 24 17:20:22 2019 -0700

    KAFKA-8341. Retry Consumer group operation for NOT_COORDINATOR error (#6723)
    
    An API call for consumer groups must send a FindCoordinatorRequest to find the consumer group coordinator, and then send a follow-up request to that node.  But the coordinator might move after the FindCoordinatorRequest but before the follow-up request is sent.  In that case we currently fail.
    
    This change fixes that by detecting this error and then retrying.  This fixes listConsumerGroupOffsets, deleteConsumerGroups, and describeConsumerGroups.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, Boyang Chen <bc...@outlook.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 478 ++++++++++++---------
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 124 +++++-
 2 files changed, 383 insertions(+), 219 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f0e6635..e612593 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -166,6 +166,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.requests.MetadataRequest.convertToMetadataRequestTopic;
@@ -2510,21 +2511,93 @@ public class KafkaAdminClient extends AdminClient {
         return new DescribeDelegationTokenResult(tokensFuture);
     }
 
+    /**
+     * Context class to encapsulate parameters of a call to find and use a consumer group coordinator.
+     * Some of the parameters are provided at construction and are immutable whereas others are provided
+     * as "Call" are completed and values are available, like node id of the coordinator.
+     *
+     * @param <T> The type of return value of the KafkaFuture
+     * @param <O> The type of configuration option. Different for different consumer group commands.
+     */
+    private final static class ConsumerGroupOperationContext<T, O extends AbstractOptions<O>> {
+        final private String groupId;
+        final private O options;
+        final private long deadline;
+        final private KafkaFutureImpl<T> future;
+        private Optional<Node> node;
+
+        public ConsumerGroupOperationContext(String groupId,
+                                             O options,
+                                             long deadline,
+                                             KafkaFutureImpl<T> future) {
+            this.groupId = groupId;
+            this.options = options;
+            this.deadline = deadline;
+            this.future = future;
+            this.node = Optional.empty();
+        }
+
+        public String getGroupId() {
+            return groupId;
+        }
+
+        public O getOptions() {
+            return options;
+        }
+
+        public long getDeadline() {
+            return deadline;
+        }
+
+        public KafkaFutureImpl<T> getFuture() {
+            return future;
+        }
+
+        public Optional<Node> getNode() {
+            return node;
+        }
+
+        public void setNode(Node node) {
+            this.node = Optional.ofNullable(node);
+        }
+
+        public boolean hasCoordinatorMoved(AbstractResponse response) {
+            return response.errorCounts().keySet()
+                    .stream()
+                    .anyMatch(error -> error == Errors.NOT_COORDINATOR);
+        }
+    }
+
+    private void rescheduleTask(ConsumerGroupOperationContext<?, ?> context, Supplier<Call> nextCall) {
+        log.info("Node {} is no longer the Coordinator. Retrying with new coordinator.",
+                context.getNode().orElse(null));
+        // Requeue the task so that we can try with new coordinator
+        context.setNode(null);
+        Call findCoordinatorCall = getFindCoordinatorCall(context, nextCall);
+        runnable.call(findCoordinatorCall, time.milliseconds());
+    }
+
+    private static <T> Map<String, KafkaFutureImpl<T>> createFutures(Collection<String> groupIds) {
+        return new HashSet<>(groupIds).stream().collect(
+            Collectors.toMap(groupId -> groupId,
+                groupId -> {
+                    if (groupIdIsUnrepresentable(groupId)) {
+                        KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
+                        future.completeExceptionally(new InvalidGroupIdException("The given group id '" +
+                                groupId + "' cannot be represented in a request."));
+                        return future;
+                    } else {
+                        return new KafkaFutureImpl<>();
+                    }
+                }
+            ));
+    }
+
     @Override
     public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<String> groupIds,
                                                                final DescribeConsumerGroupsOptions options) {
 
-        final Map<String, KafkaFutureImpl<ConsumerGroupDescription>> futures = new HashMap<>(groupIds.size());
-        for (String groupId: groupIds) {
-            if (groupIdIsUnrepresentable(groupId)) {
-                KafkaFutureImpl<ConsumerGroupDescription> future = new KafkaFutureImpl<>();
-                future.completeExceptionally(new InvalidGroupIdException("The given group id '" +
-                        groupId + "' cannot be represented in a request."));
-                futures.put(groupId, future);
-            } else if (!futures.containsKey(groupId)) {
-                futures.put(groupId, new KafkaFutureImpl<>());
-            }
-        }
+        final Map<String, KafkaFutureImpl<ConsumerGroupDescription>> futures = createFutures(groupIds);
 
         // TODO: KAFKA-6788, we should consider grouping the request per coordinator and send one request with a list of
         // all consumer groups this coordinator host
@@ -2537,97 +2610,134 @@ public class KafkaAdminClient extends AdminClient {
 
             final long startFindCoordinatorMs = time.milliseconds();
             final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
+            ConsumerGroupOperationContext<ConsumerGroupDescription, DescribeConsumerGroupsOptions> context =
+                    new ConsumerGroupOperationContext<>(groupId, options, deadline, futures.get(groupId));
+            Call findCoordinatorCall = getFindCoordinatorCall(context,
+                () -> KafkaAdminClient.this.getDescribeConsumerGroupsCall(context));
+            runnable.call(findCoordinatorCall, startFindCoordinatorMs);
+        }
 
-            runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
-                @Override
-                FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
-                    return new FindCoordinatorRequest.Builder(
-                            new FindCoordinatorRequestData()
+        return new DescribeConsumerGroupsResult(new HashMap<>(futures));
+    }
+
+    /**
+     * Returns a {@code Call} object to fetch the coordinator for a consumer group id. Takes another Call
+     * parameter to schedule action that need to be taken using the coordinator. The param is a Supplier
+     * so that it can be lazily created, so that it can use the results of find coordinator call in its
+     * construction.
+     *
+     * @param <T> The type of return value of the KafkaFuture, like ConsumerGroupDescription, Void etc.
+     * @param <O> The type of configuration option, like DescribeConsumerGroupsOptions, ListConsumerGroupsOptions etc
+     */
+    private <T, O extends AbstractOptions<O>> Call getFindCoordinatorCall(ConsumerGroupOperationContext<T, O> context,
+                                               Supplier<Call> nextCall) {
+        return new Call("findCoordinator", context.getDeadline(), new LeastLoadedNodeProvider()) {
+            @Override
+            FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
+                return new FindCoordinatorRequest.Builder(
+                        new FindCoordinatorRequestData()
                                 .setKeyType(CoordinatorType.GROUP.id())
-                                .setKey(groupId));
-                }
+                                .setKey(context.getGroupId()));
+            }
 
-                @Override
-                void handleResponse(AbstractResponse abstractResponse) {
-                    final FindCoordinatorResponse fcResponse = (FindCoordinatorResponse) abstractResponse;
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
 
-                    if (handleGroupRequestError(fcResponse.error(), futures.get(groupId)))
-                        return;
+                if (handleGroupRequestError(response.error(), context.getFuture()))
+                    return;
 
-                    final long nowDescribeConsumerGroups = time.milliseconds();
-                    final int nodeId = fcResponse.node().id();
-                    runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
-                        @Override
-                        AbstractRequest.Builder createRequest(int timeoutMs) {
-                            return new DescribeGroupsRequest.Builder(
-                                new DescribeGroupsRequestData()
-                                    .setGroups(Collections.singletonList(groupId))
-                                    .setIncludeAuthorizedOperations(options.includeAuthorizedOperations()));
-                        }
+                context.setNode(response.node());
 
-                        @Override
-                        void handleResponse(AbstractResponse abstractResponse) {
-                            final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
+                runnable.call(nextCall.get(), time.milliseconds());
+            }
 
-                            KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
-                            final DescribedGroup describedGroup = response.data()
-                                .groups()
-                                .stream()
-                                .filter(group -> groupId.equals(group.groupId()))
-                                .findFirst().get();
+            @Override
+            void handleFailure(Throwable throwable) {
+                context.getFuture().completeExceptionally(throwable);
+            }
+        };
+    }
 
-                            final Errors groupError = Errors.forCode(describedGroup.errorCode());
+    private Call getDescribeConsumerGroupsCall(
+            ConsumerGroupOperationContext<ConsumerGroupDescription, DescribeConsumerGroupsOptions> context) {
+        return new Call("describeConsumerGroups",
+                context.getDeadline(),
+                new ConstantNodeIdProvider(context.getNode().get().id())) {
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new DescribeGroupsRequest.Builder(
+                    new DescribeGroupsRequestData()
+                        .setGroups(Collections.singletonList(context.getGroupId()))
+                        .setIncludeAuthorizedOperations(context.getOptions().includeAuthorizedOperations()));
+            }
 
-                            if (handleGroupRequestError(groupError, future))
-                                return;
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
 
-                            final String protocolType = describedGroup.protocolType();
-                            if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
-                                final List<DescribedGroupMember> members = describedGroup.members();
-                                final List<MemberDescription> memberDescriptions = new ArrayList<>(members.size());
-                                final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
-                                for (DescribedGroupMember groupMember : members) {
-                                    Set<TopicPartition> partitions = Collections.emptySet();
-                                    if (groupMember.memberAssignment().length > 0) {
-                                        final PartitionAssignor.Assignment assignment = ConsumerProtocol.
-                                            deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
-                                        partitions = new HashSet<>(assignment.partitions());
-                                    }
-                                    final MemberDescription memberDescription =
-                                        new MemberDescription(groupMember.memberId(),
-                                            groupMember.clientId(),
-                                            groupMember.clientHost(),
-                                            new MemberAssignment(partitions));
-                                    memberDescriptions.add(memberDescription);
-                                }
-                                final ConsumerGroupDescription consumerGroupDescription =
-                                    new ConsumerGroupDescription(groupId, protocolType.isEmpty(),
-                                        memberDescriptions,
-                                        describedGroup.protocolData(),
-                                        ConsumerGroupState.parse(describedGroup.groupState()),
-                                        fcResponse.node(),
-                                        authorizedOperations);
-                                future.complete(consumerGroupDescription);
-                            }
-                        }
+                List<DescribedGroup> describedGroups = response.data().groups();
+                if (describedGroups.isEmpty()) {
+                    context.getFuture().completeExceptionally(
+                            new InvalidGroupIdException("No consumer group found for GroupId: " + context.getGroupId()));
+                    return;
+                }
 
-                        @Override
-                        void handleFailure(Throwable throwable) {
-                            KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
-                            future.completeExceptionally(throwable);
-                        }
-                    }, nowDescribeConsumerGroups);
+                if (describedGroups.size() > 1 ||
+                        !describedGroups.get(0).groupId().equals(context.getGroupId())) {
+                    String ids = Arrays.toString(describedGroups.stream().map(DescribedGroup::groupId).toArray());
+                    context.getFuture().completeExceptionally(new InvalidGroupIdException(
+                            "DescribeConsumerGroup request for GroupId: " + context.getGroupId() + " returned " + ids));
+                    return;
                 }
 
-                @Override
-                void handleFailure(Throwable throwable) {
-                    KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
-                    future.completeExceptionally(throwable);
+                final DescribedGroup describedGroup = describedGroups.get(0);
+
+                // If coordinator changed since we fetched it, retry
+                if (context.hasCoordinatorMoved(response)) {
+                    rescheduleTask(context, () -> getDescribeConsumerGroupsCall(context));
+                    return;
                 }
-            }, startFindCoordinatorMs);
-        }
 
-        return new DescribeConsumerGroupsResult(new HashMap<>(futures));
+                final Errors groupError = Errors.forCode(describedGroup.errorCode());
+                if (handleGroupRequestError(groupError, context.getFuture()))
+                    return;
+
+                final String protocolType = describedGroup.protocolType();
+                if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
+                    final List<DescribedGroupMember> members = describedGroup.members();
+                    final List<MemberDescription> memberDescriptions = new ArrayList<>(members.size());
+                    final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
+                    for (DescribedGroupMember groupMember : members) {
+                        Set<TopicPartition> partitions = Collections.emptySet();
+                        if (groupMember.memberAssignment().length > 0) {
+                            final PartitionAssignor.Assignment assignment = ConsumerProtocol.
+                                deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
+                            partitions = new HashSet<>(assignment.partitions());
+                        }
+                        final MemberDescription memberDescription =
+                            new MemberDescription(groupMember.memberId(),
+                                groupMember.clientId(),
+                                groupMember.clientHost(),
+                                new MemberAssignment(partitions));
+                        memberDescriptions.add(memberDescription);
+                    }
+                    final ConsumerGroupDescription consumerGroupDescription =
+                        new ConsumerGroupDescription(context.getGroupId(), protocolType.isEmpty(),
+                            memberDescriptions,
+                            describedGroup.protocolData(),
+                            ConsumerGroupState.parse(describedGroup.groupState()),
+                            context.getNode().get(),
+                            authorizedOperations);
+                    context.getFuture().complete(consumerGroupDescription);
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                context.getFuture().completeExceptionally(throwable);
+            }
+        };
     }
 
     private Set<AclOperation> validAclOperations(final int authorizedOperations) {
@@ -2776,162 +2886,125 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
-    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
+    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId,
+                                                                   final ListConsumerGroupOffsetsOptions options) {
         final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> groupOffsetListingFuture = new KafkaFutureImpl<>();
-
         final long startFindCoordinatorMs = time.milliseconds();
         final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
 
-        runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
+        ConsumerGroupOperationContext<Map<TopicPartition, OffsetAndMetadata>, ListConsumerGroupOffsetsOptions> context =
+                new ConsumerGroupOperationContext<>(groupId, options, deadline, groupOffsetListingFuture);
+
+        Call findCoordinatorCall = getFindCoordinatorCall(context,
+            () -> KafkaAdminClient.this.getListConsumerGroupOffsetsCall(context));
+        runnable.call(findCoordinatorCall, startFindCoordinatorMs);
+
+        return new ListConsumerGroupOffsetsResult(groupOffsetListingFuture);
+    }
+
+    private Call getListConsumerGroupOffsetsCall(ConsumerGroupOperationContext<Map<TopicPartition, OffsetAndMetadata>,
+            ListConsumerGroupOffsetsOptions> context) {
+        return new Call("listConsumerGroupOffsets", context.getDeadline(),
+                new ConstantNodeIdProvider(context.getNode().get().id())) {
             @Override
-            FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
-                return new FindCoordinatorRequest.Builder(
-                        new FindCoordinatorRequestData()
-                            .setKeyType(CoordinatorType.GROUP.id())
-                            .setKey(groupId));
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new OffsetFetchRequest.Builder(context.getGroupId(), context.getOptions().topicPartitions());
             }
 
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
-                final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
+                final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
+                final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
 
-                if (handleGroupRequestError(response.error(), groupOffsetListingFuture))
+                // If coordinator changed since we fetched it, retry
+                if (context.hasCoordinatorMoved(response)) {
+                    rescheduleTask(context, () -> getListConsumerGroupOffsetsCall(context));
                     return;
+                }
 
-                final long nowListConsumerGroupOffsets = time.milliseconds();
-
-                final int nodeId = response.node().id();
-
-                runnable.call(new Call("listConsumerGroupOffsets", deadline, new ConstantNodeIdProvider(nodeId)) {
-                    @Override
-                    AbstractRequest.Builder createRequest(int timeoutMs) {
-                        return new OffsetFetchRequest.Builder(groupId, options.topicPartitions());
-                    }
-
-                    @Override
-                    void handleResponse(AbstractResponse abstractResponse) {
-                        final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
-                        final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
-
-                        if (handleGroupRequestError(response.error(), groupOffsetListingFuture))
-                            return;
-
-                        for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
-                            response.responseData().entrySet()) {
-                            final TopicPartition topicPartition = entry.getKey();
-                            OffsetFetchResponse.PartitionData partitionData = entry.getValue();
-                            final Errors error = partitionData.error;
-
-                            if (error == Errors.NONE) {
-                                final Long offset = partitionData.offset;
-                                final String metadata = partitionData.metadata;
-                                final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
-                                groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
-                            } else {
-                                log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
-                            }
-                        }
-                        groupOffsetListingFuture.complete(groupOffsetsListing);
-                    }
+                if (handleGroupRequestError(response.error(), context.getFuture()))
+                    return;
 
-                    @Override
-                    void handleFailure(Throwable throwable) {
-                        groupOffsetListingFuture.completeExceptionally(throwable);
+                for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
+                    response.responseData().entrySet()) {
+                    final TopicPartition topicPartition = entry.getKey();
+                    OffsetFetchResponse.PartitionData partitionData = entry.getValue();
+                    final Errors error = partitionData.error;
+
+                    if (error == Errors.NONE) {
+                        final Long offset = partitionData.offset;
+                        final String metadata = partitionData.metadata;
+                        final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
+                        groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
+                    } else {
+                        log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
                     }
-                }, nowListConsumerGroupOffsets);
+                }
+                context.getFuture().complete(groupOffsetsListing);
             }
 
             @Override
             void handleFailure(Throwable throwable) {
-                groupOffsetListingFuture.completeExceptionally(throwable);
+                context.getFuture().completeExceptionally(throwable);
             }
-        }, startFindCoordinatorMs);
-
-        return new ListConsumerGroupOffsetsResult(groupOffsetListingFuture);
+        };
     }
 
     @Override
     public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) {
 
-        final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(groupIds.size());
-        for (String groupId: groupIds) {
-            if (groupIdIsUnrepresentable(groupId)) {
-                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
-                future.completeExceptionally(new ApiException("The given group id '" +
-                        groupId + "' cannot be represented in a request."));
-                futures.put(groupId, future);
-            } else if (!futures.containsKey(groupId)) {
-                futures.put(groupId, new KafkaFutureImpl<>());
-            }
-        }
+        final Map<String, KafkaFutureImpl<Void>> futures = createFutures(groupIds);
 
         // TODO: KAFKA-6788, we should consider grouping the request per coordinator and send one request with a list of
         // all consumer groups this coordinator host
         for (final String groupId : groupIds) {
             // skip sending request for those futures that already failed.
-            if (futures.get(groupId).isCompletedExceptionally())
+            final KafkaFutureImpl<Void> future = futures.get(groupId);
+            if (future.isCompletedExceptionally())
                 continue;
 
             final long startFindCoordinatorMs = time.milliseconds();
             final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
+            ConsumerGroupOperationContext<Void, DeleteConsumerGroupsOptions> context =
+                    new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
+            Call findCoordinatorCall = getFindCoordinatorCall(context,
+                () -> KafkaAdminClient.this.getDeleteConsumerGroupsCall(context));
+            runnable.call(findCoordinatorCall, startFindCoordinatorMs);
+        }
 
-            runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
-                @Override
-                FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
-                    return new FindCoordinatorRequest.Builder(
-                            new FindCoordinatorRequestData()
-                                .setKeyType(CoordinatorType.GROUP.id())
-                                .setKey(groupId));
-                }
-
-                @Override
-                void handleResponse(AbstractResponse abstractResponse) {
-                    final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
-
-                    if (handleGroupRequestError(response.error(), futures.get(groupId)))
-                        return;
-
-                    final long nowDeleteConsumerGroups = time.milliseconds();
-
-                    final int nodeId = response.node().id();
-
-                    runnable.call(new Call("deleteConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
-
-                        @Override
-                        AbstractRequest.Builder createRequest(int timeoutMs) {
-                            return new DeleteGroupsRequest.Builder(Collections.singleton(groupId));
-                        }
-
-                        @Override
-                        void handleResponse(AbstractResponse abstractResponse) {
-                            final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
+        return new DeleteConsumerGroupsResult(new HashMap<>(futures));
+    }
 
-                            KafkaFutureImpl<Void> future = futures.get(groupId);
-                            final Errors groupError = response.get(groupId);
+    private Call getDeleteConsumerGroupsCall(ConsumerGroupOperationContext<Void, DeleteConsumerGroupsOptions> context) {
+        return new Call("deleteConsumerGroups", context.getDeadline(), new ConstantNodeIdProvider(context.getNode().get().id())) {
 
-                            if (handleGroupRequestError(groupError, future))
-                                return;
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new DeleteGroupsRequest.Builder(Collections.singleton(context.getGroupId()));
+            }
 
-                            future.complete(null);
-                        }
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
 
-                        @Override
-                        void handleFailure(Throwable throwable) {
-                            KafkaFutureImpl<Void> future = futures.get(groupId);
-                            future.completeExceptionally(throwable);
-                        }
-                    }, nowDeleteConsumerGroups);
+                // If coordinator changed since we fetched it, retry
+                if (context.hasCoordinatorMoved(response)) {
+                    rescheduleTask(context, () -> getDeleteConsumerGroupsCall(context));
+                    return;
                 }
 
-                @Override
-                void handleFailure(Throwable throwable) {
-                    KafkaFutureImpl<Void> future = futures.get(groupId);
-                    future.completeExceptionally(throwable);
-                }
-            }, startFindCoordinatorMs);
-        }
+                final Errors groupError = response.get(context.getGroupId());
+                if (handleGroupRequestError(groupError, context.getFuture()))
+                    return;
 
-        return new DeleteConsumerGroupsResult(new HashMap<>(futures));
+                context.getFuture().complete(null);
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                context.getFuture().completeExceptionally(throwable);
+            }
+        };
     }
 
     @Override
@@ -2968,5 +3041,4 @@ public class KafkaAdminClient extends AdminClient {
         }, now);
         return new ElectPreferredLeadersResult(electionFuture, partitionSet);
     }
-
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 6aaa75b..567d578 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -1078,10 +1078,10 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             //Retriable FindCoordinatorResponse errors should be retried
-            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE,  Node.noNode()));
-            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,  Node.noNode()));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,  Node.noNode()));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,  Node.noNode()));
 
-            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             DescribeGroupsResponseData data = new DescribeGroupsResponseData();
 
@@ -1107,6 +1107,23 @@ public class KafkaAdminClientTest {
                 Collections.emptySet()));
             env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
 
+            /*
+             * We need to return two responses here, one with NOT_COORDINATOR error when calling describe consumer group
+             * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
+             * FindCoordinatorResponse.
+             */
+            data = new DescribeGroupsResponseData();
+            data.groups().add(DescribeGroupsResponse.groupMetadata(
+                    "group-0",
+                    Errors.NOT_COORDINATOR,
+                    "",
+                    "",
+                    "",
+                    Collections.emptyList(),
+                    Collections.emptySet()));
+            env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
             data = new DescribeGroupsResponseData();
             TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
             TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
@@ -1133,26 +1150,83 @@ public class KafkaAdminClientTest {
                     ),
                     Collections.emptySet()));
 
-            data.groups().add(DescribeGroupsResponse.groupMetadata(
+            env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+
+            final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0"));
+            final ConsumerGroupDescription groupDescription = result.describedGroups().get("group-0").get();
+
+            assertEquals(1, result.describedGroups().size());
+            assertEquals("group-0", groupDescription.groupId());
+            assertEquals(2, groupDescription.members().size());
+        }
+    }
+
+    @Test
+    public void testDescribeMultipleConsumerGroups() throws Exception {
+        final HashMap<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+                new Cluster(
+                        "mockClusterId",
+                        nodes.values(),
+                        Collections.<PartitionInfo>emptyList(),
+                        Collections.<String>emptySet(),
+                        Collections.<String>emptySet(), nodes.get(0));
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
+            TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
+            TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
+
+            final List<TopicPartition> topicPartitions = new ArrayList<>();
+            topicPartitions.add(0, myTopicPartition0);
+            topicPartitions.add(1, myTopicPartition1);
+            topicPartitions.add(2, myTopicPartition2);
+
+            final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions));
+            byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
+            memberAssignment.get(memberAssignmentBytes);
+
+            DescribeGroupsResponseData group0Data = new DescribeGroupsResponseData();
+            group0Data.groups().add(DescribeGroupsResponse.groupMetadata(
+                    "group-0",
+                    Errors.NONE,
+                    "",
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    "",
+                    asList(
+                            DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null),
+                            DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null)
+                    ),
+                    Collections.emptySet()));
+
+            DescribeGroupsResponseData groupConnectData = new DescribeGroupsResponseData();
+            group0Data.groups().add(DescribeGroupsResponse.groupMetadata(
                     "group-connect-0",
                     Errors.NONE,
                     "",
                     "connect",
                     "",
                     asList(
-                        DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null),
-                        DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null)
+                            DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null),
+                            DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null)
                     ),
                     Collections.emptySet()));
 
-            env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+            env.kafkaClient().prepareResponse(new DescribeGroupsResponse(group0Data));
+            env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupConnectData));
 
-            final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0"));
-            final ConsumerGroupDescription groupDescription = result.describedGroups().get("group-0").get();
-
-            assertEquals(1, result.describedGroups().size());
-            assertEquals("group-0", groupDescription.groupId());
-            assertEquals(2, groupDescription.members().size());
+            Collection<String> groups = new HashSet<>();
+            groups.add("group-0");
+            groups.add("group-connect-0");
+            final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(groups);
+            assertEquals(2, result.describedGroups().size());
+            assertEquals(groups, result.describedGroups().keySet());
         }
     }
 
@@ -1173,14 +1247,22 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             //Retriable FindCoordinatorResponse errors should be retried
-            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE,  Node.noNode()));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,  Node.noNode()));
 
-            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             //Retriable  errors should be retried
             env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
             env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
 
+            /*
+             * We need to return two responses here, one for NOT_COORDINATOR error when calling list consumer group offsets
+             * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
+             * FindCoordinatorResponse.
+             */
+            env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
             TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
             TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
             TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
@@ -1206,7 +1288,7 @@ public class KafkaAdminClientTest {
 
     @Test
     public void testDeleteConsumerGroups() throws Exception {
-        final HashMap<Integer, Node> nodes = new HashMap<>();
+        final Map<Integer, Node> nodes = new HashMap<>();
         nodes.put(0, new Node(0, "localhost", 8121));
 
         final Cluster cluster =
@@ -1254,6 +1336,16 @@ public class KafkaAdminClientTest {
             errorResponse2.put("group-0", Errors.COORDINATOR_LOAD_IN_PROGRESS);
             env.kafkaClient().prepareResponse(new DeleteGroupsResponse(errorResponse2));
 
+            /*
+             * We need to return two responses here, one for NOT_COORDINATOR call when calling delete a consumer group
+             * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
+             * FindCoordinatorResponse.
+             */
+            Map<String, Errors> coordinatorMoved = new HashMap<>();
+            coordinatorMoved.put("UnitTestError", Errors.NOT_COORDINATOR);
+            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(coordinatorMoved));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
             env.kafkaClient().prepareResponse(new DeleteGroupsResponse(validResponse));
 
             final DeleteConsumerGroupsResult errorResult1 = env.adminClient().deleteConsumerGroups(groupIds);