You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/05/28 18:23:39 UTC

[kafka] branch 2.3 updated: MINOR: Auth operations must be null when talking to a pre-KIP-430 broker (#6812)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 321af2d  MINOR: Auth operations must be null when talking to a pre-KIP-430 broker (#6812)
321af2d is described below

commit 321af2d41b0fc9a568c6f7b72e3708386dae6603
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Tue May 28 11:22:09 2019 -0700

    MINOR: Auth operations must be null when talking to a pre-KIP-430 broker (#6812)
    
    Authorized operations must be null when talking to a pre-KIP-430 broker.
    If we present this as the empty set instead, it is impossible for clients
    to know if they have no permissions, or are talking to an old broker.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 .../clients/admin/ConsumerGroupDescription.java    |  2 +-
 .../clients/admin/DescribeClusterOptions.java      |  4 +
 .../kafka/clients/admin/DescribeClusterResult.java |  3 +-
 .../kafka/clients/admin/KafkaAdminClient.java      |  5 +-
 .../kafka/clients/admin/TopicDescription.java      |  4 +-
 .../org/apache/kafka/common/acl/AclOperation.java  |  3 +
 .../common/requests/DescribeGroupsResponse.java    | 19 +++++
 .../kafka/common/requests/MetadataResponse.java    |  5 +-
 .../common/message/DescribeGroupsResponse.json     |  2 +-
 .../resources/common/message/MetadataResponse.json |  4 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 96 +++++++++++++++++++++-
 11 files changed, 137 insertions(+), 10 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
index 52f23ed..4590c74 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -128,7 +128,7 @@ public class ConsumerGroupDescription {
     }
 
     /**
-     * authorizedOperations for this group
+     * authorizedOperations for this group, or null if that information is not known.
      */
     public  Set<AclOperation> authorizedOperations() {
         return authorizedOperations;
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
index abde154..7fb7bd1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
@@ -45,6 +45,10 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio
         return this;
     }
 
+    /**
+     * Specify if authorized operations should be included in the response.  Note that some
+     * older brokers cannot supply this information even if it is requested.
+     */
     public boolean includeAuthorizedOperations() {
         return includeAuthorizedOperations;
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResult.java
index 23f876a..21125a2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResult.java
@@ -71,7 +71,8 @@ public class DescribeClusterResult {
     }
 
     /**
-     * Returns a future which yields authorized operations.
+     * Returns a future which yields authorized operations.  The future value will be non-null if the
+     * broker supplied this information, and null otherwise.
      */
     public KafkaFuture<Set<AclOperation>> authorizedOperations() {
         return authorizedOperations;
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f0e6635..e394458 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1579,7 +1579,7 @@ public class KafkaAdminClient extends AdminClient {
                 controllerFuture.complete(controller(response));
                 clusterIdFuture.complete(response.clusterId());
                 authorizedOperationsFuture.complete(
-                    validAclOperations(response.data().clusterAuthorizedOperations()));
+                        validAclOperations(response.data().clusterAuthorizedOperations()));
             }
 
             private Node controller(MetadataResponse response) {
@@ -2631,6 +2631,9 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     private Set<AclOperation> validAclOperations(final int authorizedOperations) {
+        if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) {
+            return null;
+        }
         return Utils.from32BitField(authorizedOperations)
             .stream()
             .map(AclOperation::fromCode)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
index c6d44e8..ea9bf05 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
@@ -70,7 +70,7 @@ public class TopicDescription {
      * @param internal Whether the topic is internal to Kafka
      * @param partitions A list of partitions where the index represents the partition id and the element contains
      *                   leadership and replica information for that partition.
-     * @param authorizedOperations authorized operations for this topic
+     * @param authorizedOperations authorized operations for this topic, or null if this is not known.
      */
     TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
                             Set<AclOperation> authorizedOperations) {
@@ -104,7 +104,7 @@ public class TopicDescription {
     }
 
     /**
-     * authorized operations for this topic
+     * authorized operations for this topic, or null if this is not known.
      */
     public Set<AclOperation>  authorizedOperations() {
         return authorizedOperations;
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
index 7befd6e..6710697 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
@@ -108,6 +108,9 @@ public enum AclOperation {
      */
     IDEMPOTENT_WRITE((byte) 12);
 
+    // Note: we cannot have more than 30 ACL operations without modifying the format used
+    // to describe ACL operations in MetadataResponse.
+
     private final static HashMap<Byte, AclOperation> CODE_TO_VALUE = new HashMap<>();
 
     static {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 61b9ea2..823512f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -85,6 +85,25 @@ public class DescribeGroupsResponse extends AbstractResponse {
         return  groupMetada;
     }
 
+    public static DescribedGroup groupMetadata(
+        final String groupId,
+        final Errors error,
+        final String state,
+        final String protocolType,
+        final String protocol,
+        final List<DescribedGroupMember> members,
+        final int authorizedOperations) {
+        DescribedGroup groupMetada = new DescribedGroup();
+        groupMetada.setGroupId(groupId)
+            .setErrorCode(error.code())
+            .setGroupState(state)
+            .setProtocolType(protocolType)
+            .setProtocolData(protocol)
+            .setMembers(members)
+            .setAuthorizedOperations(authorizedOperations);
+        return  groupMetada;
+    }
+
     public DescribeGroupsResponseData data() {
         return data;
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 3455d5b..39b6180 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -55,6 +55,8 @@ import java.util.stream.Collectors;
 public class MetadataResponse extends AbstractResponse {
     public static final int NO_CONTROLLER_ID = -1;
 
+    public static final int AUTHORIZED_OPERATIONS_OMITTED = Integer.MIN_VALUE;
+
     private MetadataResponseData data;
 
     public MetadataResponse(MetadataResponseData data) {
@@ -421,7 +423,8 @@ public class MetadataResponse extends AbstractResponse {
 
     public static MetadataResponse prepareResponse(int throttleTimeMs, List<Node> brokers, String clusterId,
                                                    int controllerId, List<TopicMetadata> topicMetadataList) {
-        return prepareResponse(throttleTimeMs, brokers, clusterId, controllerId, topicMetadataList, 0);
+        return prepareResponse(throttleTimeMs, brokers, clusterId, controllerId, topicMetadataList,
+                MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
     }
 
     public static MetadataResponse prepareResponse(List<Node> brokers, String clusterId, int controllerId,
diff --git a/clients/src/main/resources/common/message/DescribeGroupsResponse.json b/clients/src/main/resources/common/message/DescribeGroupsResponse.json
index e0cbc1e..dd8525a 100644
--- a/clients/src/main/resources/common/message/DescribeGroupsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeGroupsResponse.json
@@ -53,7 +53,7 @@
         { "name": "MemberAssignment", "type": "bytes", "versions": "0+",
           "about": "The current assignment provided by the group leader." }
       ]},
-      { "name": "AuthorizedOperations", "type": "int32", "versions": "3+",
+      { "name": "AuthorizedOperations", "type": "int32", "versions": "3+",  "default": "-2147483648",
         "about": "32-bit bitfield to represent authorized operations for this group." }
     ]}
   ]
diff --git a/clients/src/main/resources/common/message/MetadataResponse.json b/clients/src/main/resources/common/message/MetadataResponse.json
index f54ef28..bb09cdb 100644
--- a/clients/src/main/resources/common/message/MetadataResponse.json
+++ b/clients/src/main/resources/common/message/MetadataResponse.json
@@ -77,10 +77,10 @@
         { "name": "OfflineReplicas", "type": "[]int32", "versions": "5+", "ignorable": true,
           "about": "The set of offline replicas of this partition." }
       ]},
-      { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+",
+      { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648",
         "about": "32-bit bitfield to represent authorized operations for this topic." }
     ]},
-    { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8+",
+    { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648",
       "about": "32-bit bitfield to represent authorized operations for this cluster." }
   ]
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 6aaa75b..02eda60 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -104,6 +104,7 @@ import org.slf4j.LoggerFactory;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -493,11 +494,12 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(initializedCluster.nodes(),
                     initializedCluster.clusterResource().clusterId(), 1,
                     singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false,
-                            singletonList(partitionMetadata)))));
+                            singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED))));
 
             DescribeTopicsResult result = env.adminClient().describeTopics(Collections.singleton(topic));
             Map<String, TopicDescription> topicDescriptions = result.all().get();
             assertEquals(leader, topicDescriptions.get(topic).partitions().get(0).leader());
+            assertEquals(null, topicDescriptions.get(topic).authorizedOperations());
         }
     }
 
@@ -925,6 +927,61 @@ public class KafkaAdminClientTest {
     }
 
     @Test
+    public void testDescribeCluster() throws Exception {
+        final HashMap<Integer, Node> nodes = new HashMap<>();
+        Node node0 = new Node(0, "localhost", 8121);
+        Node node1 = new Node(1, "localhost", 8122);
+        Node node2 = new Node(2, "localhost", 8123);
+        Node node3 = new Node(3, "localhost", 8124);
+        nodes.put(0, node0);
+        nodes.put(1, node1);
+        nodes.put(2, node2);
+        nodes.put(3, node3);
+
+        final Cluster cluster = new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.emptyList(),
+                Collections.emptySet(),
+                Collections.emptySet(), nodes.get(0));
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Prepare the metadata response used for the first describe cluster
+            MetadataResponse response = MetadataResponse.prepareResponse(0,
+                    new ArrayList<>(nodes.values()),
+                    env.cluster().clusterResource().clusterId(),
+                    2,
+                    Collections.emptyList(),
+                    MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
+            env.kafkaClient().prepareResponse(response);
+
+            // Prepare the metadata response used for the second describe cluster
+            MetadataResponse response2 = MetadataResponse.prepareResponse(0,
+                    new ArrayList<>(nodes.values()),
+                    env.cluster().clusterResource().clusterId(),
+                    3,
+                    Collections.emptyList(),
+                    1 << AclOperation.DESCRIBE.code() | 1 << AclOperation.ALTER.code());
+            env.kafkaClient().prepareResponse(response2);
+
+            // Test DescribeCluster with the authorized operations omitted.
+            final DescribeClusterResult result = env.adminClient().describeCluster();
+            assertEquals(env.cluster().clusterResource().clusterId(), result.clusterId().get());
+            assertEquals(2, result.controller().get().id());
+            assertEquals(null, result.authorizedOperations().get());
+
+            // Test DescribeCluster with the authorized operations included.
+            final DescribeClusterResult result2 = env.adminClient().describeCluster();
+            assertEquals(env.cluster().clusterResource().clusterId(), result2.clusterId().get());
+            assertEquals(3, result2.controller().get().id());
+            assertEquals(new HashSet<>(Arrays.asList(AclOperation.DESCRIBE, AclOperation.ALTER)),
+                    result2.authorizedOperations().get());
+        }
+    }
+
+    @Test
     public void testListConsumerGroups() throws Exception {
         final HashMap<Integer, Node> nodes = new HashMap<>();
         Node node0 = new Node(0, "localhost", 8121);
@@ -1157,6 +1214,43 @@ public class KafkaAdminClientTest {
     }
 
     @Test
+    public void testDescribeConsumerGroupsWithAuthorizedOperationsOmitted() throws Exception {
+        final HashMap<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
+
+            DescribeGroupsResponseData data = new DescribeGroupsResponseData();
+            data.groups().add(DescribeGroupsResponse.groupMetadata(
+                "group-0",
+                Errors.NONE,
+                "",
+                ConsumerProtocol.PROTOCOL_TYPE,
+                "",
+                Collections.emptyList(),
+                MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED));
+
+            env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+
+            final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0"));
+            final ConsumerGroupDescription groupDescription = result.describedGroups().get("group-0").get();
+
+            assertNull(groupDescription.authorizedOperations());
+        }
+    }
+
+    @Test
     public void testDescribeConsumerGroupOffsets() throws Exception {
         final HashMap<Integer, Node> nodes = new HashMap<>();
         nodes.put(0, new Node(0, "localhost", 8121));