You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/15 17:13:26 UTC

[kafka] branch trunk updated: KAFKA-6058: Refactor consumer API result return types (#4856)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b599b39  KAFKA-6058: Refactor consumer API result return types (#4856)
b599b39 is described below

commit b599b395f3d421b3fdb1d25b294778478c2467cf
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Sun Apr 15 10:13:22 2018 -0700

    KAFKA-6058: Refactor consumer API result return types (#4856)
    
    Refactored the return types in consumer group APIs the following way:
    
    ```
    Map<TopicPartition, KafkaFuture<Void>> DeleteConsumerGroupsResult#deletedGroups()
    
    Map<TopicPartition, KafkaFuture<ConsumerGroupDescription>> DescribeConsumerGroupsResult#describedGroups()
    
    KafkaFuture<Collection<ConsumerGroupListing>> ListConsumerGroupsResult#listings()
    
    KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> ListConsumerGroupOffsetsResult#partitionsToOffsetAndMetadata()
    ```
    
    * For DeleteConsumerGroupsResult and DescribeConsumerGroupsResult, for each group id we have two round-trips to get the coordinator, and then send the delete / describe request; I leave the potential optimization of batching requests for future work.
    
    * For ListConsumerGroupOffsetsResult, it is a simple single round-trip and hence the whole map is wrapped as a Future.
    
    * ListConsumerGroupsResult, it is the most tricky one: we would only know how many futures we should wait for after the first listNode returns, and hence I constructed the flattened future in the middle wrapped with the underlying map of futures; also added an iterator API to compensate the "fail the whole future if any broker returns error" behavior. The iterator future will throw exception on the failing brokers, while return the consumer for other succeeded brokers.
    
    Reviewers: Colin Patrick McCabe <co...@cmccabe.xyz>, Jason Gustafson <ja...@confluent.io>
---
 .../clients/admin/DeleteConsumerGroupsResult.java  |  17 +-
 .../admin/DescribeConsumerGroupsResult.java        |  15 +-
 .../kafka/clients/admin/KafkaAdminClient.java      | 254 +++++++++++++--------
 .../clients/admin/ListConsumerGroupsResult.java    |  71 +++++-
 .../java/org/apache/kafka/common/KafkaFuture.java  |   9 -
 .../kafka/common/internals/KafkaFutureImpl.java    |   6 +-
 .../java/org/apache/kafka/clients/MockClient.java  |   3 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 109 ++++++---
 8 files changed, 336 insertions(+), 148 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
index b4bce26..dd6835c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
@@ -29,13 +29,24 @@ import java.util.Map;
  */
 @InterfaceStability.Evolving
 public class DeleteConsumerGroupsResult {
-    final KafkaFuture<Map<String, KafkaFuture<Void>>> futures;
+    private final Map<String, KafkaFuture<Void>> futures;
 
-    DeleteConsumerGroupsResult(KafkaFuture<Map<String, KafkaFuture<Void>>> futures) {
+    DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
         this.futures = futures;
     }
 
-    public KafkaFuture<Map<String, KafkaFuture<Void>>> deletedGroups() {
+    /**
+     * Return a map from group id to futures which can be used to check the status of
+     * individual deletions.
+     */
+    public Map<String, KafkaFuture<Void>> deletedGroups() {
         return futures;
     }
+
+    /**
+     * Return a future which succeeds only if all the consumer group deletions succeed.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
index adde031..ac2189c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
@@ -32,16 +32,23 @@ import java.util.Map;
 @InterfaceStability.Evolving
 public class DescribeConsumerGroupsResult {
 
-    private final KafkaFuture<Map<String, KafkaFuture<ConsumerGroupDescription>>> futures;
+    private final Map<String, KafkaFuture<ConsumerGroupDescription>> futures;
 
-    public DescribeConsumerGroupsResult(KafkaFuture<Map<String, KafkaFuture<ConsumerGroupDescription>>> futures) {
+    public DescribeConsumerGroupsResult(final Map<String, KafkaFuture<ConsumerGroupDescription>> futures) {
         this.futures = futures;
     }
 
     /**
-     * Return a map from group name to futures which can be used to check the description of a consumer group.
+     * Return a map from group id to futures which can be used to check the description of a consumer group.
      */
-    public KafkaFuture<Map<String, KafkaFuture<ConsumerGroupDescription>>> describedGroups() {
+    public Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups() {
         return futures;
     }
+
+    /**
+     * Return a future which succeeds only if all the consumer group description succeed.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 50bcfd3..fa3f943 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -46,6 +46,7 @@ import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RetriableException;
@@ -53,6 +54,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -916,8 +918,11 @@ public class KafkaAdminClient extends AdminClient {
          * @param correlationIdToCall   A map of correlation IDs to calls.
          * @param callsInFlight         A map of nodes to the calls they have in flight.
         **/
-        private void handleResponses(long now, List<ClientResponse> responses, Map<String, List<Call>> callsInFlight,
-                Map<Integer, Call> correlationIdToCall) {
+        private void handleResponses(long now,
+                                     List<ClientResponse> responses,
+                                     Map<String, List<Call>> callsInFlight,
+                                     Map<Integer, Call> correlationIdToCall) {
+
             for (ClientResponse response : responses) {
                 int correlationId = response.requestHeader().correlationId();
 
@@ -1108,7 +1113,11 @@ public class KafkaAdminClient extends AdminClient {
      * those policies on the server, so that they can be changed in the future if needed.
      */
     private static boolean topicNameIsUnrepresentable(String topicName) {
-        return (topicName == null) || topicName.isEmpty();
+        return topicName == null || topicName.isEmpty();
+    }
+
+    private static boolean groupIdIsUnrepresentable(String groupId) {
+        return groupId == null;
     }
 
     @Override
@@ -1951,6 +1960,7 @@ public class KafkaAdminClient extends AdminClient {
         return new DescribeReplicaLogDirsResult(new HashMap<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>>(futures));
     }
 
+    @Override
     public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
                                                    final CreatePartitionsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(newPartitions.size());
@@ -1989,6 +1999,7 @@ public class KafkaAdminClient extends AdminClient {
         return new CreatePartitionsResult(new HashMap<String, KafkaFuture<Void>>(futures));
     }
 
+    @Override
     public DeleteRecordsResult deleteRecords(final Map<TopicPartition, RecordsToDelete> recordsToDelete,
                                              final DeleteRecordsOptions options) {
 
@@ -2228,20 +2239,30 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<String> groupIds,
                                                                final DescribeConsumerGroupsOptions options) {
-        final KafkaFutureImpl<Map<String, KafkaFuture<ConsumerGroupDescription>>> resultFutures = new KafkaFutureImpl<>();
-        final Map<String, KafkaFutureImpl<ConsumerGroupDescription>> consumerGroupFutures = new HashMap<>(groupIds.size());
-        final ArrayList<String> groupIdList = new ArrayList<>();
-        for (String groupId : groupIds) {
-            if (!consumerGroupFutures.containsKey(groupId)) {
-                consumerGroupFutures.put(groupId, new KafkaFutureImpl<ConsumerGroupDescription>());
-                groupIdList.add(groupId);
+
+        final Map<String, KafkaFutureImpl<ConsumerGroupDescription>> futures = new HashMap<>(groupIds.size());
+        for (String groupId: groupIds) {
+            if (groupIdIsUnrepresentable(groupId)) {
+                KafkaFutureImpl<ConsumerGroupDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidGroupIdException("The given group id '" +
+                        groupId + "' cannot be represented in a request."));
+                futures.put(groupId, future);
+            } else if (!futures.containsKey(groupId)) {
+                futures.put(groupId, new KafkaFutureImpl<ConsumerGroupDescription>());
             }
         }
 
-        for (final String groupId : groupIdList) {
+        // TODO: KAFKA-6788, we should consider grouping the request per coordinator and send one request with a list of
+        // all consumer groups this coordinator host
+        for (final Map.Entry<String, KafkaFutureImpl<ConsumerGroupDescription>> entry : futures.entrySet()) {
+            // skip sending request for those futures that already failed.
+            if (entry.getValue().isCompletedExceptionally())
+                continue;
+
+            final String groupId = entry.getKey();
 
-            final long nowFindCoordinator = time.milliseconds();
-            final long deadline = calcDeadlineMs(nowFindCoordinator, options.timeoutMs());
+            final long startFindCoordinatorMs = time.milliseconds();
+            final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
 
             runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
                 @Override
@@ -2261,23 +2282,21 @@ public class KafkaAdminClient extends AdminClient {
 
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
-                            return new DescribeGroupsRequest.Builder(groupIdList);
+                            return new DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
                         }
 
                         @Override
                         void handleResponse(AbstractResponse abstractResponse) {
                             final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
-                            // Handle server responses for particular groupId.
-                            for (Map.Entry<String, KafkaFutureImpl<ConsumerGroupDescription>> entry : consumerGroupFutures.entrySet()) {
-                                final String groupId = entry.getKey();
-                                final KafkaFutureImpl<ConsumerGroupDescription> future = entry.getValue();
-                                final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId);
-                                final Errors groupError = groupMetadata.error();
-                                if (groupError != Errors.NONE) {
-                                    future.completeExceptionally(groupError.exception());
-                                    continue;
-                                }
 
+                            KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
+                            final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId);
+
+                            final Errors groupError = groupMetadata.error();
+                            if (groupError != Errors.NONE) {
+                                // TODO: KAFKA-6789, we can retry based on the error code
+                                future.completeExceptionally(groupError.exception());
+                            } else {
                                 final String protocolType = groupMetadata.protocolType();
                                 if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
                                     final List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
@@ -2306,27 +2325,28 @@ public class KafkaAdminClient extends AdminClient {
 
                         @Override
                         void handleFailure(Throwable throwable) {
-                            completeAllExceptionally(consumerGroupFutures.values(), throwable);
+                            KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
+                            future.completeExceptionally(throwable);
                         }
                     }, nowDescribeConsumerGroups);
-
-                    resultFutures.complete(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(consumerGroupFutures));
                 }
 
                 @Override
                 void handleFailure(Throwable throwable) {
-                    resultFutures.completeExceptionally(throwable);
+                    KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
+                    future.completeExceptionally(throwable);
                 }
-            }, nowFindCoordinator);
+            }, startFindCoordinatorMs);
         }
 
-        return new DescribeConsumerGroupsResult(resultFutures);
+        return new DescribeConsumerGroupsResult(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(futures));
     }
 
     @Override
     public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
-        //final KafkaFutureImpl<Map<Node, KafkaFuture<Collection<ConsumerGroupListing>>>> nodeAndConsumerGroupListing = new KafkaFutureImpl<>();
-        final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = new KafkaFutureImpl<Collection<ConsumerGroupListing>>();
+        final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap = new HashMap<>();
+        final KafkaFutureImpl<Collection<ConsumerGroupListing>> flattenFuture = new KafkaFutureImpl<>();
+        final KafkaFutureImpl<Void> listFuture = new KafkaFutureImpl<>();
 
         final long nowMetadata = time.milliseconds();
         final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
@@ -2334,49 +2354,74 @@ public class KafkaAdminClient extends AdminClient {
         runnable.call(new Call("listNodes", deadline, new LeastLoadedNodeProvider()) {
             @Override
             AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new MetadataRequest.Builder(Collections.<String>emptyList(), true);
+                return new MetadataRequest.Builder(Collections.singletonList(Topic.GROUP_METADATA_TOPIC_NAME), true);
             }
 
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 MetadataResponse metadataResponse = (MetadataResponse) abstractResponse;
 
-                final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futures = new HashMap<>();
-
-                for (final Node node : metadataResponse.brokers()) {
-                    futures.put(node, new KafkaFutureImpl<Collection<ConsumerGroupListing>>());
+                for (final MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
+                    if (metadata.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) {
+                        for (final MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
+                            final Node leader = partitionMetadata.leader();
+                            if (partitionMetadata.error() != Errors.NONE) {
+                                // TODO: KAFKA-6789, retry based on the error code
+                                KafkaFutureImpl<Collection<ConsumerGroupListing>> future = new KafkaFutureImpl<>();
+                                future.completeExceptionally(partitionMetadata.error().exception());
+                                // if it is the leader not found error, then the leader might be NoNode; if there are more than
+                                // one such error, we will only have one entry in the map. For now it is okay since we are not
+                                // guaranteeing to return the full list of consumers still.
+                                futuresMap.put(leader, future);
+                            } else {
+                                futuresMap.put(leader, new KafkaFutureImpl<Collection<ConsumerGroupListing>>());
+                            }
+                        }
+                        listFuture.complete(null);
+                    } else {
+                        if (metadata.error() != Errors.NONE)
+                            listFuture.completeExceptionally(metadata.error().exception());
+                        else
+                            listFuture.completeExceptionally(new IllegalStateException("Unexpected topic metadata for "
+                                    + metadata.topic() + " is returned; cannot find the brokers to query consumer listings."));
+                    }
                 }
 
-                future.combine(futures.values().toArray(new KafkaFuture[0])).thenApply(
-                        new KafkaFuture.BaseFunction<Collection<ConsumerGroupListing>, Collection<ConsumerGroupListing>>() {
+                // we have to flatten the future here instead in the result, because we need to wait until the map of nodes
+                // are known from the listNode request.
+                flattenFuture.copyWith(
+                        KafkaFuture.allOf(futuresMap.values().toArray(new KafkaFuture[0])),
+                        new KafkaFuture.BaseFunction<Void, Collection<ConsumerGroupListing>>() {
                             @Override
-                            public Collection<ConsumerGroupListing> apply(Collection<ConsumerGroupListing> v) {
+                            public Collection<ConsumerGroupListing> apply(Void v) {
                                 List<ConsumerGroupListing> listings = new ArrayList<>();
-                                for (Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futures.entrySet()) {
+                                for (Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futuresMap.entrySet()) {
                                     Collection<ConsumerGroupListing> results;
                                     try {
                                         results = entry.getValue().get();
+                                        listings.addAll(results);
                                     } catch (Throwable e) {
-                                        // This should be unreachable, since the future returned by KafkaFuture#allOf should
-                                        // have failed if any Future failed.
-                                        throw new KafkaException("ListConsumerGroupsResult#listings(): internal error", e);
+                                        // This should be unreachable, because allOf ensured that all the futures
+                                        // completed successfully.
+                                        throw new RuntimeException(e);
                                     }
-                                    listings.addAll(results);
                                 }
                                 return listings;
                             }
                         });
 
+                for (final Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futuresMap.entrySet()) {
+                    // skip sending the request for those futures who have already failed
+                    if (entry.getValue().isCompletedExceptionally())
+                        continue;
 
-                for (final Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futures.entrySet()) {
                     final long nowList = time.milliseconds();
 
                     final int brokerId = entry.getKey().id();
+                    final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = entry.getValue();
 
                     runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(brokerId)) {
 
-                        private final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = entry.getValue();
-
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
                             return new ListGroupsRequest.Builder();
@@ -2385,21 +2430,26 @@ public class KafkaAdminClient extends AdminClient {
                         @Override
                         void handleResponse(AbstractResponse abstractResponse) {
                             final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
-                            final List<ConsumerGroupListing> groupsListing = new ArrayList<>();
-                            for (ListGroupsResponse.Group group : response.groups()) {
-                                if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || group.protocolType().isEmpty()) {
-                                    final String groupId = group.groupId();
-                                    final String protocolType = group.protocolType();
-                                    final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
-                                    groupsListing.add(groupListing);
+
+                            if (response.error() != Errors.NONE) {
+                                future.completeExceptionally(response.error().exception());
+                            } else {
+                                final List<ConsumerGroupListing> groupsListing = new ArrayList<>();
+                                for (ListGroupsResponse.Group group : response.groups()) {
+                                    if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || group.protocolType().isEmpty()) {
+                                        final String groupId = group.groupId();
+                                        final String protocolType = group.protocolType();
+                                        final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
+                                        groupsListing.add(groupListing);
+                                    }
                                 }
+                                future.complete(groupsListing);
                             }
-                            future.complete(groupsListing);
                         }
 
                         @Override
                         void handleFailure(Throwable throwable) {
-                            completeAllExceptionally(futures.values(), throwable);
+                            future.completeExceptionally(throwable);
                         }
                     }, nowList);
 
@@ -2408,19 +2458,19 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             void handleFailure(Throwable throwable) {
-                future.completeExceptionally(throwable);
+                listFuture.completeExceptionally(throwable);
             }
         }, nowMetadata);
 
-        return new ListConsumerGroupsResult(future);
+        return new ListConsumerGroupsResult(listFuture, flattenFuture, futuresMap);
     }
 
     @Override
     public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
         final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> groupOffsetListingFuture = new KafkaFutureImpl<>();
 
-        final long nowFindCoordinator = time.milliseconds();
-        final long deadline = calcDeadlineMs(nowFindCoordinator, options.timeoutMs());
+        final long startFindCoordinatorMs = time.milliseconds();
+        final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
 
         runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
             @Override
@@ -2446,14 +2496,25 @@ public class KafkaAdminClient extends AdminClient {
                     void handleResponse(AbstractResponse abstractResponse) {
                         final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
                         final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
-                        for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
-                                response.responseData().entrySet()) {
-                            final TopicPartition topicPartition = entry.getKey();
-                            final Long offset = entry.getValue().offset;
-                            final String metadata = entry.getValue().metadata;
-                            groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, metadata));
+
+                        if (response.hasError()) {
+                            groupOffsetListingFuture.completeExceptionally(response.error().exception());
+                        } else {
+                            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
+                                    response.responseData().entrySet()) {
+                                final TopicPartition topicPartition = entry.getKey();
+                                final Errors error = entry.getValue().error;
+
+                                if (error == Errors.NONE) {
+                                    final Long offset = entry.getValue().offset;
+                                    final String metadata = entry.getValue().metadata;
+                                    groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, metadata));
+                                } else {
+                                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                                }
+                            }
+                            groupOffsetListingFuture.complete(groupOffsetsListing);
                         }
-                        groupOffsetListingFuture.complete(groupOffsetsListing);
                     }
 
                     @Override
@@ -2467,27 +2528,35 @@ public class KafkaAdminClient extends AdminClient {
             void handleFailure(Throwable throwable) {
                 groupOffsetListingFuture.completeExceptionally(throwable);
             }
-        }, nowFindCoordinator);
+        }, startFindCoordinatorMs);
 
         return new ListConsumerGroupOffsetsResult(groupOffsetListingFuture);
     }
 
     @Override
     public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) {
-        final KafkaFutureImpl<Map<String, KafkaFuture<Void>>> deleteConsumerGroupsFuture = new KafkaFutureImpl<>();
-        final Map<String, KafkaFutureImpl<Void>> deleteConsumerGroupFutures = new HashMap<>(groupIds.size());
-        final Set<String> groupIdList = new HashSet<>();
-        for (String groupId : groupIds) {
-            if (!deleteConsumerGroupFutures.containsKey(groupId)) {
-                deleteConsumerGroupFutures.put(groupId, new KafkaFutureImpl<Void>());
-                groupIdList.add(groupId);
+
+        final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(groupIds.size());
+        for (String groupId: groupIds) {
+            if (groupIdIsUnrepresentable(groupId)) {
+                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new ApiException("The given group id '" +
+                        groupId + "' cannot be represented in a request."));
+                futures.put(groupId, future);
+            } else if (!futures.containsKey(groupId)) {
+                futures.put(groupId, new KafkaFutureImpl<Void>());
             }
         }
 
-        for (final String groupId : groupIdList) {
+        // TODO: KAFKA-6788, we should consider grouping the request per coordinator and send one request with a list of
+        // all consumer groups this coordinator host
+        for (final String groupId : groupIds) {
+            // skip sending request for those futures that already failed.
+            if (futures.get(groupId).isCompletedExceptionally())
+                continue;
 
-            final long nowFindCoordinator = time.milliseconds();
-            final long deadline = calcDeadlineMs(nowFindCoordinator, options.timeoutMs());
+            final long startFindCoordinatorMs = time.milliseconds();
+            final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
 
             runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
                 @Override
@@ -2513,36 +2582,33 @@ public class KafkaAdminClient extends AdminClient {
                         @Override
                         void handleResponse(AbstractResponse abstractResponse) {
                             final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
-                            // Handle server responses for particular groupId.
-                            for (Map.Entry<String, KafkaFutureImpl<Void>> entry : deleteConsumerGroupFutures.entrySet()) {
-                                final String groupId = entry.getKey();
-                                final KafkaFutureImpl<Void> future = entry.getValue();
-                                final Errors groupError = response.get(groupId);
-                                if (groupError != Errors.NONE) {
-                                    future.completeExceptionally(groupError.exception());
-                                    continue;
-                                }
 
+                            KafkaFutureImpl<Void> future = futures.get(groupId);
+                            final Errors groupError = response.get(groupId);
+
+                            if (groupError != Errors.NONE) {
+                                future.completeExceptionally(groupError.exception());
+                            } else {
                                 future.complete(null);
                             }
                         }
 
                         @Override
                         void handleFailure(Throwable throwable) {
-                            completeAllExceptionally(deleteConsumerGroupFutures.values(), throwable);
+                            KafkaFutureImpl<Void> future = futures.get(groupId);
+                            future.completeExceptionally(throwable);
                         }
                     }, nowDeleteConsumerGroups);
-
-                    deleteConsumerGroupsFuture.complete(new HashMap<String, KafkaFuture<Void>>(deleteConsumerGroupFutures));
                 }
 
                 @Override
                 void handleFailure(Throwable throwable) {
-                    deleteConsumerGroupsFuture.completeExceptionally(throwable);
+                    KafkaFutureImpl<Void> future = futures.get(groupId);
+                    future.completeExceptionally(throwable);
                 }
-            }, nowFindCoordinator);
+            }, startFindCoordinatorMs);
         }
 
-        return new DeleteConsumerGroupsResult(deleteConsumerGroupsFuture);
+        return new DeleteConsumerGroupsResult(new HashMap<String, KafkaFuture<Void>>(futures));
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
index c725371..c3f1236 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
@@ -18,9 +18,14 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.AbstractIterator;
 
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
 
 /**
  * The result of the {@link AdminClient#listConsumerGroups()} call.
@@ -29,16 +34,70 @@ import java.util.Collection;
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsResult {
-    private final KafkaFuture<Collection<ConsumerGroupListing>> future;
+    private final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap;
+    private final KafkaFuture<Collection<ConsumerGroupListing>> flattenFuture;
+    private final KafkaFuture<Void> listFuture;
 
-    ListConsumerGroupsResult(KafkaFuture<Collection<ConsumerGroupListing>> future) {
-        this.future = future;
+    ListConsumerGroupsResult(final KafkaFuture<Void> listFuture,
+                             final KafkaFuture<Collection<ConsumerGroupListing>> flattenFuture,
+                             final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap) {
+        this.flattenFuture = flattenFuture;
+        this.listFuture = listFuture;
+        this.futuresMap = futuresMap;
+    }
+
+    private class FutureConsumerGroupListingIterator extends AbstractIterator<KafkaFuture<ConsumerGroupListing>> {
+        private Iterator<KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresIter;
+        private Iterator<ConsumerGroupListing> innerIter;
+
+        @Override
+        protected KafkaFuture<ConsumerGroupListing> makeNext() {
+            if (futuresIter == null) {
+                try {
+                    listFuture.get();
+                } catch (Exception e) {
+                    // the list future has failed, there will be no listings to show at all
+                    return allDone();
+                }
+
+                futuresIter = futuresMap.values().iterator();
+            }
+
+            while (innerIter == null || !innerIter.hasNext()) {
+                if (futuresIter.hasNext()) {
+                    KafkaFuture<Collection<ConsumerGroupListing>> collectionFuture = futuresIter.next();
+                    try {
+                        Collection<ConsumerGroupListing> collection = collectionFuture.get();
+                        innerIter = collection.iterator();
+                    } catch (Exception e) {
+                        KafkaFutureImpl<ConsumerGroupListing> future = new KafkaFutureImpl<>();
+                        future.completeExceptionally(e);
+                        return future;
+                    }
+                } else {
+                    return allDone();
+                }
+            }
+
+            KafkaFutureImpl<ConsumerGroupListing> future = new KafkaFutureImpl<>();
+            future.complete(innerIter.next());
+            return future;
+        }
+    }
+
+    /**
+     * Return an iterator of futures for ConsumerGroupListing objects; the returned future will throw exception
+     * if we cannot get a complete collection of consumer listings.
+     */
+    public Iterator<KafkaFuture<ConsumerGroupListing>> iterator() {
+        return new FutureConsumerGroupListingIterator();
     }
 
     /**
-     * Return a future which yields a collection of ConsumerGroupListing objects.
+     * Return a future which yields a full collection of ConsumerGroupListing objects; will throw exception
+     * if we cannot get a complete collection of consumer listings.
      */
-    public KafkaFuture<Collection<ConsumerGroupListing>> listings() {
-        return future;
+    public KafkaFuture<Collection<ConsumerGroupListing>> all() {
+        return flattenFuture;
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
index 4af996c..9cd2e01 100644
--- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -106,15 +106,6 @@ public abstract class KafkaFuture<T> implements Future<T> {
         return allOfFuture;
     }
 
-    public KafkaFuture<T> combine(KafkaFuture<?>... futures) {
-        AllOfAdapter<Object> allOfWaiter = new AllOfAdapter<>(futures.length, this);
-        for (KafkaFuture<?> future : futures) {
-            future.addWaiter(allOfWaiter);
-        }
-
-        return this;
-    }
-
     /**
      * Returns a new KafkaFuture that, when this future completes normally, is executed with this
      * futures's result as the argument to the supplied function.
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index b1e5b6d..33916ac 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -141,11 +141,15 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
      */
     @Override
     public <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function) {
-        KafkaFutureImpl<R> future = new KafkaFutureImpl<R>();
+        KafkaFutureImpl<R> future = new KafkaFutureImpl<>();
         addWaiter(new Applicant<>(function, future));
         return future;
     }
 
+    public <R> void copyWith(KafkaFuture<R> future, BaseFunction<R, T> function) {
+        KafkaFutureImpl<R> futureImpl = (KafkaFutureImpl<R>) future;
+        futureImpl.addWaiter(new Applicant<>(function, this));
+    }
 
     /**
      * @See KafkaFutureImpl#thenApply(BaseFunction)
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index a73175c..37b43e5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -73,6 +73,7 @@ public class MockClient implements KafkaClient {
 
     }
 
+    private int correlation;
     private final Time time;
     private final Metadata metadata;
     private Set<String> unavailableTopics;
@@ -464,7 +465,7 @@ public class MockClient implements KafkaClient {
     public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
                                           boolean expectResponse, RequestCompletionHandler callback) {
         totalRequestCount.incrementAndGet();
-        return new ClientRequest(nodeId, requestBuilder, 0, "mockClientId", createdTimeMs,
+        return new ClientRequest(nodeId, requestBuilder, correlation++, "mockClientId", createdTimeMs,
                 expectResponse, callback);
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index d224413..d2789b6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsResponse;
@@ -79,6 +80,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -647,11 +649,15 @@ public class KafkaAdminClientTest {
     }
 
     //Ignoring test to be fixed on follow-up PR
-    @Ignore
     @Test
-    public void testListConsumerGroups() throws Exception {
+    public void testListConsumerGroups() {
         final HashMap<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
+        Node node0 = new Node(0, "localhost", 8121);
+        Node node1 = new Node(1, "localhost", 8122);
+        Node node2 = new Node(2, "localhost", 8123);
+        nodes.put(0, node0);
+        nodes.put(1, node1);
+        nodes.put(2, node2);
 
         final Cluster cluster =
             new Cluster(
@@ -666,27 +672,72 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
 
-            env.kafkaClient().prepareResponse(
-                new MetadataResponse(
-                    env.cluster().nodes(),
-                    env.cluster().clusterResource().clusterId(),
-                    env.cluster().controller().id(),
-                    new ArrayList<MetadataResponse.TopicMetadata>()));
+            List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>();
+            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node0,
+                    Collections.singletonList(node0), Collections.singletonList(node0), Collections.<Node>emptyList()));
+            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node1,
+                    Collections.singletonList(node1), Collections.singletonList(node1), Collections.<Node>emptyList()));
+            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node2,
+                    Collections.singletonList(node2), Collections.singletonList(node2), Collections.<Node>emptyList()));
 
             env.kafkaClient().prepareResponse(
-                new ListGroupsResponse(
-                    Errors.NONE,
-                    Arrays.asList(
-                        new ListGroupsResponse.Group("group-1", ConsumerProtocol.PROTOCOL_TYPE),
-                        new ListGroupsResponse.Group("group-connect-1", "connector")
-                    )));
+                    new MetadataResponse(
+                            env.cluster().nodes(),
+                            env.cluster().clusterResource().clusterId(),
+                            env.cluster().controller().id(),
+                            Collections.singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, Topic.GROUP_METADATA_TOPIC_NAME, true, partitionMetadata))));
+
+            env.kafkaClient().prepareResponseFrom(
+                    new ListGroupsResponse(
+                            Errors.NONE,
+                            Arrays.asList(
+                                    new ListGroupsResponse.Group("group-1", ConsumerProtocol.PROTOCOL_TYPE),
+                                    new ListGroupsResponse.Group("group-connect-1", "connector")
+                            )),
+                    node0);
+
+            env.kafkaClient().prepareResponseFrom(
+                    new ListGroupsResponse(
+                            Errors.COORDINATOR_NOT_AVAILABLE,
+                            Collections.<ListGroupsResponse.Group>emptyList()
+                    ),
+                    node1);
+
+            env.kafkaClient().prepareResponseFrom(
+                    new ListGroupsResponse(
+                            Errors.NONE,
+                            Arrays.asList(
+                                    new ListGroupsResponse.Group("group-2", ConsumerProtocol.PROTOCOL_TYPE),
+                                    new ListGroupsResponse.Group("group-connect-2", "connector")
+                            )),
+                    node2);
 
             final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
-            final List<ConsumerGroupListing> consumerGroups = new ArrayList<>();
 
-            final KafkaFuture<Collection<ConsumerGroupListing>> listings = result.listings();
-            consumerGroups.addAll(listings.get());
-            assertEquals(1, consumerGroups.size());
+            try {
+                Collection<ConsumerGroupListing> listing = result.all().get();
+                fail("Expected to throw exception");
+            } catch (Exception e) {
+                // this is good
+            }
+
+            Iterator<KafkaFuture<ConsumerGroupListing>> iterator = result.iterator();
+            int numListing = 0;
+            int numFailure = 0;
+
+            while (iterator.hasNext()) {
+                KafkaFuture<ConsumerGroupListing> future = iterator.next();
+                try {
+                    ConsumerGroupListing listing = future.get();
+                    numListing++;
+                    assertTrue(listing.groupId().equals("group-1") || listing.groupId().equals("group-2"));
+                } catch (Exception e) {
+                    numFailure++;
+                }
+            }
+
+            assertEquals(2, numListing);
+            assertEquals(1, numFailure);
         }
     }
 
@@ -746,17 +797,15 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupMetadataMap));
 
             final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(Collections.singletonList("group-0"));
-            final KafkaFuture<ConsumerGroupDescription> groupDescriptionFuture = result.describedGroups().get().get("group-0");
-            final ConsumerGroupDescription groupDescription = groupDescriptionFuture.get();
+            final ConsumerGroupDescription groupDescription = result.describedGroups().get("group-0").get();
 
-            assertEquals(1, result.describedGroups().get().size());
+            assertEquals(1, result.describedGroups().size());
             assertEquals("group-0", groupDescription.groupId());
             assertEquals(2, groupDescription.members().size());
         }
     }
 
     @Test
-    @Ignore
     public void testDescribeConsumerGroupOffsets() throws Exception {
         final HashMap<Integer, Node> nodes = new HashMap<>();
         nodes.put(0, new Node(0, "localhost", 8121));
@@ -787,12 +836,12 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData));
 
             final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets("group-0");
+            final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata().get();
 
-            assertEquals(3, result.partitionsToOffsetAndMetadata().get().size());
-            final TopicPartition topicPartition = result.partitionsToOffsetAndMetadata().get().keySet().iterator().next();
-            assertEquals("my_topic", topicPartition.topic());
-            final OffsetAndMetadata offsetAndMetadata = result.partitionsToOffsetAndMetadata().get().values().iterator().next();
-            assertEquals(10, offsetAndMetadata.offset());
+            assertEquals(3, partitionToOffsetAndMetadata.size());
+            assertEquals(10, partitionToOffsetAndMetadata.get(myTopicPartition0).offset());
+            assertEquals(0, partitionToOffsetAndMetadata.get(myTopicPartition1).offset());
+            assertEquals(20, partitionToOffsetAndMetadata.get(myTopicPartition2).offset());
         }
     }
 
@@ -824,8 +873,8 @@ public class KafkaAdminClientTest {
 
             final DeleteConsumerGroupsResult result = env.adminClient().deleteConsumerGroups(groupIds);
 
-            final Map<String, KafkaFuture<Void>> results = result.deletedGroups().get();
-            assertNull(results.get("group-0").get());
+            final KafkaFuture<Void> results = result.deletedGroups().get("group-0");
+            assertNull(results.get());
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.