You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/04/26 00:49:09 UTC
[kafka] branch trunk updated: MINOR: Refactor AdminClient
ListConsumerGroups API (#4884)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6be908a MINOR: Refactor AdminClient ListConsumerGroups API (#4884)
6be908a is described below
commit 6be908a8296456adee254b405605acff55fd47a5
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Wed Apr 25 17:49:02 2018 -0700
MINOR: Refactor AdminClient ListConsumerGroups API (#4884)
The current Iterator-based ListConsumerGroups API is synchronous. The API should be asynchronous to fit in with the other AdminClient APIs. Also fix some error handling corner cases.
Reviewers: Guozhang Wang <wa...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../kafka/clients/admin/KafkaAdminClient.java | 160 +++++++++++----------
.../clients/admin/ListConsumerGroupsResult.java | 109 +++++++-------
.../kafka/clients/admin/KafkaAdminClientTest.java | 52 ++++---
3 files changed, 166 insertions(+), 155 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index fa3f943..d8c0bad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -49,6 +49,7 @@ import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
@@ -2342,16 +2343,56 @@ public class KafkaAdminClient extends AdminClient {
return new DescribeConsumerGroupsResult(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(futures));
}
+ private final static class ListConsumerGroupsResults {
+ private final List<Throwable> errors;
+ private final HashMap<String, ConsumerGroupListing> listings;
+ private final HashSet<Node> remaining;
+ private final KafkaFutureImpl<Collection<Object>> future;
+
+ ListConsumerGroupsResults(Collection<Throwable> errors, Collection<Node> leaders,
+ KafkaFutureImpl<Collection<Object>> future) {
+ this.errors = new ArrayList<>(errors);
+ this.listings = new HashMap<>();
+ this.remaining = new HashSet<>(leaders);
+ this.future = future;
+ tryComplete();
+ }
+
+ synchronized void addError(Throwable throwable, Node node) {
+ ApiError error = ApiError.fromThrowable(throwable);
+ if (error.message() == null || error.message().isEmpty()) {
+ errors.add(error.error().exception(
+ "Error listing groups on " + node));
+ } else {
+ errors.add(error.error().exception(
+ "Error listing groups on " + node + ": " + error.message()));
+ }
+ }
+
+ synchronized void addListing(ConsumerGroupListing listing) {
+ listings.put(listing.groupId(), listing);
+ }
+
+ synchronized void tryComplete(Node leader) {
+ remaining.remove(leader);
+ tryComplete();
+ }
+
+ private synchronized void tryComplete() {
+ if (remaining.isEmpty()) {
+ ArrayList<Object> results = new ArrayList<Object>(listings.values());
+ results.addAll(errors);
+ future.complete(results);
+ }
+ }
+ };
+
@Override
public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
- final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap = new HashMap<>();
- final KafkaFutureImpl<Collection<ConsumerGroupListing>> flattenFuture = new KafkaFutureImpl<>();
- final KafkaFutureImpl<Void> listFuture = new KafkaFutureImpl<>();
-
+ final KafkaFutureImpl<Collection<Object>> all = new KafkaFutureImpl<>();
final long nowMetadata = time.milliseconds();
final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
-
- runnable.call(new Call("listNodes", deadline, new LeastLoadedNodeProvider()) {
+ runnable.call(new Call("findGroupsMetadata", deadline, new LeastLoadedNodeProvider()) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new MetadataRequest.Builder(Collections.singletonList(Topic.GROUP_METADATA_TOPIC_NAME), true);
@@ -2360,68 +2401,38 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse metadataResponse = (MetadataResponse) abstractResponse;
-
+ final List<Throwable> metadataExceptions = new ArrayList<>();
+ final HashSet<Node> leaders = new HashSet<>();
for (final MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
- if (metadata.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) {
+ if (metadata.error() != Errors.NONE) {
+ metadataExceptions.add(metadata.error().exception("Unable to locate " +
+ Topic.GROUP_METADATA_TOPIC_NAME));
+ } else if (!metadata.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) {
+ metadataExceptions.add(new UnknownServerException("Server returned unrequested " +
+ "information about unexpected topic " + metadata.topic()));
+ } else {
for (final MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
final Node leader = partitionMetadata.leader();
if (partitionMetadata.error() != Errors.NONE) {
// TODO: KAFKA-6789, retry based on the error code
- KafkaFutureImpl<Collection<ConsumerGroupListing>> future = new KafkaFutureImpl<>();
- future.completeExceptionally(partitionMetadata.error().exception());
- // if it is the leader not found error, then the leader might be NoNode; if there are more than
- // one such error, we will only have one entry in the map. For now it is okay since we are not
- // guaranteeing to return the full list of consumers still.
- futuresMap.put(leader, future);
+ metadataExceptions.add(partitionMetadata.error().exception("Unable to find " +
+ "leader for partition " + partitionMetadata.partition() + " of " +
+ Topic.GROUP_METADATA_TOPIC_NAME));
+ } else if (leader == null || leader.equals(Node.noNode())) {
+ metadataExceptions.add(new LeaderNotAvailableException("Unable to find leader " +
+ "for partition " + partitionMetadata.partition() + " of " +
+ Topic.GROUP_METADATA_TOPIC_NAME));
} else {
- futuresMap.put(leader, new KafkaFutureImpl<Collection<ConsumerGroupListing>>());
+ leaders.add(leader);
}
}
- listFuture.complete(null);
- } else {
- if (metadata.error() != Errors.NONE)
- listFuture.completeExceptionally(metadata.error().exception());
- else
- listFuture.completeExceptionally(new IllegalStateException("Unexpected topic metadata for "
- + metadata.topic() + " is returned; cannot find the brokers to query consumer listings."));
}
}
-
- // we have to flatten the future here instead in the result, because we need to wait until the map of nodes
- // are known from the listNode request.
- flattenFuture.copyWith(
- KafkaFuture.allOf(futuresMap.values().toArray(new KafkaFuture[0])),
- new KafkaFuture.BaseFunction<Void, Collection<ConsumerGroupListing>>() {
- @Override
- public Collection<ConsumerGroupListing> apply(Void v) {
- List<ConsumerGroupListing> listings = new ArrayList<>();
- for (Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futuresMap.entrySet()) {
- Collection<ConsumerGroupListing> results;
- try {
- results = entry.getValue().get();
- listings.addAll(results);
- } catch (Throwable e) {
- // This should be unreachable, because allOf ensured that all the futures
- // completed successfully.
- throw new RuntimeException(e);
- }
- }
- return listings;
- }
- });
-
- for (final Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futuresMap.entrySet()) {
- // skip sending the request for those futures who have already failed
- if (entry.getValue().isCompletedExceptionally())
- continue;
-
+ final ListConsumerGroupsResults results =
+ new ListConsumerGroupsResults(metadataExceptions, leaders, all);
+ for (final Node node : leaders) {
final long nowList = time.milliseconds();
-
- final int brokerId = entry.getKey().id();
- final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = entry.getValue();
-
- runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(brokerId)) {
-
+ runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(node.id())) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new ListGroupsRequest.Builder();
@@ -2430,39 +2441,42 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse abstractResponse) {
final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
-
- if (response.error() != Errors.NONE) {
- future.completeExceptionally(response.error().exception());
- } else {
- final List<ConsumerGroupListing> groupsListing = new ArrayList<>();
- for (ListGroupsResponse.Group group : response.groups()) {
- if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || group.protocolType().isEmpty()) {
- final String groupId = group.groupId();
- final String protocolType = group.protocolType();
- final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
- groupsListing.add(groupListing);
+ synchronized (results) {
+ if (response.error() != Errors.NONE) {
+ results.addError(response.error().exception(), node);
+ } else {
+ for (ListGroupsResponse.Group group : response.groups()) {
+ if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) ||
+ group.protocolType().isEmpty()) {
+ final String groupId = group.groupId();
+ final String protocolType = group.protocolType();
+ final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
+ results.addListing(groupListing);
+ }
}
}
- future.complete(groupsListing);
+ results.tryComplete(node);
}
}
@Override
void handleFailure(Throwable throwable) {
- future.completeExceptionally(throwable);
+ synchronized (results) {
+ results.addError(throwable, node);
+ results.tryComplete(node);
+ }
}
}, nowList);
-
}
}
@Override
void handleFailure(Throwable throwable) {
- listFuture.completeExceptionally(throwable);
+ all.complete(Collections.<Object>singletonList(throwable));
}
}, nowMetadata);
- return new ListConsumerGroupsResult(listFuture, flattenFuture, futuresMap);
+ return new ListConsumerGroupsResult(all);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
index c3f1236..0ac8529 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
@@ -18,14 +18,11 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.Node;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.internals.KafkaFutureImpl;
-import org.apache.kafka.common.utils.AbstractIterator;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
/**
* The result of the {@link AdminClient#listConsumerGroups()} call.
@@ -34,70 +31,72 @@ import java.util.Map;
*/
@InterfaceStability.Evolving
public class ListConsumerGroupsResult {
- private final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap;
- private final KafkaFuture<Collection<ConsumerGroupListing>> flattenFuture;
- private final KafkaFuture<Void> listFuture;
+ private final KafkaFutureImpl<Collection<ConsumerGroupListing>> all;
+ private final KafkaFutureImpl<Collection<ConsumerGroupListing>> valid;
+ private final KafkaFutureImpl<Collection<Throwable>> errors;
- ListConsumerGroupsResult(final KafkaFuture<Void> listFuture,
- final KafkaFuture<Collection<ConsumerGroupListing>> flattenFuture,
- final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap) {
- this.flattenFuture = flattenFuture;
- this.listFuture = listFuture;
- this.futuresMap = futuresMap;
- }
-
- private class FutureConsumerGroupListingIterator extends AbstractIterator<KafkaFuture<ConsumerGroupListing>> {
- private Iterator<KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresIter;
- private Iterator<ConsumerGroupListing> innerIter;
-
- @Override
- protected KafkaFuture<ConsumerGroupListing> makeNext() {
- if (futuresIter == null) {
- try {
- listFuture.get();
- } catch (Exception e) {
- // the list future has failed, there will be no listings to show at all
- return allDone();
- }
-
- futuresIter = futuresMap.values().iterator();
- }
-
- while (innerIter == null || !innerIter.hasNext()) {
- if (futuresIter.hasNext()) {
- KafkaFuture<Collection<ConsumerGroupListing>> collectionFuture = futuresIter.next();
- try {
- Collection<ConsumerGroupListing> collection = collectionFuture.get();
- innerIter = collection.iterator();
- } catch (Exception e) {
- KafkaFutureImpl<ConsumerGroupListing> future = new KafkaFutureImpl<>();
- future.completeExceptionally(e);
- return future;
+ ListConsumerGroupsResult(KafkaFutureImpl<Collection<Object>> future) {
+ this.all = new KafkaFutureImpl<>();
+ this.valid = new KafkaFutureImpl<>();
+ this.errors = new KafkaFutureImpl<>();
+ future.thenApply(new KafkaFuture.BaseFunction<Collection<Object>, Void>() {
+ @Override
+ public Void apply(Collection<Object> results) {
+ ArrayList<Throwable> curErrors = new ArrayList<>();
+ ArrayList<ConsumerGroupListing> curValid = new ArrayList<>();
+ for (Object resultObject : results) {
+ if (resultObject instanceof Throwable) {
+ curErrors.add((Throwable) resultObject);
+ } else {
+ curValid.add((ConsumerGroupListing) resultObject);
}
+ }
+ if (!curErrors.isEmpty()) {
+ all.completeExceptionally(curErrors.get(0));
} else {
- return allDone();
+ all.complete(curValid);
}
+ valid.complete(curValid);
+ errors.complete(curErrors);
+ return null;
}
+ });
+ }
- KafkaFutureImpl<ConsumerGroupListing> future = new KafkaFutureImpl<>();
- future.complete(innerIter.next());
- return future;
- }
+ /**
+ * Returns a future that yields either an exception, or the full set of consumer group
+ * listings.
+ *
+ * In the event of a failure, the future yields nothing but the first exception which
+ * occurred.
+ */
+ public KafkaFutureImpl<Collection<ConsumerGroupListing>> all() {
+ return all;
}
/**
- * Return an iterator of futures for ConsumerGroupListing objects; the returned future will throw exception
- * if we cannot get a complete collection of consumer listings.
+ * Returns a future which yields just the valid listings.
+ *
+ * This future never fails with an error, no matter what happens. Errors are completely
+ * ignored. If nothing can be fetched, an empty collection is yielded.
+ * If there is an error, but some results can be returned, this future will yield
+ * those partial results. When using this future, it is a good idea to also check
+ * the errors future so that errors can be displayed and handled.
*/
- public Iterator<KafkaFuture<ConsumerGroupListing>> iterator() {
- return new FutureConsumerGroupListingIterator();
+ public KafkaFutureImpl<Collection<ConsumerGroupListing>> valid() {
+ return valid;
}
/**
- * Return a future which yields a full collection of ConsumerGroupListing objects; will throw exception
- * if we cannot get a complete collection of consumer listings.
+ * Returns a future which yields just the errors which occurred.
+ *
+ * If this future yields a non-empty collection, it is very likely that elements are
+ * missing from the valid() set.
+ *
+ * This future itself never fails with an error. In the event of an error, this future
+ * will successfully yield a collection containing at least one exception.
*/
- public KafkaFuture<Collection<ConsumerGroupListing>> all() {
- return flattenFuture;
+ public KafkaFutureImpl<Collection<Throwable>> errors() {
+ return errors;
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index d2789b6..0debed3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@@ -80,7 +81,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -648,9 +648,8 @@ public class KafkaAdminClientTest {
}
}
- //Ignoring test to be fixed on follow-up PR
@Test
- public void testListConsumerGroups() {
+ public void testListConsumerGroups() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
Node node0 = new Node(0, "localhost", 8121);
Node node1 = new Node(1, "localhost", 8122);
@@ -685,7 +684,8 @@ public class KafkaAdminClientTest {
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
env.cluster().controller().id(),
- Collections.singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, Topic.GROUP_METADATA_TOPIC_NAME, true, partitionMetadata))));
+ Collections.singletonList(new MetadataResponse.TopicMetadata(Errors.NONE,
+ Topic.GROUP_METADATA_TOPIC_NAME, true, partitionMetadata))));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
@@ -713,31 +713,29 @@ public class KafkaAdminClientTest {
node2);
final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
-
- try {
- Collection<ConsumerGroupListing> listing = result.all().get();
- fail("Expected to throw exception");
- } catch (Exception e) {
- // this is good
- }
-
- Iterator<KafkaFuture<ConsumerGroupListing>> iterator = result.iterator();
- int numListing = 0;
- int numFailure = 0;
-
- while (iterator.hasNext()) {
- KafkaFuture<ConsumerGroupListing> future = iterator.next();
- try {
- ConsumerGroupListing listing = future.get();
- numListing++;
- assertTrue(listing.groupId().equals("group-1") || listing.groupId().equals("group-2"));
- } catch (Exception e) {
- numFailure++;
- }
+ assertFutureError(result.all(), CoordinatorNotAvailableException.class);
+ Collection<ConsumerGroupListing> listings = result.valid().get();
+ assertEquals(2, listings.size());
+ for (ConsumerGroupListing listing : listings) {
+ assertTrue(listing.groupId().equals("group-1") || listing.groupId().equals("group-2"));
}
+ assertEquals(1, result.errors().get().size());
- assertEquals(2, numListing);
- assertEquals(1, numFailure);
+ // Test handling the error where we are unable to get metadata for the __consumer_offsets topic.
+ env.kafkaClient().prepareResponse(
+ new MetadataResponse(
+ env.cluster().nodes(),
+ env.cluster().clusterResource().clusterId(),
+ env.cluster().controller().id(),
+ Collections.singletonList(new MetadataResponse.TopicMetadata(
+ Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.GROUP_METADATA_TOPIC_NAME,
+ true, Collections.<MetadataResponse.PartitionMetadata>emptyList()))));
+ final ListConsumerGroupsResult result2 = env.adminClient().listConsumerGroups();
+ Collection<Throwable> errors = result2.errors().get();
+ assertEquals(1, errors.size());
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.forException(errors.iterator().next()));
+ assertTrue(result2.valid().get().isEmpty());
+ assertFutureError(result2.all(), UnknownTopicOrPartitionException.class);
}
}
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.