You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/03/01 15:54:21 UTC
[kafka] branch trunk updated: KAFKA-7922: Return authorized
operations in describe consumer group responses (KIP-430 Part-1)
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f11fa5e KAFKA-7922: Return authorized operations in describe consumer group responses (KIP-430 Part-1)
f11fa5e is described below
commit f11fa5ef402193e2da785466e698e11b56bd19c7
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Fri Mar 1 21:23:52 2019 +0530
KAFKA-7922: Return authorized operations in describe consumer group responses (KIP-430 Part-1)
- Use automatic RPC generation in DescribeGroups Request/Response classes
Author: Manikumar Reddy <ma...@gmail.com>
Reviewers: Rajini Sivaram <ra...@googlemail.com>
Closes #6322 from omkreddy/KIP-430-Return-Ops
---
.../clients/admin/ConsumerGroupDescription.java | 31 ++-
.../admin/DescribeConsumerGroupsOptions.java | 10 +
.../kafka/clients/admin/KafkaAdminClient.java | 51 +++-
.../org/apache/kafka/common/protocol/ApiKeys.java | 8 +-
.../kafka/common/requests/AbstractResponse.java | 2 +-
.../common/requests/DescribeGroupsRequest.java | 74 ++----
.../common/requests/DescribeGroupsResponse.java | 290 +++++----------------
.../java/org/apache/kafka/common/utils/Utils.java | 24 ++
.../common/message/DescribeGroupsRequest.json | 7 +-
.../common/message/DescribeGroupsResponse.json | 7 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 30 ++-
.../kafka/common/requests/RequestResponseTest.java | 19 +-
.../org/apache/kafka/common/utils/UtilsTest.java | 27 +-
core/src/main/scala/kafka/admin/AclCommand.scala | 10 +-
core/src/main/scala/kafka/admin/AdminClient.scala | 24 +-
.../scala/kafka/security/auth/ResourceType.scala | 7 +
core/src/main/scala/kafka/server/KafkaApis.scala | 64 ++++-
.../kafka/api/AdminClientIntegrationTest.scala | 13 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 10 +-
.../api/DescribeAuthorizedOperationsTest.scala | 121 +++++++++
.../scala/unit/kafka/server/RequestQuotaTest.scala | 8 +-
21 files changed, 462 insertions(+), 375 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
index 8947293..8dd6018 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -19,12 +19,14 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.utils.Utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
+import java.util.Set;
/**
* A detailed description of a single consumer group in the cluster.
@@ -36,13 +38,15 @@ public class ConsumerGroupDescription {
private final String partitionAssignor;
private final ConsumerGroupState state;
private final Node coordinator;
+ private Set<AclOperation> authorizedOperations;
public ConsumerGroupDescription(String groupId,
boolean isSimpleConsumerGroup,
Collection<MemberDescription> members,
String partitionAssignor,
ConsumerGroupState state,
- Node coordinator) {
+ Node coordinator,
+ Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.members = members == null ? Collections.emptyList() :
@@ -50,23 +54,26 @@ public class ConsumerGroupDescription {
this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor;
this.state = state;
this.coordinator = coordinator;
+ this.authorizedOperations = authorizedOperations;
}
@Override
- public boolean equals(Object o) {
+ public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- ConsumerGroupDescription that = (ConsumerGroupDescription) o;
+ final ConsumerGroupDescription that = (ConsumerGroupDescription) o;
return isSimpleConsumerGroup == that.isSimpleConsumerGroup &&
- groupId.equals(that.groupId) &&
- members.equals(that.members) &&
- partitionAssignor.equals(that.partitionAssignor) &&
- state.equals(that.state);
+ Objects.equals(groupId, that.groupId) &&
+ Objects.equals(members, that.members) &&
+ Objects.equals(partitionAssignor, that.partitionAssignor) &&
+ state == that.state &&
+ Objects.equals(coordinator, that.coordinator) &&
+ Objects.equals(authorizedOperations, that.authorizedOperations);
}
@Override
public int hashCode() {
- return Objects.hash(isSimpleConsumerGroup, groupId, members, partitionAssignor, state);
+ return Objects.hash(groupId, isSimpleConsumerGroup, members, partitionAssignor, state, coordinator, authorizedOperations);
}
/**
@@ -111,6 +118,13 @@ public class ConsumerGroupDescription {
return coordinator;
}
+ /**
+ * authorizedOperations for this group
+ */
+ public Set<AclOperation> authorizedOperations() {
+ return authorizedOperations;
+ }
+
@Override
public String toString() {
return "(groupId=" + groupId +
@@ -119,6 +133,7 @@ public class ConsumerGroupDescription {
", partitionAssignor=" + partitionAssignor +
", state=" + state +
", coordinator=" + coordinator +
+ ", authorizedOperations=" + authorizedOperations +
")";
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java
index 7daff1a..8f05f61 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java
@@ -28,4 +28,14 @@ import java.util.Collection;
*/
@InterfaceStability.Evolving
public class DescribeConsumerGroupsOptions extends AbstractOptions<DescribeConsumerGroupsOptions> {
+ private boolean includeAuthorizedOperations;
+
+ public DescribeConsumerGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
+ this.includeAuthorizedOperations = includeAuthorizedOperations;
+ return this;
+ }
+
+ public boolean includeAuthorizedOperations() {
+ return includeAuthorizedOperations;
+ }
}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 1567d90..95337d0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
@@ -62,6 +63,9 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DescribeGroupsRequestData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -128,9 +132,11 @@ import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -2430,7 +2436,10 @@ public class KafkaAdminClient extends AdminClient {
runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
- return new DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
+ return new DescribeGroupsRequest.Builder(
+ new DescribeGroupsRequestData()
+ .setGroups(Collections.singletonList(groupId))
+ .setIncludeAuthorizedOperations(options.includeAuthorizedOperations()));
}
@Override
@@ -2438,23 +2447,27 @@ public class KafkaAdminClient extends AdminClient {
final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
- final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId);
+ final DescribedGroup describedGroup = response.data()
+ .groups()
+ .stream()
+ .filter(group -> groupId.equals(group.groupId()))
+ .findFirst().get();
- final Errors groupError = groupMetadata.error();
+ final Errors groupError = Errors.forCode(describedGroup.errorCode());
if (groupError != Errors.NONE) {
// TODO: KAFKA-6789, we can retry based on the error code
future.completeExceptionally(groupError.exception());
} else {
- final String protocolType = groupMetadata.protocolType();
+ final String protocolType = describedGroup.protocolType();
if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
- final List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
+ final List<DescribedGroupMember> members = describedGroup.members();
final List<MemberDescription> memberDescriptions = new ArrayList<>(members.size());
-
- for (DescribeGroupsResponse.GroupMember groupMember : members) {
+ final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
+ for (DescribedGroupMember groupMember : members) {
Set<TopicPartition> partitions = Collections.emptySet();
- if (groupMember.memberAssignment().remaining() > 0) {
+ if (groupMember.memberAssignment().length > 0) {
final PartitionAssignor.Assignment assignment = ConsumerProtocol.
- deserializeAssignment(groupMember.memberAssignment().duplicate());
+ deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
partitions = new HashSet<>(assignment.partitions());
}
final MemberDescription memberDescription =
@@ -2465,12 +2478,12 @@ public class KafkaAdminClient extends AdminClient {
memberDescriptions.add(memberDescription);
}
final ConsumerGroupDescription consumerGroupDescription =
- new ConsumerGroupDescription(groupId,
- protocolType.isEmpty(),
+ new ConsumerGroupDescription(groupId, protocolType.isEmpty(),
memberDescriptions,
- groupMetadata.protocol(),
- ConsumerGroupState.parse(groupMetadata.state()),
- fcResponse.node());
+ describedGroup.protocolData(),
+ ConsumerGroupState.parse(describedGroup.groupState()),
+ fcResponse.node(),
+ authorizedOperations);
future.complete(consumerGroupDescription);
}
}
@@ -2495,6 +2508,16 @@ public class KafkaAdminClient extends AdminClient {
return new DescribeConsumerGroupsResult(new HashMap<>(futures));
}
+ private Set<AclOperation> validAclOperations(final int authorizedOperations) {
+ return Utils.from32BitField(authorizedOperations)
+ .stream()
+ .map(AclOperation::fromCode)
+ .filter(operation -> operation != AclOperation.UNKNOWN
+ && operation != AclOperation.ALL
+ && operation != AclOperation.ANY)
+ .collect(Collectors.toSet());
+ }
+
private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?> future) {
Errors error = response.error();
if (error.exception() instanceof RetriableException) {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 937b044..19bf6f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -18,6 +18,8 @@ package org.apache.kafka.common.protocol;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsRequestData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -61,8 +63,6 @@ import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
-import org.apache.kafka.common.requests.DescribeGroupsRequest;
-import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.requests.DescribeLogDirsRequest;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
@@ -139,8 +139,8 @@ public enum ApiKeys {
HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()),
LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS),
SYNC_GROUP(14, "SyncGroup", SyncGroupRequest.schemaVersions(), SyncGroupResponse.schemaVersions()),
- DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequest.schemaVersions(),
- DescribeGroupsResponse.schemaVersions()),
+ DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequestData.SCHEMAS,
+ DescribeGroupsResponseData.SCHEMAS),
LIST_GROUPS(16, "ListGroups", ListGroupsRequest.schemaVersions(), ListGroupsResponse.schemaVersions()),
SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequestData.SCHEMAS, SaslHandshakeResponseData.SCHEMAS),
API_VERSIONS(18, "ApiVersions", ApiVersionsRequest.schemaVersions(), ApiVersionsResponse.schemaVersions()) {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 68b505b..959379c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -101,7 +101,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case LEADER_AND_ISR:
return new LeaderAndIsrResponse(struct);
case DESCRIBE_GROUPS:
- return new DescribeGroupsResponse(struct);
+ return new DescribeGroupsResponse(struct, version);
case LIST_GROUPS:
return new ListGroupsResponse(struct);
case SASL_HANDSHAKE:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index 006af4f..8f003ac 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -16,97 +16,65 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
+import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
public class DescribeGroupsRequest extends AbstractRequest {
- private static final String GROUP_IDS_KEY_NAME = "group_ids";
-
- /* Describe group api */
- private static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(
- new Field(GROUP_IDS_KEY_NAME, new ArrayOf(STRING), "List of groupIds to request metadata for (an " +
- "empty groupId array will return empty group metadata)."));
-
- /* v1 request is the same as v0. Throttle time has been added to response */
- private static final Schema DESCRIBE_GROUPS_REQUEST_V1 = DESCRIBE_GROUPS_REQUEST_V0;
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema DESCRIBE_GROUPS_REQUEST_V2 = DESCRIBE_GROUPS_REQUEST_V1;
-
- public static Schema[] schemaVersions() {
- return new Schema[]{DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1, DESCRIBE_GROUPS_REQUEST_V2};
- }
-
public static class Builder extends AbstractRequest.Builder<DescribeGroupsRequest> {
- private final List<String> groupIds;
+ private final DescribeGroupsRequestData data;
- public Builder(List<String> groupIds) {
+ public Builder(DescribeGroupsRequestData data) {
super(ApiKeys.DESCRIBE_GROUPS);
- this.groupIds = groupIds;
+ this.data = data;
}
@Override
public DescribeGroupsRequest build(short version) {
- return new DescribeGroupsRequest(this.groupIds, version);
+ return new DescribeGroupsRequest(data, version);
}
@Override
public String toString() {
- return "(type=DescribeGroupsRequest, groupIds=(" + Utils.join(groupIds, ",") + "))";
+ return data.toString();
}
}
- private final List<String> groupIds;
+ private final DescribeGroupsRequestData data;
+ private final short version;
- private DescribeGroupsRequest(List<String> groupIds, short version) {
+ private DescribeGroupsRequest(DescribeGroupsRequestData data, short version) {
super(ApiKeys.DESCRIBE_GROUPS, version);
- this.groupIds = groupIds;
+ this.data = data;
+ this.version = version;
}
public DescribeGroupsRequest(Struct struct, short version) {
super(ApiKeys.DESCRIBE_GROUPS, version);
- this.groupIds = new ArrayList<>();
- for (Object groupId : struct.getArray(GROUP_IDS_KEY_NAME))
- this.groupIds.add((String) groupId);
+ this.data = new DescribeGroupsRequestData(struct, version);
+ this.version = version;
}
- public List<String> groupIds() {
- return groupIds;
+ public DescribeGroupsRequestData data() {
+ return data;
}
@Override
protected Struct toStruct() {
- Struct struct = new Struct(ApiKeys.DESCRIBE_GROUPS.requestSchema(version()));
- struct.set(GROUP_IDS_KEY_NAME, groupIds.toArray());
- return struct;
+ return data.toStruct(version);
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- short version = version();
- switch (version) {
- case 0:
- return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds);
- case 1:
- case 2:
- return DescribeGroupsResponse.fromError(throttleTimeMs, Errors.forException(e), groupIds);
-
- default:
- throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
- version, this.getClass().getSimpleName(), ApiKeys.DESCRIBE_GROUPS.latestVersion()));
+ if (version == 0) {
+ return DescribeGroupsResponse.fromError(DEFAULT_THROTTLE_TIME, Errors.forException(e), data.groups());
+ } else {
+ return DescribeGroupsResponse.fromError(throttleTimeMs, Errors.forException(e), data.groups());
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 06c2471..61b9ea2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -16,80 +16,23 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
+import java.util.Set;
public class DescribeGroupsResponse extends AbstractResponse {
- private static final String GROUPS_KEY_NAME = "groups";
-
- private static final String GROUP_STATE_KEY_NAME = "state";
- private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
- private static final String PROTOCOL_KEY_NAME = "protocol";
-
- private static final String MEMBERS_KEY_NAME = "members";
- private static final String CLIENT_ID_KEY_NAME = "client_id";
- private static final String CLIENT_HOST_KEY_NAME = "client_host";
- private static final String MEMBER_METADATA_KEY_NAME = "member_metadata";
- private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
-
- private static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema(
- MEMBER_ID,
- new Field(CLIENT_ID_KEY_NAME, STRING, "The client id used in the member's latest join group request"),
- new Field(CLIENT_HOST_KEY_NAME, STRING, "The client host used in the request session corresponding to the " +
- "member's join group."),
- new Field(MEMBER_METADATA_KEY_NAME, BYTES, "The metadata corresponding to the current group protocol in " +
- "use (will only be present if the group is stable)."),
- new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES, "The current assignment provided by the group leader " +
- "(will only be present if the group is stable)."));
-
- private static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema(
- ERROR_CODE,
- GROUP_ID,
- new Field(GROUP_STATE_KEY_NAME, STRING, "The current state of the group (one of: Dead, Stable, CompletingRebalance, " +
- "PreparingRebalance, or empty if there is no active group)"),
- new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "The current group protocol type (will be empty if there is no active group)"),
- new Field(PROTOCOL_KEY_NAME, STRING, "The current group protocol (only provided if the group is Stable)"),
- new Field(MEMBERS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_MEMBER_V0), "Current group members " +
- "(only provided if the group is not Dead)"));
-
- private static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema(
- new Field(GROUPS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
- private static final Schema DESCRIBE_GROUPS_RESPONSE_V1 = new Schema(
- THROTTLE_TIME_MS,
- new Field(GROUPS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema DESCRIBE_GROUPS_RESPONSE_V2 = DESCRIBE_GROUPS_RESPONSE_V1;
-
- public static Schema[] schemaVersions() {
- return new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1, DESCRIBE_GROUPS_RESPONSE_V2};
- }
-
- public static final String UNKNOWN_STATE = "";
- public static final String UNKNOWN_PROTOCOL_TYPE = "";
- public static final String UNKNOWN_PROTOCOL = "";
-
/**
* Possible per-group error codes:
*
@@ -99,197 +42,92 @@ public class DescribeGroupsResponse extends AbstractResponse {
* AUTHORIZATION_FAILED (29)
*/
- private final Map<String, GroupMetadata> groups;
- private final int throttleTimeMs;
+ private DescribeGroupsResponseData data;
- public DescribeGroupsResponse(Map<String, GroupMetadata> groups) {
- this(DEFAULT_THROTTLE_TIME, groups);
+ public DescribeGroupsResponse(DescribeGroupsResponseData data) {
+ this.data = data;
}
- public DescribeGroupsResponse(int throttleTimeMs, Map<String, GroupMetadata> groups) {
- this.throttleTimeMs = throttleTimeMs;
- this.groups = groups;
+ public DescribeGroupsResponse(Struct struct, short version) {
+ this.data = new DescribeGroupsResponseData(struct, version);
}
- public DescribeGroupsResponse(Struct struct) {
- this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
- this.groups = new HashMap<>();
- for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
- Struct groupStruct = (Struct) groupObj;
+ public static DescribedGroupMember groupMember(
+ final String memberId,
+ final String clientId,
+ final String clientHost,
+ final byte[] assignment,
+ final byte[] metadata) {
+ return new DescribedGroupMember()
+ .setMemberId(memberId)
+ .setClientId(clientId)
+ .setClientHost(clientHost)
+ .setMemberAssignment(assignment)
+ .setMemberMetadata(metadata);
+ }
- String groupId = groupStruct.get(GROUP_ID);
- Errors error = Errors.forCode(groupStruct.get(ERROR_CODE));
- String state = groupStruct.getString(GROUP_STATE_KEY_NAME);
- String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME);
- String protocol = groupStruct.getString(PROTOCOL_KEY_NAME);
+ public static DescribedGroup groupMetadata(
+ final String groupId,
+ final Errors error,
+ final String state,
+ final String protocolType,
+ final String protocol,
+ final List<DescribedGroupMember> members,
+ final Set<Byte> authorizedOperations) {
+ DescribedGroup groupMetada = new DescribedGroup();
+ groupMetada.setGroupId(groupId)
+ .setErrorCode(error.code())
+ .setGroupState(state)
+ .setProtocolType(protocolType)
+ .setProtocolData(protocol)
+ .setMembers(members)
+ .setAuthorizedOperations(Utils.to32BitField(authorizedOperations));
+ return groupMetada;
+ }
- List<GroupMember> members = new ArrayList<>();
- for (Object memberObj : groupStruct.getArray(MEMBERS_KEY_NAME)) {
- Struct memberStruct = (Struct) memberObj;
- String memberId = memberStruct.get(MEMBER_ID);
- String clientId = memberStruct.getString(CLIENT_ID_KEY_NAME);
- String clientHost = memberStruct.getString(CLIENT_HOST_KEY_NAME);
- ByteBuffer memberMetadata = memberStruct.getBytes(MEMBER_METADATA_KEY_NAME);
- ByteBuffer memberAssignment = memberStruct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
- members.add(new GroupMember(memberId, clientId, clientHost,
- memberMetadata, memberAssignment));
- }
- this.groups.put(groupId, new GroupMetadata(error, state, protocolType, protocol, members));
- }
+ public DescribeGroupsResponseData data() {
+ return data;
}
@Override
- public int throttleTimeMs() {
- return throttleTimeMs;
+ protected Struct toStruct(short version) {
+ return data.toStruct(version);
}
- public Map<String, GroupMetadata> groups() {
- return groups;
+ @Override
+ public int throttleTimeMs() {
+ return data.throttleTimeMs();
}
+ public static final String UNKNOWN_STATE = "";
+ public static final String UNKNOWN_PROTOCOL_TYPE = "";
+ public static final String UNKNOWN_PROTOCOL = "";
+
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
- for (GroupMetadata response : groups.values())
- updateErrorCounts(errorCounts, response.error);
+ data.groups().forEach(describedGroup -> {
+ updateErrorCounts(errorCounts, Errors.forCode(describedGroup.errorCode()));
+ });
return errorCounts;
}
- public static class GroupMetadata {
- private final Errors error;
- private final String state;
- private final String protocolType;
- private final String protocol;
- private final List<GroupMember> members;
-
- public GroupMetadata(Errors error,
- String state,
- String protocolType,
- String protocol,
- List<GroupMember> members) {
- this.error = error;
- this.state = state;
- this.protocolType = protocolType;
- this.protocol = protocol;
- this.members = members;
- }
-
- public Errors error() {
- return error;
- }
-
- public String state() {
- return state;
- }
-
- public String protocolType() {
- return protocolType;
- }
-
- public String protocol() {
- return protocol;
- }
-
- public List<GroupMember> members() {
- return members;
- }
-
- public static GroupMetadata forError(Errors error) {
- return new DescribeGroupsResponse.GroupMetadata(
- error,
- DescribeGroupsResponse.UNKNOWN_STATE,
- DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
- DescribeGroupsResponse.UNKNOWN_PROTOCOL,
- Collections.emptyList());
- }
- }
-
- public static class GroupMember {
- private final String memberId;
- private final String clientId;
- private final String clientHost;
- private final ByteBuffer memberMetadata;
- private final ByteBuffer memberAssignment;
-
- public GroupMember(String memberId,
- String clientId,
- String clientHost,
- ByteBuffer memberMetadata,
- ByteBuffer memberAssignment) {
- this.memberId = memberId;
- this.clientId = clientId;
- this.clientHost = clientHost;
- this.memberMetadata = memberMetadata;
- this.memberAssignment = memberAssignment;
- }
-
- public String memberId() {
- return memberId;
- }
-
- public String clientId() {
- return clientId;
- }
-
- public String clientHost() {
- return clientHost;
- }
-
- public ByteBuffer memberMetadata() {
- return memberMetadata;
- }
-
- public ByteBuffer memberAssignment() {
- return memberAssignment;
- }
- }
-
- public static DescribeGroupsResponse fromError(Errors error, List<String> groupIds) {
- return fromError(DEFAULT_THROTTLE_TIME, error, groupIds);
+ public static DescribedGroup forError(String groupId, Errors error) {
+ return groupMetadata(groupId, error, DescribeGroupsResponse.UNKNOWN_STATE, DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
+ DescribeGroupsResponse.UNKNOWN_PROTOCOL, Collections.emptyList(), Collections.emptySet());
}
public static DescribeGroupsResponse fromError(int throttleTimeMs, Errors error, List<String> groupIds) {
- GroupMetadata errorMetadata = GroupMetadata.forError(error);
- Map<String, GroupMetadata> groups = new HashMap<>();
+ DescribeGroupsResponseData describeGroupsResponseData = new DescribeGroupsResponseData();
+ describeGroupsResponseData.setThrottleTimeMs(throttleTimeMs);
for (String groupId : groupIds)
- groups.put(groupId, errorMetadata);
- return new DescribeGroupsResponse(throttleTimeMs, groups);
- }
-
- @Override
- protected Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.DESCRIBE_GROUPS.responseSchema(version));
- struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-
- List<Struct> groupStructs = new ArrayList<>();
- for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) {
- Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
- GroupMetadata group = groupEntry.getValue();
- groupStruct.set(GROUP_ID, groupEntry.getKey());
- groupStruct.set(ERROR_CODE, group.error.code());
- groupStruct.set(GROUP_STATE_KEY_NAME, group.state);
- groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
- groupStruct.set(PROTOCOL_KEY_NAME, group.protocol);
- List<Struct> membersList = new ArrayList<>();
- for (GroupMember member : group.members) {
- Struct memberStruct = groupStruct.instance(MEMBERS_KEY_NAME);
- memberStruct.set(MEMBER_ID, member.memberId);
- memberStruct.set(CLIENT_ID_KEY_NAME, member.clientId);
- memberStruct.set(CLIENT_HOST_KEY_NAME, member.clientHost);
- memberStruct.set(MEMBER_METADATA_KEY_NAME, member.memberMetadata);
- memberStruct.set(MEMBER_ASSIGNMENT_KEY_NAME, member.memberAssignment);
- membersList.add(memberStruct);
- }
- groupStruct.set(MEMBERS_KEY_NAME, membersList.toArray());
- groupStructs.add(groupStruct);
- }
- struct.set(GROUPS_KEY_NAME, groupStructs.toArray());
-
- return struct;
+ describeGroupsResponseData.groups().add(DescribeGroupsResponse.forError(groupId, error));
+ return new DescribeGroupsResponse(describeGroupsResponseData);
}
public static DescribeGroupsResponse parse(ByteBuffer buffer, short version) {
- return new DescribeGroupsResponse(ApiKeys.DESCRIBE_GROUPS.parseResponse(version, buffer));
+ return new DescribeGroupsResponse(
+ ApiKeys.DESCRIBE_GROUPS.responseSchema(version).read(buffer), version);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 8c9d162..53417d2 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -1005,4 +1005,28 @@ public final class Utils {
.collect(Collectors.collectingAndThen(Collectors.toList(), finisher));
}
+ public static int to32BitField(final Set<Byte> bytes) {
+ int value = 0;
+ for (final byte b : bytes)
+ value |= 1 << checkRange(b);
+ return value;
+ }
+
+ private static byte checkRange(final byte i) {
+ if (i > 31)
+ throw new IllegalArgumentException("out of range: i>31, i = " + i);
+ if (i < 0)
+ throw new IllegalArgumentException("out of range: i<0, i = " + i);
+ return i;
+ }
+
+ public static Set<Byte> from32BitField(final int intValue) {
+ Set<Byte> result = new HashSet<>();
+ for (int itr = intValue, count = 0; itr != 0; itr >>>= 1) {
+ if ((itr & 1) != 0)
+ result.add((byte) count);
+ count++;
+ }
+ return result;
+ }
}
diff --git a/clients/src/main/resources/common/message/DescribeGroupsRequest.json b/clients/src/main/resources/common/message/DescribeGroupsRequest.json
index 8039a7e..3557bae 100644
--- a/clients/src/main/resources/common/message/DescribeGroupsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeGroupsRequest.json
@@ -18,9 +18,12 @@
"type": "request",
"name": "DescribeGroupsRequest",
// Versions 1 and 2 are the same as version 0.
- "validVersions": "0-2",
+ // Starting in version 3, authorized operations can be requested.
+ "validVersions": "0-3",
"fields": [
{ "name": "Groups", "type": "[]string", "versions": "0+",
- "about": "The names of the groups to describe" }
+ "about": "The names of the groups to describe" },
+ { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "3+",
+ "about": "Whether to include authorized operations." }
]
}
diff --git a/clients/src/main/resources/common/message/DescribeGroupsResponse.json b/clients/src/main/resources/common/message/DescribeGroupsResponse.json
index c5fcd82..f4677a7 100644
--- a/clients/src/main/resources/common/message/DescribeGroupsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeGroupsResponse.json
@@ -19,7 +19,8 @@
"name": "DescribeGroupsResponse",
// Version 1 added throttle time.
// Starting in version 2, on quota violation, brokers send out responses before throttling.
- "validVersions": "0-2",
+ // Starting in version 3, brokers can send authorized operations.
+ "validVersions": "0-3",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
@@ -51,7 +52,9 @@
// This is currently only provided if the group is in the Stable state.
{ "name": "MemberAssignment", "type": "bytes", "versions": "0+",
"about": "The current assignment provided by the group leader." }
- ]}
+ ]},
+ { "name": "AuthorizedOperations", "type": "int32", "versions": "3+",
+ "about": "32-bit bitfield to represent authorized operations for this group." }
]}
]
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index debb3a8..782dc16 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
@@ -1055,7 +1056,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
- final Map<String, DescribeGroupsResponse.GroupMetadata> groupMetadataMap = new HashMap<>();
+ DescribeGroupsResponseData data = new DescribeGroupsResponseData();
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
@@ -1066,29 +1067,34 @@ public class KafkaAdminClientTest {
topicPartitions.add(2, myTopicPartition2);
final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions));
+ byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
+ memberAssignment.get(memberAssignmentBytes);
- groupMetadataMap.put(
- "group-0",
- new DescribeGroupsResponse.GroupMetadata(
+ data.groups().add(DescribeGroupsResponse.groupMetadata(
+ "group-0",
Errors.NONE,
"",
ConsumerProtocol.PROTOCOL_TYPE,
"",
asList(
- new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment),
- new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment))));
- groupMetadataMap.put(
- "group-connect-0",
- new DescribeGroupsResponse.GroupMetadata(
+ DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null),
+ DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null)
+ ),
+ Collections.emptySet()));
+
+ data.groups().add(DescribeGroupsResponse.groupMetadata(
+ "group-connect-0",
Errors.NONE,
"",
"connect",
"",
asList(
- new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment),
- new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment))));
+ DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null),
+ DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null)
+ ),
+ Collections.emptySet()));
- env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupMetadataMap));
+ env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0"));
final ConsumerGroupDescription groupDescription = result.describedGroups().get("group-0").get();
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c105d3b..f1e2063 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -38,6 +38,8 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DescribeGroupsRequestData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
@@ -767,18 +769,21 @@ public class RequestResponseTest {
}
private DescribeGroupsRequest createDescribeGroupRequest() {
- return new DescribeGroupsRequest.Builder(singletonList("test-group")).build();
+ return new DescribeGroupsRequest.Builder(
+ new DescribeGroupsRequestData().
+ setGroups(Collections.singletonList("test-group"))).build();
}
private DescribeGroupsResponse createDescribeGroupResponse() {
String clientId = "consumer-1";
String clientHost = "localhost";
- ByteBuffer empty = ByteBuffer.allocate(0);
- DescribeGroupsResponse.GroupMember member = new DescribeGroupsResponse.GroupMember("memberId",
- clientId, clientHost, empty, empty);
- DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(Errors.NONE,
- "STABLE", "consumer", "roundrobin", asList(member));
- return new DescribeGroupsResponse(Collections.singletonMap("test-group", metadata));
+ DescribeGroupsResponseData describeGroupsResponseData = new DescribeGroupsResponseData();
+ DescribeGroupsResponseData.DescribedGroupMember member = DescribeGroupsResponse.groupMember("memberId",
+ clientId, clientHost, new byte[0], new byte[0]);
+ DescribeGroupsResponseData.DescribedGroup metadata = DescribeGroupsResponse.groupMetadata("test-group", Errors.NONE,
+ "STABLE", "consumer", "roundrobin", asList(member), Collections.emptySet());
+ describeGroupsResponseData.groups().add(metadata);
+ return new DescribeGroupsResponse(describeGroupsResponseData);
}
private LeaveGroupRequest createLeaveGroupRequest() {
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index d5029b6..172a992 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -30,16 +30,19 @@ import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
-import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Random;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.formatAddress;
import static org.apache.kafka.common.utils.Utils.formatBytes;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.utils.Utils.validHostPattern;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -110,8 +113,8 @@ public class UtilsTest {
@Test
public void testJoin() {
assertEquals("", Utils.join(Collections.emptyList(), ","));
- assertEquals("1", Utils.join(Arrays.asList("1"), ","));
- assertEquals("1,2,3", Utils.join(Arrays.asList(1, 2, 3), ","));
+ assertEquals("1", Utils.join(asList("1"), ","));
+ assertEquals("1,2,3", Utils.join(asList(1, 2, 3), ","));
}
@Test
@@ -466,4 +469,22 @@ public class UtilsTest {
Utils.delete(tempDir);
assertFalse(Files.exists(tempDir.toPath()));
}
+
+ @Test
+ public void testConvertTo32BitField() {
+ Set<Byte> bytes = mkSet((byte) 0, (byte) 1, (byte) 5, (byte) 10, (byte) 31);
+ int bitField = Utils.to32BitField(bytes);
+ assertEquals(bytes, Utils.from32BitField(bitField));
+
+ bytes = new HashSet<>();
+ bitField = Utils.to32BitField(bytes);
+ assertEquals(bytes, Utils.from32BitField(bitField));
+
+ bytes = mkSet((byte) 0, (byte) 11, (byte) 32);
+ try {
+ Utils.to32BitField(bytes);
+ fail("Expected exception not thrown");
+ } catch (IllegalArgumentException e) {
+ }
+ }
}
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 51177b2..f945b25 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -41,14 +41,6 @@ object AclCommand extends Logging {
private val Newline = scala.util.Properties.lineSeparator
- val ResourceTypeToValidOperations: Map[JResourceType, Set[Operation]] = Map[JResourceType, Set[Operation]](
- JResourceType.TOPIC -> Set(Read, Write, Create, Describe, Delete, Alter, DescribeConfigs, AlterConfigs, All),
- JResourceType.GROUP -> Set(Read, Describe, Delete, All),
- JResourceType.CLUSTER -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All),
- JResourceType.TRANSACTIONAL_ID -> Set(Describe, Write, All),
- JResourceType.DELEGATION_TOKEN -> Set(Describe, All)
- )
-
def main(args: Array[String]) {
val opts = new AclCommandOptions(args)
@@ -454,7 +446,7 @@ object AclCommand extends Logging {
private def validateOperation(opts: AclCommandOptions, resourceToAcls: Map[ResourcePatternFilter, Set[Acl]]): Unit = {
for ((resource, acls) <- resourceToAcls) {
- val validOps = ResourceTypeToValidOperations(resource.resourceType)
+ val validOps = ResourceType.fromJava(resource.resourceType).supportedOperations + All
if ((acls.map(_.operation) -- validOps).nonEmpty)
CommandLineUtils.printUsageAndDie(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.mkString(",")}")
}
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index aa5dfec..bd09db4 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -27,15 +27,15 @@ import org.apache.kafka.common.config.ConfigDef.ValidString._
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException}
+import org.apache.kafka.common.message.{DescribeGroupsRequestData, DescribeGroupsResponseData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.Selector
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
-import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.utils.LogContext
-import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
+import org.apache.kafka.common.utils.{KafkaThread, Time}
import org.apache.kafka.common.{Node, TopicPartition}
import scala.collection.JavaConverters._
@@ -233,20 +233,20 @@ class AdminClient(val time: Time,
consumers: Option[List[ConsumerSummary]],
coordinator: Node)
- def describeConsumerGroupHandler(coordinator: Node, groupId: String): GroupMetadata = {
+ def describeConsumerGroupHandler(coordinator: Node, groupId: String): DescribeGroupsResponseData.DescribedGroup = {
val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS,
- new DescribeGroupsRequest.Builder(Collections.singletonList(groupId)))
+ new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(Collections.singletonList(groupId))))
val response = responseBody.asInstanceOf[DescribeGroupsResponse]
- val metadata = response.groups.get(groupId)
- if (metadata == null)
- throw new KafkaException(s"Response from broker contained no metadata for group $groupId")
+ val metadata = response.data().groups().asScala.find(group => groupId.equals(group.groupId()))
+ .getOrElse(throw new KafkaException(s"Response from broker contained no metadata for group $groupId"))
metadata
}
def describeConsumerGroup(groupId: String, timeoutMs: Long = 0): ConsumerGroupSummary = {
- def isValidConsumerGroupResponse(metadata: DescribeGroupsResponse.GroupMetadata): Boolean =
- metadata.error == Errors.NONE && (metadata.state == "Dead" || metadata.state == "Empty" || metadata.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+ def isValidConsumerGroupResponse(metadata: DescribeGroupsResponseData.DescribedGroup): Boolean =
+ metadata.errorCode() == Errors.NONE.code() && (metadata.groupState() == "Dead" ||
+ metadata.groupState() == "Empty" || metadata.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
val startTime = time.milliseconds
val coordinator = findCoordinator(groupId, timeoutMs)
@@ -262,16 +262,16 @@ class AdminClient(val time: Time,
throw new TimeoutException("The consumer group command timed out while waiting for group to initialize")
val consumers = metadata.members.asScala.map { consumer =>
- ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.state match {
+ ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.groupState() match {
case "Stable" =>
- val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment)))
+ val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(consumer.memberAssignment))
assignment.partitions.asScala.toList
case _ =>
List()
})
}.toList
- ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator)
+ ConsumerGroupSummary(metadata.groupState(), metadata.protocolData(), Some(consumers), coordinator)
}
def deleteConsumerGroups(groups: List[String]): Map[String, Errors] = {
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index 65a0373..ae55c47 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -23,6 +23,8 @@ import org.apache.kafka.common.resource.{ResourceType => JResourceType}
sealed trait ResourceType extends BaseEnum with Ordered[ ResourceType ] {
def error: Errors
def toJava: JResourceType
+ // this method output will not include "All" Operation type
+ def supportedOperations: Set[Operation]
override def compare(that: ResourceType): Int = this.name compare that.name
}
@@ -31,30 +33,35 @@ case object Topic extends ResourceType {
val name = "Topic"
val error = Errors.TOPIC_AUTHORIZATION_FAILED
val toJava = JResourceType.TOPIC
+ val supportedOperations = Set(Read, Write, Create, Describe, Delete, Alter, DescribeConfigs, AlterConfigs)
}
case object Group extends ResourceType {
val name = "Group"
val error = Errors.GROUP_AUTHORIZATION_FAILED
val toJava = JResourceType.GROUP
+ val supportedOperations = Set(Read, Describe, Delete)
}
case object Cluster extends ResourceType {
val name = "Cluster"
val error = Errors.CLUSTER_AUTHORIZATION_FAILED
val toJava = JResourceType.CLUSTER
+ val supportedOperations = Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe)
}
case object TransactionalId extends ResourceType {
val name = "TransactionalId"
val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
val toJava = JResourceType.TRANSACTIONAL_ID
+ val supportedOperations = Set(Describe, Write)
}
case object DelegationToken extends ResourceType {
val name = "DelegationToken"
val error = Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED
val toJava = JResourceType.DELEGATION_TOKEN
+ val supportedOperations : Set[Operation] = Set(Describe)
}
object ResourceType {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bdd794c..2361ee5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,6 +18,7 @@
package kafka.server
import java.lang.{Long => JLong}
+import java.lang.{Byte => JByte}
import java.nio.ByteBuffer
import java.util
import java.util.concurrent.ConcurrentHashMap
@@ -44,7 +45,7 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
-import org.apache.kafka.common.message.CreateTopicsResponseData
+import org.apache.kafka.common.message.{CreateTopicsResponseData, DescribeGroupsResponseData}
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
import org.apache.kafka.common.message.LeaveGroupResponseData
@@ -1186,24 +1187,65 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleDescribeGroupRequest(request: RequestChannel.Request) {
+
+ def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
+ def createResponse(requestThrottleMs: Int): AbstractResponse = {
+ describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
+ new DescribeGroupsResponse(describeGroupsResponseData)
+ }
+ sendResponseMaybeThrottle(request, createResponse)
+ }
+
val describeRequest = request.body[DescribeGroupsRequest]
+ val describeGroupsResponseData = new DescribeGroupsResponseData()
- val groups = describeRequest.groupIds.asScala.map { groupId =>
- if (!authorize(request.session, Describe, Resource(Group, groupId, LITERAL))) {
- groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
+ describeRequest.data().groups().asScala.foreach { groupId =>
+ val resource = Resource(Group, groupId, LITERAL)
+ if (!authorize(request.session, Describe, resource)) {
+ describeGroupsResponseData.groups().add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
} else {
val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
val members = summary.members.map { member =>
- val metadata = ByteBuffer.wrap(member.metadata)
- val assignment = ByteBuffer.wrap(member.assignment)
- new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost, metadata, assignment)
+ new DescribeGroupsResponseData.DescribedGroupMember()
+ .setMemberId(member.memberId)
+ .setClientId(member.clientId)
+ .setClientHost(member.clientHost)
+ .setMemberAssignment(member.assignment)
+ .setMemberMetadata(member.assignment)
}
- groupId -> new DescribeGroupsResponse.GroupMetadata(error, summary.state, summary.protocolType,
- summary.protocol, members.asJava)
+
+ val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
+ .setErrorCode(error.code())
+ .setGroupId(groupId)
+ .setGroupState(summary.state)
+ .setProtocolType(summary.protocolType)
+ .setProtocolData(summary.protocol)
+ .setMembers(members.asJava)
+
+ if (request.header.apiVersion >= 3) {
+ if (error == Errors.NONE && describeRequest.data().includeAuthorizedOperations()) {
+ describedGroup.setAuthorizedOperations(authorizedOperations(request.session, resource))
+ } else {
+ describedGroup.setAuthorizedOperations(0)
+ }
+ }
+
+ describeGroupsResponseData.groups().add(describedGroup)
}
- }.toMap
+ }
+
+ sendResponseCallback(describeGroupsResponseData)
+ }
+
+
+ private def authorizedOperations(session: RequestChannel.Session, resource: Resource): Int = {
+ val authorizedOps = authorizer match {
+ case None => resource.resourceType.supportedOperations
+ case Some(auth) => resource.resourceType.supportedOperations
+ .filter(operation => authorize(session, operation, resource))
+ }
- sendResponseMaybeThrottle(request, requestThrottleMs => new DescribeGroupsResponse(requestThrottleMs, groups.asJava))
+ Utils.to32BitField(authorizedOps.map(operation => operation.toJava.code().asInstanceOf[JByte]).asJava)
}
def handleListGroupsRequest(request: RequestChannel.Request) {
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 96de860..eed8004 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -47,14 +47,14 @@ import org.junit.Assert._
import scala.util.Random
import scala.collection.JavaConverters._
-
import kafka.zk.KafkaZkClient
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
-
import java.lang.{Long => JLong}
+import kafka.security.auth.Group
+
/**
* An integration test of the KafkaAdminClient.
*
@@ -1142,12 +1142,14 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
!matching.isEmpty
}, s"Expected to be able to list $testGroupId")
- val result = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava)
+ val result = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava,
+ new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
assertEquals(2, result.describedGroups().size())
// Test that we can get information about the test consumer group.
assertTrue(result.describedGroups().containsKey(testGroupId))
val testGroupDescription = result.describedGroups().get(testGroupId).get()
+
assertEquals(testGroupId, testGroupDescription.groupId())
assertFalse(testGroupDescription.isSimpleConsumerGroup())
assertEquals(1, testGroupDescription.members().size())
@@ -1157,14 +1159,19 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(testNumPartitions, topicPartitions.size())
assertEquals(testNumPartitions, topicPartitions.asScala.
count(tp => tp.topic().equals(testTopicName)))
+ val expectedOperations = Group.supportedOperations
+ .map(operation => operation.toJava).asJava
+ assertEquals(expectedOperations, testGroupDescription.authorizedOperations())
// Test that the fake group is listed as dead.
assertTrue(result.describedGroups().containsKey(fakeGroupId))
val fakeGroupDescription = result.describedGroups().get(fakeGroupId).get()
+
assertEquals(fakeGroupId, fakeGroupDescription.groupId())
assertEquals(0, fakeGroupDescription.members().size())
assertEquals("", fakeGroupDescription.partitionAssignor())
assertEquals(ConsumerGroupState.DEAD, fakeGroupDescription.state())
+ assertEquals(expectedOperations, fakeGroupDescription.authorizedOperations())
// Test that all() returns 2 results
assertEquals(2, result.all().get().size())
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e0c2832..044f595 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -33,9 +33,8 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
-import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, LeaveGroupRequestData}
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
-import org.apache.kafka.common.message.LeaveGroupRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord}
@@ -163,7 +162,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.UPDATE_METADATA -> ((resp: requests.UpdateMetadataResponse) => resp.error),
ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error),
ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error),
- ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => resp.groups.get(group).error),
+ ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => {
+ val errorCode = resp.data().groups().asScala.find(g => group.equals(g.groupId())).head.errorCode()
+ Errors.forCode(errorCode)
+ }),
ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error),
ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error),
ApiKeys.DELETE_GROUPS -> ((resp: DeleteGroupsResponse) => resp.get(group)),
@@ -318,7 +320,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def createDescribeGroupsRequest = {
- new DescribeGroupsRequest.Builder(List(group).asJava).build()
+ new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(List(group).asJava)).build()
}
private def createOffsetCommitRequest = {
diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
new file mode 100644
index 0000000..5e53592
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import java.io.File
+import java.util
+import java.util.Properties
+
+import kafka.security.auth.{Allow, Alter, Authorizer, ClusterAction, Group, Operation, PermissionType, SimpleAclAuthorizer, Acl => AuthAcl, Resource => AuthResource}
+import kafka.server.KafkaConfig
+import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, DescribeConsumerGroupsOptions}
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Assert.assertEquals
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
+
+class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslSetup {
+ override val serverCount = 1
+ this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+ this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
+
+ var client: AdminClient = _
+ val group1 = "group1"
+ val group2 = "group2"
+ val group3 = "group3"
+
+ override protected def securityProtocol = SecurityProtocol.SASL_SSL
+
+ override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+
+ override def configureSecurityBeforeServersStart() {
+ val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName)
+ try {
+ authorizer.configure(this.configs.head.originals())
+ authorizer.addAcls(Set(clusterAcl(JaasTestUtils.KafkaServerPrincipalUnqualifiedName, Allow, ClusterAction),
+ clusterAcl(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, Allow, Alter)),
+ AuthResource.ClusterResource)
+ } finally {
+ authorizer.close()
+ }
+ }
+
+ @Before
+ override def setUp(): Unit = {
+ startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName))
+ super.setUp()
+ TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+ }
+
+ private def clusterAcl(userName: String, permissionType: PermissionType, operation: Operation): AuthAcl = {
+ new AuthAcl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, userName), permissionType,
+ AuthAcl.WildCardHost, operation)
+ }
+
+ @After
+ override def tearDown(): Unit = {
+ if (client != null)
+ Utils.closeQuietly(client, "AdminClient")
+ super.tearDown()
+ closeSasl()
+ }
+
+ val group1Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group1, PatternType.LITERAL),
+ new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.ALL, AclPermissionType.ALLOW))
+
+ val group2Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group2, PatternType.LITERAL),
+ new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
+
+ val group3Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group3, PatternType.LITERAL),
+ new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DELETE, AclPermissionType.ALLOW))
+
+ def createConfig(): Properties = {
+ val adminClientConfig = new Properties()
+ adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ adminClientConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
+ val securityProps: util.Map[Object, Object] =
+ TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+ securityProps.asScala.foreach { case (key, value) => adminClientConfig.put(key.asInstanceOf[String], value) }
+ adminClientConfig
+ }
+
+ @Test
+ def testConsumerGroupAuthorizedOperations(): Unit = {
+ client = AdminClient.create(createConfig())
+
+ val results = client.createAcls(List(group1Acl, group2Acl, group3Acl).asJava)
+ assertEquals(Set(group1Acl, group2Acl, group3Acl), results.values.keySet.asScala)
+ results.all.get
+
+ val describeConsumerGroupsResult = client.describeConsumerGroups(Seq(group1, group2, group3).asJava,
+ new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
+ assertEquals(3, describeConsumerGroupsResult.describedGroups().size())
+ val expectedOperations = Group.supportedOperations
+ .map(operation => operation.toJava).asJava
+
+ val group1Description = describeConsumerGroupsResult.describedGroups().get(group1).get
+ assertEquals(expectedOperations, group1Description.authorizedOperations())
+
+ val group2Description = describeConsumerGroupsResult.describedGroups().get(group2).get
+ assertEquals(Set(AclOperation.DESCRIBE), group2Description.authorizedOperations().asScala.toSet)
+
+ val group3Description = describeConsumerGroupsResult.describedGroups().get(group3).get
+ assertEquals(Set(AclOperation.DESCRIBE, AclOperation.DELETE), group3Description.authorizedOperations().asScala.toSet)
+ }
+
+}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 2d82e7f..b0eb7cb 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -24,10 +24,9 @@ import kafka.security.auth._
import kafka.utils.TestUtils
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.message.{ElectPreferredLeadersRequestData, LeaveGroupRequestData}
+import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
import org.apache.kafka.common.{Node, TopicPartition}
-import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
import org.apache.kafka.common.message.SaslHandshakeRequestData
import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
@@ -268,7 +267,7 @@ class RequestQuotaTest extends BaseRequestTest {
new SyncGroupRequest.Builder("test-sync-group", 1, "", Map[String, ByteBuffer]().asJava)
case ApiKeys.DESCRIBE_GROUPS =>
- new DescribeGroupsRequest.Builder(List("test-group").asJava)
+ new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(List("test-group").asJava))
case ApiKeys.LIST_GROUPS =>
new ListGroupsRequest.Builder()
@@ -443,7 +442,8 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.HEARTBEAT => new HeartbeatResponse(response).throttleTimeMs
case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs
case ApiKeys.SYNC_GROUP => new SyncGroupResponse(response).throttleTimeMs
- case ApiKeys.DESCRIBE_GROUPS => new DescribeGroupsResponse(response).throttleTimeMs
+ case ApiKeys.DESCRIBE_GROUPS =>
+ new DescribeGroupsResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion()).throttleTimeMs
case ApiKeys.LIST_GROUPS => new ListGroupsResponse(response).throttleTimeMs
case ApiKeys.API_VERSIONS => new ApiVersionsResponse(response).throttleTimeMs
case ApiKeys.CREATE_TOPICS =>