You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/04/26 00:49:09 UTC

[kafka] branch trunk updated: MINOR: Refactor AdminClient ListConsumerGroups API (#4884)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6be908a  MINOR: Refactor AdminClient ListConsumerGroups API (#4884)
6be908a is described below

commit 6be908a8296456adee254b405605acff55fd47a5
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Wed Apr 25 17:49:02 2018 -0700

    MINOR: Refactor AdminClient ListConsumerGroups API (#4884)
    
    The current Iterator-based ListConsumerGroups API is synchronous.  The API should be asynchronous to fit in with the other AdminClient APIs.  Also fix some error handling corner cases.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 160 +++++++++++----------
 .../clients/admin/ListConsumerGroupsResult.java    | 109 +++++++-------
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  52 ++++---
 3 files changed, 166 insertions(+), 155 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index fa3f943..d8c0bad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -49,6 +49,7 @@ import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownServerException;
@@ -2342,16 +2343,56 @@ public class KafkaAdminClient extends AdminClient {
         return new DescribeConsumerGroupsResult(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(futures));
     }
 
+    private final static class ListConsumerGroupsResults {
+        private final List<Throwable> errors;
+        private final HashMap<String, ConsumerGroupListing> listings;
+        private final HashSet<Node> remaining;
+        private final KafkaFutureImpl<Collection<Object>> future;
+
+        ListConsumerGroupsResults(Collection<Throwable> errors, Collection<Node> leaders,
+                                  KafkaFutureImpl<Collection<Object>> future) {
+            this.errors = new ArrayList<>(errors);
+            this.listings = new HashMap<>();
+            this.remaining = new HashSet<>(leaders);
+            this.future = future;
+            tryComplete();
+        }
+
+        synchronized void addError(Throwable throwable, Node node) {
+            ApiError error = ApiError.fromThrowable(throwable);
+            if (error.message() == null || error.message().isEmpty()) {
+                errors.add(error.error().exception(
+                    "Error listing groups on " + node));
+            } else {
+                errors.add(error.error().exception(
+                    "Error listing groups on " + node + ": " + error.message()));
+            }
+        }
+
+        synchronized void addListing(ConsumerGroupListing listing) {
+            listings.put(listing.groupId(), listing);
+        }
+
+        synchronized void tryComplete(Node leader) {
+            remaining.remove(leader);
+            tryComplete();
+        }
+
+        private synchronized void tryComplete() {
+            if (remaining.isEmpty()) {
+                ArrayList<Object> results = new ArrayList<Object>(listings.values());
+                results.addAll(errors);
+                future.complete(results);
+            }
+        }
+    };
+
     @Override
     public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
-        final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap = new HashMap<>();
-        final KafkaFutureImpl<Collection<ConsumerGroupListing>> flattenFuture = new KafkaFutureImpl<>();
-        final KafkaFutureImpl<Void> listFuture = new KafkaFutureImpl<>();
-
+        final KafkaFutureImpl<Collection<Object>> all = new KafkaFutureImpl<>();
         final long nowMetadata = time.milliseconds();
         final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
-
-        runnable.call(new Call("listNodes", deadline, new LeastLoadedNodeProvider()) {
+        runnable.call(new Call("findGroupsMetadata", deadline, new LeastLoadedNodeProvider()) {
             @Override
             AbstractRequest.Builder createRequest(int timeoutMs) {
                 return new MetadataRequest.Builder(Collections.singletonList(Topic.GROUP_METADATA_TOPIC_NAME), true);
@@ -2360,68 +2401,38 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 MetadataResponse metadataResponse = (MetadataResponse) abstractResponse;
-
+                final List<Throwable> metadataExceptions = new ArrayList<>();
+                final HashSet<Node> leaders = new HashSet<>();
                 for (final MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
-                    if (metadata.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) {
+                    if (metadata.error() != Errors.NONE) {
+                        metadataExceptions.add(metadata.error().exception("Unable to locate " +
+                            Topic.GROUP_METADATA_TOPIC_NAME));
+                    } else if (!metadata.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) {
+                        metadataExceptions.add(new UnknownServerException("Server returned unrequested " +
+                            "information about unexpected topic " + metadata.topic()));
+                    } else {
                         for (final MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
                             final Node leader = partitionMetadata.leader();
                             if (partitionMetadata.error() != Errors.NONE) {
                                 // TODO: KAFKA-6789, retry based on the error code
-                                KafkaFutureImpl<Collection<ConsumerGroupListing>> future = new KafkaFutureImpl<>();
-                                future.completeExceptionally(partitionMetadata.error().exception());
-                                // if it is the leader not found error, then the leader might be NoNode; if there are more than
-                                // one such error, we will only have one entry in the map. For now it is okay since we are not
-                                // guaranteeing to return the full list of consumers still.
-                                futuresMap.put(leader, future);
+                                metadataExceptions.add(partitionMetadata.error().exception("Unable to find " +
+                                    "leader for partition " + partitionMetadata.partition() + " of " +
+                                    Topic.GROUP_METADATA_TOPIC_NAME));
+                            } else if (leader == null || leader.equals(Node.noNode())) {
+                                metadataExceptions.add(new LeaderNotAvailableException("Unable to find leader " +
+                                    "for partition " + partitionMetadata.partition() + " of " +
+                                    Topic.GROUP_METADATA_TOPIC_NAME));
                             } else {
-                                futuresMap.put(leader, new KafkaFutureImpl<Collection<ConsumerGroupListing>>());
+                                leaders.add(leader);
                             }
                         }
-                        listFuture.complete(null);
-                    } else {
-                        if (metadata.error() != Errors.NONE)
-                            listFuture.completeExceptionally(metadata.error().exception());
-                        else
-                            listFuture.completeExceptionally(new IllegalStateException("Unexpected topic metadata for "
-                                    + metadata.topic() + " is returned; cannot find the brokers to query consumer listings."));
                     }
                 }
-
-                // we have to flatten the future here instead in the result, because we need to wait until the map of nodes
-                // are known from the listNode request.
-                flattenFuture.copyWith(
-                        KafkaFuture.allOf(futuresMap.values().toArray(new KafkaFuture[0])),
-                        new KafkaFuture.BaseFunction<Void, Collection<ConsumerGroupListing>>() {
-                            @Override
-                            public Collection<ConsumerGroupListing> apply(Void v) {
-                                List<ConsumerGroupListing> listings = new ArrayList<>();
-                                for (Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futuresMap.entrySet()) {
-                                    Collection<ConsumerGroupListing> results;
-                                    try {
-                                        results = entry.getValue().get();
-                                        listings.addAll(results);
-                                    } catch (Throwable e) {
-                                        // This should be unreachable, because allOf ensured that all the futures
-                                        // completed successfully.
-                                        throw new RuntimeException(e);
-                                    }
-                                }
-                                return listings;
-                            }
-                        });
-
-                for (final Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futuresMap.entrySet()) {
-                    // skip sending the request for those futures who have already failed
-                    if (entry.getValue().isCompletedExceptionally())
-                        continue;
-
+                final ListConsumerGroupsResults results =
+                    new ListConsumerGroupsResults(metadataExceptions, leaders, all);
+                for (final Node node : leaders) {
                     final long nowList = time.milliseconds();
-
-                    final int brokerId = entry.getKey().id();
-                    final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = entry.getValue();
-
-                    runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(brokerId)) {
-
+                    runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(node.id())) {
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
                             return new ListGroupsRequest.Builder();
@@ -2430,39 +2441,42 @@ public class KafkaAdminClient extends AdminClient {
                         @Override
                         void handleResponse(AbstractResponse abstractResponse) {
                             final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
-
-                            if (response.error() != Errors.NONE) {
-                                future.completeExceptionally(response.error().exception());
-                            } else {
-                                final List<ConsumerGroupListing> groupsListing = new ArrayList<>();
-                                for (ListGroupsResponse.Group group : response.groups()) {
-                                    if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || group.protocolType().isEmpty()) {
-                                        final String groupId = group.groupId();
-                                        final String protocolType = group.protocolType();
-                                        final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
-                                        groupsListing.add(groupListing);
+                            synchronized (results) {
+                                if (response.error() != Errors.NONE) {
+                                    results.addError(response.error().exception(), node);
+                                } else {
+                                    for (ListGroupsResponse.Group group : response.groups()) {
+                                        if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) ||
+                                            group.protocolType().isEmpty()) {
+                                            final String groupId = group.groupId();
+                                            final String protocolType = group.protocolType();
+                                            final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
+                                            results.addListing(groupListing);
+                                        }
                                     }
                                 }
-                                future.complete(groupsListing);
+                                results.tryComplete(node);
                             }
                         }
 
                         @Override
                         void handleFailure(Throwable throwable) {
-                            future.completeExceptionally(throwable);
+                            synchronized (results) {
+                                results.addError(throwable, node);
+                                results.tryComplete(node);
+                            }
                         }
                     }, nowList);
-
                 }
             }
 
             @Override
             void handleFailure(Throwable throwable) {
-                listFuture.completeExceptionally(throwable);
+                all.complete(Collections.<Object>singletonList(throwable));
             }
         }, nowMetadata);
 
-        return new ListConsumerGroupsResult(listFuture, flattenFuture, futuresMap);
+        return new ListConsumerGroupsResult(all);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
index c3f1236..0ac8529 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
@@ -18,14 +18,11 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
-import org.apache.kafka.common.utils.AbstractIterator;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
 
 /**
  * The result of the {@link AdminClient#listConsumerGroups()} call.
@@ -34,70 +31,72 @@ import java.util.Map;
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsResult {
-    private final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap;
-    private final KafkaFuture<Collection<ConsumerGroupListing>> flattenFuture;
-    private final KafkaFuture<Void> listFuture;
+    private final KafkaFutureImpl<Collection<ConsumerGroupListing>> all;
+    private final KafkaFutureImpl<Collection<ConsumerGroupListing>> valid;
+    private final KafkaFutureImpl<Collection<Throwable>> errors;
 
-    ListConsumerGroupsResult(final KafkaFuture<Void> listFuture,
-                             final KafkaFuture<Collection<ConsumerGroupListing>> flattenFuture,
-                             final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap) {
-        this.flattenFuture = flattenFuture;
-        this.listFuture = listFuture;
-        this.futuresMap = futuresMap;
-    }
-
-    private class FutureConsumerGroupListingIterator extends AbstractIterator<KafkaFuture<ConsumerGroupListing>> {
-        private Iterator<KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresIter;
-        private Iterator<ConsumerGroupListing> innerIter;
-
-        @Override
-        protected KafkaFuture<ConsumerGroupListing> makeNext() {
-            if (futuresIter == null) {
-                try {
-                    listFuture.get();
-                } catch (Exception e) {
-                    // the list future has failed, there will be no listings to show at all
-                    return allDone();
-                }
-
-                futuresIter = futuresMap.values().iterator();
-            }
-
-            while (innerIter == null || !innerIter.hasNext()) {
-                if (futuresIter.hasNext()) {
-                    KafkaFuture<Collection<ConsumerGroupListing>> collectionFuture = futuresIter.next();
-                    try {
-                        Collection<ConsumerGroupListing> collection = collectionFuture.get();
-                        innerIter = collection.iterator();
-                    } catch (Exception e) {
-                        KafkaFutureImpl<ConsumerGroupListing> future = new KafkaFutureImpl<>();
-                        future.completeExceptionally(e);
-                        return future;
+    ListConsumerGroupsResult(KafkaFutureImpl<Collection<Object>> future) {
+        this.all = new KafkaFutureImpl<>();
+        this.valid = new KafkaFutureImpl<>();
+        this.errors = new KafkaFutureImpl<>();
+        future.thenApply(new KafkaFuture.BaseFunction<Collection<Object>, Void>() {
+            @Override
+            public Void apply(Collection<Object> results) {
+                ArrayList<Throwable> curErrors = new ArrayList<>();
+                ArrayList<ConsumerGroupListing> curValid = new ArrayList<>();
+                for (Object resultObject : results) {
+                    if (resultObject instanceof Throwable) {
+                        curErrors.add((Throwable) resultObject);
+                    } else {
+                        curValid.add((ConsumerGroupListing) resultObject);
                     }
+                }
+                if (!curErrors.isEmpty()) {
+                    all.completeExceptionally(curErrors.get(0));
                 } else {
-                    return allDone();
+                    all.complete(curValid);
                 }
+                valid.complete(curValid);
+                errors.complete(curErrors);
+                return null;
             }
+        });
+    }
 
-            KafkaFutureImpl<ConsumerGroupListing> future = new KafkaFutureImpl<>();
-            future.complete(innerIter.next());
-            return future;
-        }
+    /**
+     * Returns a future that yields either an exception, or the full set of consumer group
+     * listings.
+     *
+     * In the event of a failure, the future yields nothing but the first exception which
+     * occurred.
+     */
+    public KafkaFutureImpl<Collection<ConsumerGroupListing>> all() {
+        return all;
     }
 
     /**
-     * Return an iterator of futures for ConsumerGroupListing objects; the returned future will throw exception
-     * if we cannot get a complete collection of consumer listings.
+     * Returns a future which yields just the valid listings.
+     *
+     * This future never fails with an error, no matter what happens.  Errors are completely
+     * ignored.  If nothing can be fetched, an empty collection is yielded.
+     * If there is an error, but some results can be returned, this future will yield
+     * those partial results.  When using this future, it is a good idea to also check
+     * the errors future so that errors can be displayed and handled.
      */
-    public Iterator<KafkaFuture<ConsumerGroupListing>> iterator() {
-        return new FutureConsumerGroupListingIterator();
+    public KafkaFutureImpl<Collection<ConsumerGroupListing>> valid() {
+        return valid;
     }
 
     /**
-     * Return a future which yields a full collection of ConsumerGroupListing objects; will throw exception
-     * if we cannot get a complete collection of consumer listings.
+     * Returns a future which yields just the errors which occurred.
+     *
+     * If this future yields a non-empty collection, it is very likely that elements are
+     * missing from the valid() set.
+     *
+     * This future itself never fails with an error.  In the event of an error, this future
+     * will successfully yield a collection containing at least one exception.
      */
-    public KafkaFuture<Collection<ConsumerGroupListing>> all() {
-        return flattenFuture;
+    public KafkaFutureImpl<Collection<Throwable>> errors() {
+        return errors;
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index d2789b6..0debed3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@@ -80,7 +81,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -648,9 +648,8 @@ public class KafkaAdminClientTest {
         }
     }
 
-    //Ignoring test to be fixed on follow-up PR
     @Test
-    public void testListConsumerGroups() {
+    public void testListConsumerGroups() throws Exception {
         final HashMap<Integer, Node> nodes = new HashMap<>();
         Node node0 = new Node(0, "localhost", 8121);
         Node node1 = new Node(1, "localhost", 8122);
@@ -685,7 +684,8 @@ public class KafkaAdminClientTest {
                             env.cluster().nodes(),
                             env.cluster().clusterResource().clusterId(),
                             env.cluster().controller().id(),
-                            Collections.singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, Topic.GROUP_METADATA_TOPIC_NAME, true, partitionMetadata))));
+                            Collections.singletonList(new MetadataResponse.TopicMetadata(Errors.NONE,
+                                Topic.GROUP_METADATA_TOPIC_NAME, true, partitionMetadata))));
 
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
@@ -713,31 +713,29 @@ public class KafkaAdminClientTest {
                     node2);
 
             final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
-
-            try {
-                Collection<ConsumerGroupListing> listing = result.all().get();
-                fail("Expected to throw exception");
-            } catch (Exception e) {
-                // this is good
-            }
-
-            Iterator<KafkaFuture<ConsumerGroupListing>> iterator = result.iterator();
-            int numListing = 0;
-            int numFailure = 0;
-
-            while (iterator.hasNext()) {
-                KafkaFuture<ConsumerGroupListing> future = iterator.next();
-                try {
-                    ConsumerGroupListing listing = future.get();
-                    numListing++;
-                    assertTrue(listing.groupId().equals("group-1") || listing.groupId().equals("group-2"));
-                } catch (Exception e) {
-                    numFailure++;
-                }
+            assertFutureError(result.all(), CoordinatorNotAvailableException.class);
+            Collection<ConsumerGroupListing> listings = result.valid().get();
+            assertEquals(2, listings.size());
+            for (ConsumerGroupListing listing : listings) {
+                assertTrue(listing.groupId().equals("group-1") || listing.groupId().equals("group-2"));
             }
+            assertEquals(1, result.errors().get().size());
 
-            assertEquals(2, numListing);
-            assertEquals(1, numFailure);
+            // Test handling the error where we are unable to get metadata for the __consumer_offsets topic.
+            env.kafkaClient().prepareResponse(
+                new MetadataResponse(
+                    env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(),
+                    env.cluster().controller().id(),
+                    Collections.singletonList(new MetadataResponse.TopicMetadata(
+                        Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.GROUP_METADATA_TOPIC_NAME,
+                        true, Collections.<MetadataResponse.PartitionMetadata>emptyList()))));
+            final ListConsumerGroupsResult result2 = env.adminClient().listConsumerGroups();
+            Collection<Throwable> errors = result2.errors().get();
+            assertEquals(1, errors.size());
+            assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.forException(errors.iterator().next()));
+            assertTrue(result2.valid().get().isEmpty());
+            assertFutureError(result2.all(), UnknownTopicOrPartitionException.class);
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.