Posted to commits@kafka.apache.org by li...@apache.org on 2018/05/23 19:13:22 UTC

[kafka] branch trunk updated: MINOR: Remove dependence on __consumer_offsets in AdminClient listConsumerGroups

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0f86e68  MINOR: Remove dependence on __consumer_offsets in AdminClient listConsumerGroups
0f86e68 is described below

commit 0f86e6884070ec9c867c9354ae331aceeb90b1e8
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed May 23 12:13:04 2018 -0700

    MINOR: Remove dependence on __consumer_offsets in AdminClient listConsumerGroups
    
    Avoid depending on the internal __consumer_offsets topic to handle `listConsumerGroups()`, since doing so unnecessarily requires users to have Describe access on an internal topic. Instead we query each broker independently. For most clusters this contacts the same set of nodes anyway: with the default of 50 partitions for __consumer_offsets, the partition leaders are typically spread across every broker. This also provides better encapsulation, since it avoids exposing the use of __consumer_offsets, which gives us more flexibility in the future.
    
    Author: Jason Gustafson <ja...@confluent.io>
    
    Reviewers: Dong Lin <li...@gmail.com>
    
    Closes #5007 from hachikuji/remove-admin-use-of-offsets-topic
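    
    For reference, a minimal sketch of the public API this patch touches (assumes a broker reachable at localhost:9092; illustration only, not part of the commit):
    
        import java.util.Properties;
        import org.apache.kafka.clients.admin.AdminClient;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.clients.admin.ConsumerGroupListing;
    
        public class ListGroupsExample {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                try (AdminClient admin = AdminClient.create(props)) {
                    // After this change, listConsumerGroups() fans a ListGroups
                    // request out to every broker rather than first resolving the
                    // partition leaders of __consumer_offsets.
                    for (ConsumerGroupListing listing : admin.listConsumerGroups().all().get()) {
                        System.out.println(listing.groupId());
                    }
                }
            }
        }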
---
 .../internals => }/StaleMetadataException.java     | 13 +++-
 .../kafka/clients/admin/KafkaAdminClient.java      | 88 +++++++++-------------
 .../kafka/clients/consumer/internals/Fetcher.java  |  1 +
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 82 ++++++++++++--------
 4 files changed, 97 insertions(+), 87 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java b/clients/src/main/java/org/apache/kafka/clients/StaleMetadataException.java
similarity index 74%
rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
rename to clients/src/main/java/org/apache/kafka/clients/StaleMetadataException.java
index 53110d3..dafc2d5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/StaleMetadataException.java
@@ -14,13 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.clients.consumer.internals;
+package org.apache.kafka.clients;
 
 import org.apache.kafka.common.errors.InvalidMetadataException;
 
 /**
- * Thrown when metadata is old and needs to be refreshed.
+ * Thrown when current metadata cannot be used. This is often used as a way to trigger a metadata
+ * update before retrying another operation.
+ *
+ * Note: this is not a public API.
  */
 public class StaleMetadataException extends InvalidMetadataException {
     private static final long serialVersionUID = 1L;
+
+    public StaleMetadataException() {}
+
+    public StaleMetadataException(String message) {
+        super(message);
+    }
 }
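
The new javadoc above frames StaleMetadataException as a signal to refresh
metadata before retrying an operation. A minimal sketch of that pattern
(`runWithRetry` and its parameters are illustrative stand-ins, not client
internals from this patch):

    import org.apache.kafka.clients.Metadata;
    import org.apache.kafka.clients.StaleMetadataException;

    class MetadataRetry {
        // Illustrative only: `metadata` and `operation` stand in for the
        // client's internal state and whatever call is being retried.
        static void runWithRetry(Metadata metadata, Runnable operation) {
            while (true) {
                try {
                    operation.run();
                    return;
                } catch (StaleMetadataException e) {
                    // Flag the metadata for refresh; the network client picks
                    // this up on its next poll, after which we retry.
                    metadata.requestUpdate();
                }
            }
        }
    }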
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 9ae8bcd..c935ef9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.StaleMetadataException;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
@@ -49,13 +50,11 @@ import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
-import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -97,22 +96,22 @@ import org.apache.kafka.common.requests.DescribeConfigsRequest;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
 import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
 import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
+import org.apache.kafka.common.requests.DescribeGroupsRequest;
+import org.apache.kafka.common.requests.DescribeGroupsResponse;
 import org.apache.kafka.common.requests.DescribeLogDirsRequest;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
 import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
-import org.apache.kafka.common.requests.MetadataRequest;
-import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
-import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
-import org.apache.kafka.common.requests.DescribeGroupsRequest;
-import org.apache.kafka.common.requests.DescribeGroupsResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.ListGroupsRequest;
 import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
+import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
 import org.apache.kafka.common.requests.Resource;
 import org.apache.kafka.common.requests.ResourceType;
 import org.apache.kafka.common.security.token.delegation.DelegationToken;
@@ -2427,9 +2426,9 @@ public class KafkaAdminClient extends AdminClient {
         private final HashSet<Node> remaining;
         private final KafkaFutureImpl<Collection<Object>> future;
 
-        ListConsumerGroupsResults(Collection<Throwable> errors, Collection<Node> leaders,
+        ListConsumerGroupsResults(Collection<Node> leaders,
                                   KafkaFutureImpl<Collection<Object>> future) {
-            this.errors = new ArrayList<>(errors);
+            this.errors = new ArrayList<>();
             this.listings = new HashMap<>();
             this.remaining = new HashSet<>(leaders);
             this.future = future;
@@ -2439,11 +2438,9 @@ public class KafkaAdminClient extends AdminClient {
         synchronized void addError(Throwable throwable, Node node) {
             ApiError error = ApiError.fromThrowable(throwable);
             if (error.message() == null || error.message().isEmpty()) {
-                errors.add(error.error().exception(
-                    "Error listing groups on " + node));
+                errors.add(error.error().exception("Error listing groups on " + node));
             } else {
-                errors.add(error.error().exception(
-                    "Error listing groups on " + node + ": " + error.message()));
+                errors.add(error.error().exception("Error listing groups on " + node + ": " + error.message()));
             }
         }
 
@@ -2470,45 +2467,24 @@ public class KafkaAdminClient extends AdminClient {
         final KafkaFutureImpl<Collection<Object>> all = new KafkaFutureImpl<>();
         final long nowMetadata = time.milliseconds();
         final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
-        runnable.call(new Call("findGroupsMetadata", deadline, new LeastLoadedNodeProvider()) {
+        runnable.call(new Call("findAllBrokers", deadline, new LeastLoadedNodeProvider()) {
             @Override
             AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new MetadataRequest.Builder(Collections.singletonList(Topic.GROUP_METADATA_TOPIC_NAME), true);
+                return new MetadataRequest.Builder(Collections.emptyList(), true);
             }
 
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 MetadataResponse metadataResponse = (MetadataResponse) abstractResponse;
-                final List<Throwable> metadataExceptions = new ArrayList<>();
-                final HashSet<Node> leaders = new HashSet<>();
-                for (final MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
-                    if (metadata.error() != Errors.NONE) {
-                        metadataExceptions.add(metadata.error().exception("Unable to locate " +
-                            Topic.GROUP_METADATA_TOPIC_NAME));
-                    } else if (!metadata.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) {
-                        metadataExceptions.add(new UnknownServerException("Server returned unrequested " +
-                            "information about unexpected topic " + metadata.topic()));
-                    } else {
-                        for (final MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
-                            final Node leader = partitionMetadata.leader();
-                            if (partitionMetadata.error() != Errors.NONE) {
-                                // TODO: KAFKA-6789, retry based on the error code
-                                metadataExceptions.add(partitionMetadata.error().exception("Unable to find " +
-                                    "leader for partition " + partitionMetadata.partition() + " of " +
-                                    Topic.GROUP_METADATA_TOPIC_NAME));
-                            } else if (leader == null || leader.equals(Node.noNode())) {
-                                metadataExceptions.add(new LeaderNotAvailableException("Unable to find leader " +
-                                    "for partition " + partitionMetadata.partition() + " of " +
-                                    Topic.GROUP_METADATA_TOPIC_NAME));
-                            } else {
-                                leaders.add(leader);
-                            }
-                        }
-                    }
-                }
-                final ListConsumerGroupsResults results =
-                    new ListConsumerGroupsResults(metadataExceptions, leaders, all);
-                for (final Node node : leaders) {
+                Cluster cluster = metadataResponse.cluster();
+
+                if (cluster.nodes().isEmpty())
+                    throw new StaleMetadataException("Metadata fetch failed due to missing broker list");
+
+                HashSet<Node> allNodes = new HashSet<>(cluster.nodes());
+                final ListConsumerGroupsResults results = new ListConsumerGroupsResults(allNodes, all);
+
+                for (final Node node : allNodes) {
                     final long nowList = time.milliseconds();
                     runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(node.id())) {
                         @Override
@@ -2516,6 +2492,15 @@ public class KafkaAdminClient extends AdminClient {
                             return new ListGroupsRequest.Builder();
                         }
 
+                        private void maybeAddConsumerGroup(ListGroupsResponse.Group group) {
+                            String protocolType = group.protocolType();
+                            if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
+                                final String groupId = group.groupId();
+                                final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
+                                results.addListing(groupListing);
+                            }
+                        }
+
                         @Override
                         void handleResponse(AbstractResponse abstractResponse) {
                             final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
@@ -2524,13 +2509,7 @@ public class KafkaAdminClient extends AdminClient {
                                     results.addError(response.error().exception(), node);
                                 } else {
                                     for (ListGroupsResponse.Group group : response.groups()) {
-                                        if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) ||
-                                            group.protocolType().isEmpty()) {
-                                            final String groupId = group.groupId();
-                                            final String protocolType = group.protocolType();
-                                            final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
-                                            results.addListing(groupListing);
-                                        }
+                                        maybeAddConsumerGroup(group);
                                     }
                                 }
                                 results.tryComplete(node);
@@ -2550,7 +2529,8 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             void handleFailure(Throwable throwable) {
-                all.complete(Collections.<Object>singletonList(throwable));
+                KafkaException exception = new KafkaException("Failed to find brokers to send ListGroups", throwable);
+                all.complete(Collections.singletonList(exception));
             }
         }, nowMetadata);
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index b3791ff..38f324f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.FetchSessionHandler;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.StaleMetadataException;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 92cda40..575f7f8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -44,7 +45,6 @@ import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsResponse;
@@ -761,34 +761,32 @@ public class KafkaAdminClientTest {
         nodes.put(1, node1);
         nodes.put(2, node2);
 
-        final Cluster cluster =
-            new Cluster(
+        final Cluster cluster = new Cluster(
                 "mockClusterId",
                 nodes.values(),
-                Collections.<PartitionInfo>emptyList(),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet(), nodes.get(0));
+                Collections.emptyList(),
+                Collections.emptySet(),
+                Collections.emptySet(), nodes.get(0));
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
 
-            List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>();
-            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node0,
-                    singletonList(node0), singletonList(node0), Collections.<Node>emptyList()));
-            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node1,
-                    singletonList(node1), singletonList(node1), Collections.<Node>emptyList()));
-            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node2,
-                    singletonList(node2), singletonList(node2), Collections.<Node>emptyList()));
+            // Empty metadata response should be retried
+            env.kafkaClient().prepareResponse(
+                    new MetadataResponse(
+                            Collections.emptyList(),
+                            env.cluster().clusterResource().clusterId(),
+                            -1,
+                            Collections.emptyList()));
 
             env.kafkaClient().prepareResponse(
                     new MetadataResponse(
                             env.cluster().nodes(),
                             env.cluster().clusterResource().clusterId(),
                             env.cluster().controller().id(),
-                            singletonList(new MetadataResponse.TopicMetadata(Errors.NONE,
-                                Topic.GROUP_METADATA_TOPIC_NAME, true, partitionMetadata))));
+                            Collections.emptyList()));
 
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
@@ -802,7 +800,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
                             Errors.COORDINATOR_NOT_AVAILABLE,
-                            Collections.<ListGroupsResponse.Group>emptyList()
+                            Collections.emptyList()
                     ),
                     node1);
 
@@ -823,22 +821,44 @@ public class KafkaAdminClientTest {
                 assertTrue(listing.groupId().equals("group-1") || listing.groupId().equals("group-2"));
             }
             assertEquals(1, result.errors().get().size());
+        }
+    }
 
-            // Test handling the error where we are unable to get metadata for the __consumer_offsets topic.
+    @Test
+    public void testListConsumerGroupsMetadataFailure() throws Exception {
+        final HashMap<Integer, Node> nodes = new HashMap<>();
+        Node node0 = new Node(0, "localhost", 8121);
+        Node node1 = new Node(1, "localhost", 8122);
+        Node node2 = new Node(2, "localhost", 8123);
+        nodes.put(0, node0);
+        nodes.put(1, node1);
+        nodes.put(2, node2);
+
+        final Cluster cluster = new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.emptyList(),
+                Collections.emptySet(),
+                Collections.emptySet(), nodes.get(0));
+        final Time time = new MockTime();
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+                AdminClientConfig.RETRIES_CONFIG, "0")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
+
+            // Empty metadata causes the request to fail since we have no list of brokers
+            // to send the ListGroups requests to
             env.kafkaClient().prepareResponse(
-                new MetadataResponse(
-                    env.cluster().nodes(),
-                    env.cluster().clusterResource().clusterId(),
-                    env.cluster().controller().id(),
-                    singletonList(new MetadataResponse.TopicMetadata(
-                        Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.GROUP_METADATA_TOPIC_NAME,
-                        true, Collections.<MetadataResponse.PartitionMetadata>emptyList()))));
-            final ListConsumerGroupsResult result2 = env.adminClient().listConsumerGroups();
-            Collection<Throwable> errors = result2.errors().get();
-            assertEquals(1, errors.size());
-            assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.forException(errors.iterator().next()));
-            assertTrue(result2.valid().get().isEmpty());
-            assertFutureError(result2.all(), UnknownTopicOrPartitionException.class);
+                    new MetadataResponse(
+                            Collections.emptyList(),
+                            env.cluster().clusterResource().clusterId(),
+                            -1,
+                            Collections.emptyList()));
+
+            final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
+            assertFutureError(result.all(), KafkaException.class);
         }
     }
 
