You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/31 21:23:18 UTC
[kafka] branch trunk updated: KAFKA-6275: Add DeleteGroups API
(KIP-229) (#4479)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1ed6da7 KAFKA-6275: Add DeleteGroups API (KIP-229) (#4479)
1ed6da7 is described below
commit 1ed6da7cc8eb2231f73509c907c2e67af2f249d2
Author: Vahid Hashemian <va...@us.ibm.com>
AuthorDate: Wed Jan 31 13:23:12 2018 -0800
KAFKA-6275: Add DeleteGroups API (KIP-229) (#4479)
Reviewers: Manikumar Reddy O <ma...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../common/errors/GroupIdNotFoundException.java | 31 +++
.../common/errors/GroupNotEmptyException.java | 31 +++
.../org/apache/kafka/common/protocol/ApiKeys.java | 5 +-
.../org/apache/kafka/common/protocol/Errors.java | 24 +-
.../kafka/common/requests/AbstractRequest.java | 2 +
.../kafka/common/requests/AbstractResponse.java | 2 +
.../kafka/common/requests/DeleteGroupsRequest.java | 117 ++++++++++
.../common/requests/DeleteGroupsResponse.java | 129 +++++++++++
.../kafka/common/requests/RequestResponseTest.java | 13 ++
core/src/main/scala/kafka/admin/AclCommand.scala | 2 +-
core/src/main/scala/kafka/admin/AdminClient.scala | 43 ++++
core/src/main/scala/kafka/admin/AdminUtils.scala | 3 +-
.../scala/kafka/admin/ConsumerGroupCommand.scala | 84 ++++---
core/src/main/scala/kafka/api/ApiVersion.scala | 10 +-
.../kafka/coordinator/group/GroupCoordinator.scala | 48 +++-
.../coordinator/group/GroupMetadataManager.scala | 17 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 20 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 40 +++-
.../kafka/admin/DeleteConsumerGroupsTest.scala | 251 +++++++++++++++++++++
.../coordinator/group/GroupCoordinatorTest.scala | 88 ++++++++
.../group/GroupMetadataManagerTest.scala | 20 ++
.../scala/unit/kafka/server/RequestQuotaTest.scala | 8 +-
22 files changed, 941 insertions(+), 47 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
new file mode 100644
index 0000000..1ff30f1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Exception for a group id that does not exist.
+ *
+ * The constructor argument is used verbatim as the exception message. Errors.GROUP_ID_NOT_FOUND
+ * builds this exception from its own description text, so wrapping the argument in another
+ * sentence produced "The group id The group id does not exist was not found".
+ */
+public class GroupIdNotFoundException extends ApiException {
+    // The string handed to the constructor; retained so that groupId() stays available to callers.
+    private final String groupId;
+
+    /**
+     * @param message the complete, already formatted error message
+     */
+    public GroupIdNotFoundException(String message) {
+        super(message);
+        this.groupId = message;
+    }
+
+    /**
+     * @return the string this exception was constructed with
+     */
+    public String groupId() {
+        return groupId;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
new file mode 100644
index 0000000..264e613
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Exception for an operation that requires an empty group but was attempted on a non-empty one.
+ *
+ * The constructor argument is used verbatim as the exception message. Errors.NON_EMPTY_GROUP
+ * builds this exception from its own description text, so wrapping the argument in another
+ * sentence produced "The group The group is not empty is not empty".
+ */
+public class GroupNotEmptyException extends ApiException {
+    // The string handed to the constructor; retained so that groupId() stays available to callers.
+    private final String groupId;
+
+    /**
+     * @param message the complete, already formatted error message
+     */
+    public GroupNotEmptyException(String message) {
+        super(message);
+        this.groupId = message;
+    }
+
+    /**
+     * @return the string this exception was constructed with
+     */
+    public String groupId() {
+        return groupId;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index b408e80..e0cdfd9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -43,6 +43,8 @@ import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteAclsRequest;
import org.apache.kafka.common.requests.DeleteAclsResponse;
+import org.apache.kafka.common.requests.DeleteGroupsRequest;
+import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteRecordsRequest;
import org.apache.kafka.common.requests.DeleteRecordsResponse;
import org.apache.kafka.common.requests.DeleteTopicsRequest;
@@ -183,7 +185,8 @@ public enum ApiKeys {
CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequest.schemaVersions(), CreateDelegationTokenResponse.schemaVersions()),
RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()),
EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()),
- DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions());
+ DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()),
+ DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions());
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index bd5b800..e2b8aea 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -17,8 +17,6 @@
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.errors.ApiException;
-import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
-import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.ConcurrentTransactionsException;
@@ -26,12 +24,15 @@ import org.apache.kafka.common.errors.ControllerMovedException;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.DuplicateSequenceException;
+import org.apache.kafka.common.errors.DelegationTokenAuthorizationException;
import org.apache.kafka.common.errors.DelegationTokenDisabledException;
+import org.apache.kafka.common.errors.DelegationTokenExpiredException;
import org.apache.kafka.common.errors.DelegationTokenNotFoundException;
import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
@@ -41,6 +42,7 @@ import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidPidMappingException;
+import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
@@ -52,6 +54,7 @@ import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.NotCoordinatorException;
@@ -70,10 +73,9 @@ import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.DelegationTokenAuthorizationException;
-import org.apache.kafka.common.errors.DelegationTokenExpiredException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
@@ -594,6 +596,18 @@ public enum Errors {
public ApiException build(String message) {
return new InvalidPrincipalTypeException(message);
}
+ }),
+ NON_EMPTY_GROUP(68, "The group is not empty", new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new GroupNotEmptyException(message);
+ }
+ }),
+ GROUP_ID_NOT_FOUND(69, "The group id does not exist", new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new GroupIdNotFoundException(message);
+ }
});
private interface ApiExceptionBuilder {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index cd213d9..d2b93c4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -222,6 +222,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return new ExpireDelegationTokenRequest(struct, apiVersion);
case DESCRIBE_DELEGATION_TOKEN:
return new DescribeDelegationTokenRequest(struct, apiVersion);
+ case DELETE_GROUPS:
+ return new DeleteGroupsRequest(struct, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index fb01298..608f6c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -154,6 +154,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
return new ExpireDelegationTokenResponse(struct);
case DESCRIBE_DELEGATION_TOKEN:
return new DescribeDelegationTokenResponse(struct);
+ case DELETE_GROUPS:
+ return new DeleteGroupsResponse(struct);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
new file mode 100644
index 0000000..bda6b33
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
+public class DeleteGroupsRequest extends AbstractRequest {
+ private static final String GROUPS_KEY_NAME = "groups";
+
+ /* DeleteGroups api */
+ private static final Schema DELETE_GROUPS_REQUEST_V0 = new Schema(
+ new Field(GROUPS_KEY_NAME, new ArrayOf(STRING), "An array of groups to be deleted."));
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{DELETE_GROUPS_REQUEST_V0};
+ }
+
+ private final Set<String> groups;
+
+ public static class Builder extends AbstractRequest.Builder<DeleteGroupsRequest> {
+ private final Set<String> groups;
+
+ public Builder(Set<String> groups) {
+ super(ApiKeys.DELETE_GROUPS);
+ this.groups = groups;
+ }
+
+ @Override
+ public DeleteGroupsRequest build(short version) {
+ return new DeleteGroupsRequest(groups, version);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("(type=DeleteGroupsRequest").
+ append(", groups=(").append(Utils.join(groups, ", ")).append(")").
+ append(")");
+ return bld.toString();
+ }
+ }
+
+ private DeleteGroupsRequest(Set<String> groups, short version) {
+ super(version);
+ this.groups = groups;
+ }
+
+ public DeleteGroupsRequest(Struct struct, short version) {
+ super(version);
+ Object[] groupsArray = struct.getArray(GROUPS_KEY_NAME);
+ Set<String> groups = new HashSet<>(groupsArray.length);
+ for (Object group : groupsArray)
+ groups.add((String) group);
+
+ this.groups = groups;
+ }
+
+ @Override
+ protected Struct toStruct() {
+ Struct struct = new Struct(ApiKeys.DELETE_GROUPS.requestSchema(version()));
+ struct.set(GROUPS_KEY_NAME, groups.toArray());
+ return struct;
+ }
+
+ @Override
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ Errors error = Errors.forException(e);
+ Map<String, Errors> groupErrors = new HashMap<>(groups.size());
+ for (String group : groups)
+ groupErrors.put(group, error);
+
+ switch (version()) {
+ case 0:
+ return new DeleteGroupsResponse(throttleTimeMs, groupErrors);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ version(), ApiKeys.DELETE_GROUPS.name, ApiKeys.DELETE_GROUPS.latestVersion()));
+ }
+ }
+
+ public Set<String> groups() {
+ return groups;
+ }
+
+ public static DeleteGroupsRequest parse(ByteBuffer buffer, short version) {
+ return new DeleteGroupsRequest(ApiKeys.DELETE_GROUPS.parseRequest(version, buffer), version);
+ }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
new file mode 100644
index 0000000..d97bb0d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+
+/**
+ * Response of the DeleteGroups API: a throttle time plus one error code per group.
+ *
+ * Possible error codes:
+ *
+ * COORDINATOR_LOAD_IN_PROGRESS (14)
+ * COORDINATOR_NOT_AVAILABLE(15)
+ * NOT_COORDINATOR (16)
+ * INVALID_GROUP_ID(24)
+ * GROUP_AUTHORIZATION_FAILED(30)
+ * NON_EMPTY_GROUP(68)
+ * GROUP_ID_NOT_FOUND(69)
+ */
+public class DeleteGroupsResponse extends AbstractResponse {
+    private static final String GROUP_ERROR_CODES_KEY_NAME = "group_error_codes";
+
+    // One (group id, error code) pair; must be declared before the response schema that embeds it.
+    private static final Schema GROUP_ERROR_CODE = new Schema(GROUP_ID, ERROR_CODE);
+
+    private static final Schema DELETE_GROUPS_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(GROUP_ERROR_CODES_KEY_NAME, new ArrayOf(GROUP_ERROR_CODE), "An array of per group error codes."));
+
+    /**
+     * @return the response schemas, one entry per version
+     */
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DELETE_GROUPS_RESPONSE_V0};
+    }
+
+    private final Map<String, Errors> errors;
+    private final int throttleTimeMs;
+
+    public DeleteGroupsResponse(Map<String, Errors> errors) {
+        this(DEFAULT_THROTTLE_TIME, errors);
+    }
+
+    public DeleteGroupsResponse(int throttleTimeMs, Map<String, Errors> errors) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.errors = errors;
+    }
+
+    /**
+     * Reads the response from its struct form; the throttle time falls back to
+     * DEFAULT_THROTTLE_TIME through {@code getOrElse}.
+     */
+    public DeleteGroupsResponse(Struct struct) {
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+        Map<String, Errors> parsedErrors = new HashMap<>();
+        for (Object entry : struct.getArray(GROUP_ERROR_CODES_KEY_NAME)) {
+            Struct entryStruct = (Struct) entry;
+            String group = entryStruct.get(GROUP_ID);
+            parsedErrors.put(group, Errors.forCode(entryStruct.get(ERROR_CODE)));
+        }
+        this.errors = parsedErrors;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct responseStruct = new Struct(ApiKeys.DELETE_GROUPS.responseSchema(version));
+        responseStruct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+        List<Struct> entries = new ArrayList<>(errors.size());
+        for (Map.Entry<String, Errors> groupError : errors.entrySet()) {
+            Struct entryStruct = responseStruct.instance(GROUP_ERROR_CODES_KEY_NAME);
+            entryStruct.set(GROUP_ID, groupError.getKey());
+            entryStruct.set(ERROR_CODE, groupError.getValue().code());
+            entries.add(entryStruct);
+        }
+        responseStruct.set(GROUP_ERROR_CODES_KEY_NAME, entries.toArray());
+        return responseStruct;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    /**
+     * @return the error recorded for each group
+     */
+    public Map<String, Errors> errors() {
+        return errors;
+    }
+
+    /**
+     * @return true when the group has an entry in this response and that entry is not Errors.NONE
+     */
+    public boolean hasError(String group) {
+        if (!errors.containsKey(group)) {
+            return false;
+        }
+        return errors.get(group) != Errors.NONE;
+    }
+
+    /**
+     * @return the error recorded for the group, or null when the group has no entry
+     */
+    public Errors get(String group) {
+        return errors.get(group);
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(errors);
+    }
+
+    /**
+     * Parses a response of the given version from the buffer.
+     */
+    public static DeleteGroupsResponse parse(ByteBuffer buffer, short version) {
+        Struct parsed = ApiKeys.DELETE_GROUPS.responseSchema(version).read(buffer);
+        return new DeleteGroupsResponse(parsed);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 2740616..b5420b5 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -112,6 +112,9 @@ public class RequestResponseTest {
checkRequest(createDescribeGroupRequest());
checkErrorResponse(createDescribeGroupRequest(), new UnknownServerException());
checkResponse(createDescribeGroupResponse(), 0);
+ checkRequest(createDeleteGroupsRequest());
+ checkErrorResponse(createDeleteGroupsRequest(), new UnknownServerException());
+ checkResponse(createDeleteGroupsResponse(), 0);
checkRequest(createListOffsetRequest(1));
checkErrorResponse(createListOffsetRequest(1), new UnknownServerException());
checkResponse(createListOffsetResponse(1), 1);
@@ -641,6 +644,16 @@ public class RequestResponseTest {
return new LeaveGroupResponse(Errors.NONE);
}
+    // Creates a DeleteGroupsRequest naming the single group "test-group".
+    private DeleteGroupsRequest createDeleteGroupsRequest() {
+        DeleteGroupsRequest.Builder builder = new DeleteGroupsRequest.Builder(Collections.singleton("test-group"));
+        return builder.build();
+    }
+
+    // Creates a DeleteGroupsResponse that reports Errors.NONE for the single group "test-group".
+    private DeleteGroupsResponse createDeleteGroupsResponse() {
+        Map<String, Errors> groupErrors = new HashMap<>();
+        groupErrors.put("test-group", Errors.NONE);
+        return new DeleteGroupsResponse(groupErrors);
+    }
+
@SuppressWarnings("deprecation")
private ListOffsetRequest createListOffsetRequest(int version) {
if (version == 0) {
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index fa6333c..6dd2272 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -32,7 +32,7 @@ object AclCommand extends Logging {
val Newline = scala.util.Properties.lineSeparator
val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All),
- Group -> Set(Read, Describe, All),
+ Group -> Set(Read, Describe, Delete, All),
Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All),
TransactionalId -> Set(Describe, Write, All),
DelegationToken -> Set(Describe, All)
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index c092169..772277f 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -369,6 +369,49 @@ class AdminClient(val time: Time,
(response.error, response.tokens().asScala.toList)
}
+  /**
+   * Deletes the given consumer groups, sending one DeleteGroups request to each coordinator involved.
+   *
+   * @param groups ids of the groups to delete
+   * @return the outcome for every requested group: the error from the coordinator lookup, the error
+   *         found in the coordinator's response, or Errors.NONE when the response reports no error
+   */
+  def deleteConsumerGroups(groups: List[String]): Map[String, Errors] = {
+
+    // Left carries the coordinator node, Right the error that prevented the lookup.
+    def coordinatorLookup(group: String): Either[Node, Errors] = {
+      try {
+        Left(findCoordinator(group))
+      } catch {
+        case _: TimeoutException =>
+          Right(Errors.COORDINATOR_NOT_AVAILABLE)
+        // Only non-fatal failures become per-group errors. Fatal ones (e.g. OutOfMemoryError) and
+        // interrupts propagate instead of being reported as a failed deletion.
+        case scala.util.control.NonFatal(e) =>
+          Right(Errors.forException(e))
+      }
+    }
+
+    var errors: Map[String, Errors] = Map()
+    var groupsPerCoordinator: Map[Node, List[String]] = Map()
+
+    groups.foreach { group =>
+      coordinatorLookup(group) match {
+        case Right(error) =>
+          errors += group -> error
+        case Left(coordinator) =>
+          groupsPerCoordinator += coordinator -> (group :: groupsPerCoordinator.getOrElse(coordinator, Nil))
+      }
+    }
+
+    // Named coordinatorGroups so the method's `groups` parameter is not shadowed.
+    groupsPerCoordinator.foreach { case (coordinator, coordinatorGroups) =>
+      val responseBody = send(coordinator, ApiKeys.DELETE_GROUPS, new DeleteGroupsRequest.Builder(coordinatorGroups.toSet.asJava))
+      val response = responseBody.asInstanceOf[DeleteGroupsResponse]
+      coordinatorGroups.foreach { group =>
+        val error = if (response.hasError(group)) response.errors.get(group) else Errors.NONE
+        errors += group -> error
+      }
+    }
+
+    errors
+  }
def close() {
running = false
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index f21b942..2ae03aa 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -429,9 +429,10 @@ object AdminUtils extends Logging with AdminUtilities {
* @param topic Topic of the consumer group information we wish to delete
*/
@deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
- def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String) {
+ def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String): Set[String] = {
val groups = zkUtils.getAllConsumerGroupsForTopic(topic)
- groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
+ // Return only the groups whose information was actually removed: the per-group delete returns
+ // false when nothing was deleted, and such groups must not be reported as deleted.
+ groups.filter(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
}
def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 3aa821c..77c5b4d 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -75,12 +75,8 @@ object ConsumerGroupCommand extends Logging {
consumerGroupService.listGroups().foreach(println(_))
else if (opts.options.has(opts.describeOpt))
consumerGroupService.describeGroup()
- else if (opts.options.has(opts.deleteOpt)) {
- consumerGroupService match {
- case service: ZkConsumerGroupService => service.deleteGroups()
- case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService.")
- }
- }
+ else if (opts.options.has(opts.deleteOpt))
+ consumerGroupService.deleteGroups()
else if (opts.options.has(opts.resetOffsetsOpt)) {
val offsetsToReset = consumerGroupService.resetOffsets()
if (opts.options.has(opts.exportOpt)) {
@@ -344,6 +340,8 @@ object ConsumerGroupCommand extends Logging {
def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = throw new UnsupportedOperationException
def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = throw new UnsupportedOperationException
+
+ def deleteGroups(): Map[String, Errors]
}
@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
@@ -362,13 +360,15 @@ object ConsumerGroupCommand extends Logging {
zkUtils.getConsumerGroups().toList
}
- def deleteGroups() {
+ // Deletes group information according to the group/topic options and returns the per-group
+ // outcome. The if/else chain is the method's result, so the maps computed by the helpers are
+ // returned rather than discarded.
+ def deleteGroups(): Map[String, Errors] = {
if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt))
- deleteForTopic()
+ deleteGroupsInfoForTopic()
else if (opts.options.has(opts.groupOpt))
- deleteForGroup()
+ deleteGroupsInfo()
else if (opts.options.has(opts.topicOpt))
- deleteAllForTopic()
+ deleteAllGroupsInfoForTopic()
+ else
+ Map() // neither a group nor a topic was given, so nothing was deleted
}
def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
@@ -476,45 +476,57 @@ object ConsumerGroupCommand extends Logging {
}.toMap
}
- private def deleteForGroup() {
+ // Deletes the ZooKeeper information of every group named by the group option and returns the
+ // outcome per group: NONE on success, NON_EMPTY_GROUP when consumers are still active,
+ // GROUP_ID_NOT_FOUND when the group does not exist.
+ private def deleteGroupsInfo(): Map[String, Errors] = {
val groups = opts.options.valuesOf(opts.groupOpt)
- groups.asScala.foreach { group =>
+ groups.asScala.map { group =>
try {
- if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group))
+ if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group)) {
println(s"Deleted all consumer group information for group '$group' in zookeeper.")
- else
+ group -> Errors.NONE
+ }
+ else {
printError(s"Delete for group '$group' failed because its consumers are still active.")
+ group -> Errors.NON_EMPTY_GROUP
+ }
}
catch {
case e: ZkNoNodeException =>
printError(s"Delete for group '$group' failed because group does not exist.", Some(e))
+ // A missing ZooKeeper node means the group is unknown; report the dedicated error code
+ // instead of deriving one from the ZooKeeper exception class.
+ group -> Errors.GROUP_ID_NOT_FOUND
}
- }
+ }.toMap
}
- private def deleteForTopic() {
+ // Deletes, for every group named by the group option, the ZooKeeper information tied to the
+ // given topic and returns the outcome per group: NONE on success, NON_EMPTY_GROUP when
+ // consumers are still active, GROUP_ID_NOT_FOUND when the group does not exist.
+ private def deleteGroupsInfoForTopic(): Map[String, Errors] = {
val groups = opts.options.valuesOf(opts.groupOpt)
val topic = opts.options.valueOf(opts.topicOpt)
Topic.validate(topic)
- groups.asScala.foreach { group =>
+ groups.asScala.map { group =>
try {
- if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
+ if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic)) {
println(s"Deleted consumer group information for group '$group' topic '$topic' in zookeeper.")
- else
+ group -> Errors.NONE
+ }
+ else {
printError(s"Delete for group '$group' topic '$topic' failed because its consumers are still active.")
+ group -> Errors.NON_EMPTY_GROUP
+ }
}
catch {
case e: ZkNoNodeException =>
printError(s"Delete for group '$group' topic '$topic' failed because group does not exist.", Some(e))
+ // A missing ZooKeeper node means the group is unknown; report the dedicated error code
+ // instead of deriving one from the ZooKeeper exception class.
+ group -> Errors.GROUP_ID_NOT_FOUND
}
- }
+ }.toMap
}
- private def deleteAllForTopic() {
+ // Validates the topic, asks AdminUtils to delete the consumer group information for it, and maps
+ // every group returned by that call to Errors.NONE.
+ private def deleteAllGroupsInfoForTopic(): Map[String, Errors] = {
val topic = opts.options.valueOf(opts.topicOpt)
Topic.validate(topic)
- AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic)
+ val groupsForTopic = AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic)
println(s"Deleted consumer group information for all inactive consumer groups for topic '$topic' in zookeeper.")
+ groupsForTopic.map(group => group -> Errors.NONE).toMap
}
private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = {
@@ -830,6 +842,27 @@ object ConsumerGroupCommand extends Logging {
rows.foldRight("")(_ + "\n" + _)
}
+    /**
+     * Deletes the consumer groups named by the group option through the admin client and prints a
+     * summary of the outcome.
+     *
+     * @return the per-group result reported by the admin client
+     */
+    override def deleteGroups(): Map[String, Errors] = {
+      val groupsToDelete = opts.options.valuesOf(opts.groupOpt).asScala.toList
+      val result = adminClient.deleteConsumerGroups(groupsToDelete)
+      val (succeeded, failed) = result.partition { case (_, error) => error == Errors.NONE }
+      // Quote every group id on its own ('a', 'b'); a plain ", " separator printed 'a, b'.
+      val deletedGroups = succeeded.keySet.mkString("'", "', '", "'")
+
+      if (failed.isEmpty)
+        println(s"Deletion of requested consumer groups ($deletedGroups) was successful.")
+      else {
+        printError("Deletion of some consumer groups failed:")
+        failed.foreach { case (group, error) =>
+          println(s"* Group '$group' could not be deleted due to: ${error.toString}")
+        }
+        // Successful deletions are listed once, together, rather than individually.
+        if (succeeded.nonEmpty)
+          println(s"\nThese consumer groups were deleted successfully: $deletedGroups")
+      }
+
+      result
+    }
}
sealed trait LogOffsetResult
@@ -987,10 +1020,9 @@ object ConsumerGroupCommand extends Logging {
s"The new consumer is used by default if the $bootstrapServerOpt option is provided.")
}
- if (options.has(deleteOpt))
- CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is only valid with $zkConnectOpt. Note that " +
- "there is no need to delete group metadata for the new consumer as the group is deleted when the last " +
- "committed offset for that group expires.")
+ if (options.has(deleteOpt) && options.has(topicOpt))
+ CommandLineUtils.printUsageAndDie(parser, s"When deleting a consumer group the option $topicOpt is only " +
+ s"valid with $zkConnectOpt. The new consumer does not support topic-specific offset deletion from a consumer group.")
}
if (describeOptPresent)
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index e509fc5..f95fb89 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -72,7 +72,9 @@ object ApiVersion {
"0.11.0" -> KAFKA_0_11_0_IV2,
// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112
"1.0-IV0" -> KAFKA_1_0_IV0,
- "1.0" -> KAFKA_1_0_IV0
+ "1.0" -> KAFKA_1_0_IV0,
+ // Introduced DeleteGroupsRequest V0 via KIP-229
+ "1.1-IV0" -> KAFKA_1_1_IV0
)
private val versionPattern = "\\.".r
@@ -184,3 +186,9 @@ case object KAFKA_1_0_IV0 extends ApiVersion {
val id: Int = 13
}
+case object KAFKA_1_1_IV0 extends ApiVersion { // version that introduced DeleteGroupsRequest V0 (KIP-229)
+ val version: String = "1.1-IV0"
+ val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+ val id: Int = 14 // next id after KAFKA_1_0_IV0, which uses 13
+}
+
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index ee4fc4b..5ae8552 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -338,6 +338,52 @@ class GroupCoordinator(val brokerId: Int,
}
}
+ def handleDeleteGroups(groupIds: Set[String]): Map[String, Errors] = { // tries to delete each requested group; returns one error code per group id
+ if (!isActive.get) {
+ groupIds.map(_ -> Errors.COORDINATOR_NOT_AVAILABLE).toMap // coordinator not active: every group gets the same error
+ } else {
+ var groupErrors: Map[String, Errors] = Map() // outcome accumulated per group id
+ var eligibleGroups: Seq[GroupMetadata] = Seq() // groups moved Empty -> Dead below, pending metadata cleanup
+
+ groupIds.foreach { groupId =>
+ if (!validGroupId(groupId))
+ groupErrors += groupId -> Errors.INVALID_GROUP_ID
+ else if (!isCoordinatorForGroup(groupId))
+ groupErrors += groupId -> Errors.NOT_COORDINATOR
+ else if (isCoordinatorLoadInProgress(groupId))
+ groupErrors += groupId -> Errors.COORDINATOR_LOAD_IN_PROGRESS
+ else {
+ groupManager.getGroup(groupId) match {
+ case None => // no cached metadata for this id
+ groupErrors += groupId ->
+ (if (groupManager.groupNotExists(groupId)) Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR)
+ case Some(group) =>
+ group.inLock {
+ group.currentState match {
+ case Dead => // NOTE(review): groupNotExists is called while the group lock is held and itself takes partitionLock — confirm the lock ordering is safe
+ groupErrors += groupId ->
+ (if (groupManager.groupNotExists(groupId)) Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR)
+ case Empty => // only Empty groups are deleted: mark Dead now, clean up after the loop
+ group.transitionTo(Dead)
+ eligibleGroups :+= group
+ case _ => // any other state is reported as a non-empty group
+ groupErrors += groupId -> Errors.NON_EMPTY_GROUP
+ }
+ }
+ }
+ }
+ }
+
+ if (eligibleGroups.nonEmpty) {
+ groupManager.cleanupGroupMetadata(None, eligibleGroups, Long.MaxValue) // no deleted partitions; startMs = Long.MaxValue — presumably so every stored offset counts as expired, TODO confirm
+ groupErrors ++= eligibleGroups.map(_.groupId -> Errors.NONE).toMap
+ info(s"The following groups were deleted: ${eligibleGroups.map(_.groupId).mkString(", ")}")
+ }
+
+ groupErrors
+ }
+ }
+
def handleHeartbeat(groupId: String,
memberId: String,
generationId: Int,
@@ -522,7 +568,7 @@ class GroupCoordinator(val brokerId: Int,
}
def handleDeletedPartitions(topicPartitions: Seq[TopicPartition]) {
- groupManager.cleanupGroupMetadata(Some(topicPartitions))
+ groupManager.cleanupGroupMetadata(Some(topicPartitions), groupManager.currentGroups, time.milliseconds())
}
private def validateGroup(groupId: String): Option[Errors] = {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 6599698..3391fc3 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -158,6 +158,13 @@ class GroupMetadataManager(brokerId: Int,
def isLoading(): Boolean = inLock(partitionLock) { loadingPartitions.nonEmpty }
+ /**
+  * Returns true iff the group is owned locally (isGroupLocal) and it either has no cached
+  * metadata or its cached metadata is already in the Dead state.
+  */
+ def groupNotExists(groupId: String): Boolean = inLock(partitionLock) {
+   isGroupLocal(groupId) && getGroup(groupId).forall { group =>
+     group.inLock(group.is(Dead))
+   }
+ }
+
// visible for testing
private[group] def isGroupOpenForProducer(producerId: Long, groupId: String) = openGroupsForProducer.get(producerId) match {
case Some(groups) =>
@@ -706,14 +713,16 @@ class GroupMetadataManager(brokerId: Int,
// visible for testing
private[group] def cleanupGroupMetadata(): Unit = {
- cleanupGroupMetadata(None)
+ cleanupGroupMetadata(None, groupMetadataCache.values, time.milliseconds())
}
- def cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]]) {
- val startMs = time.milliseconds()
+ def cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]],
+ groups: Iterable[GroupMetadata],
+ startMs: Long) {
var offsetsRemoved = 0
- groupMetadataCache.foreach { case (groupId, group) =>
+ groups.foreach { group =>
+ val groupId = group.groupId
val (removedOffsets, groupIsDead, generation) = group.inLock {
val removedOffsets = deletedTopicPartitions match {
case Some(topicPartitions) => group.removeOffsets(topicPartitions)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 13f5164..2dd6951 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -141,7 +141,8 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
- case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> handleDescribeTokensRequest(request)
+ case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
+ case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
}
} catch {
case e: FatalExitError => throw e
@@ -488,7 +489,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingTopicResponseData = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData)]()
val authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
- if (fetchRequest.isFromFollower() && !authorize(request.session, ClusterAction, Resource.ClusterResource))
+ if (fetchRequest.isFromFollower() && !authorize(request.session, ClusterAction, Resource.ClusterResource))
for (topicPartition <- fetchRequest.fetchData.asScala.keys)
unauthorizedTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.CLUSTER_AUTHORIZATION_FAILED,
FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
@@ -1221,6 +1222,21 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
+ /**
+  * Handles a DeleteGroups request: groups the session is not authorized to Delete are answered
+  * with GROUP_AUTHORIZATION_FAILED, the remaining groups are handed to the group coordinator,
+  * and the merged per-group errors are sent back through sendResponseMaybeThrottle.
+  */
+ def handleDeleteGroupsRequest(request: RequestChannel.Request): Unit = {
+   val deleteGroupsRequest = request.body[DeleteGroupsRequest]
+   val groups = deleteGroupsRequest.groups.asScala.toSet
+
+   val (authorizedGroups, unauthorizedGroups) = groups.partition { group =>
+     authorize(request.session, Delete, new Resource(Group, group))
+   }
+
+   // only authorized groups ever reach the coordinator
+   val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups) ++
+     unauthorizedGroups.map(_ -> Errors.GROUP_AUTHORIZATION_FAILED)
+
+   sendResponseMaybeThrottle(request, requestThrottleMs =>
+     new DeleteGroupsResponse(requestThrottleMs, groupDeletionResult.asJava))
+ }
+
def handleHeartbeatRequest(request: RequestChannel.Request) {
val heartbeatRequest = request.body[HeartbeatRequest]
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 39c1ea3..248d219 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -78,6 +78,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val groupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
val groupDescribeAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
+ val groupDeleteAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)))
@@ -125,6 +126,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.DESCRIBE_GROUPS -> classOf[DescribeGroupsResponse],
ApiKeys.HEARTBEAT -> classOf[HeartbeatResponse],
ApiKeys.LEAVE_GROUP -> classOf[LeaveGroupResponse],
+ ApiKeys.DELETE_GROUPS -> classOf[DeleteGroupsResponse],
ApiKeys.LEADER_AND_ISR -> classOf[requests.LeaderAndIsrResponse],
ApiKeys.STOP_REPLICA -> classOf[requests.StopReplicaResponse],
ApiKeys.CONTROLLED_SHUTDOWN -> classOf[requests.ControlledShutdownResponse],
@@ -162,6 +164,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => resp.groups.get(group).error),
ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error),
ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error),
+ ApiKeys.DELETE_GROUPS -> ((resp: DeleteGroupsResponse) => resp.get(group)),
ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
ApiKeys.CONTROLLED_SHUTDOWN -> ((resp: requests.ControlledShutdownResponse) => resp.error),
@@ -202,6 +205,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.DESCRIBE_GROUPS -> groupDescribeAcl,
ApiKeys.HEARTBEAT -> groupReadAcl,
ApiKeys.LEAVE_GROUP -> groupReadAcl,
+ ApiKeys.DELETE_GROUPS -> groupDeleteAcl,
ApiKeys.LEADER_AND_ISR -> clusterAcl,
ApiKeys.STOP_REPLICA -> clusterAcl,
ApiKeys.CONTROLLED_SHUTDOWN -> clusterAcl,
@@ -343,6 +347,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def leaveGroupRequest = new LeaveGroupRequest.Builder(group, "").build()
+ private def deleteGroupsRequest = new DeleteGroupsRequest.Builder(Set(group).asJava).build() // DeleteGroups request naming only `group`
+
private def leaderAndIsrRequest = {
new requests.LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, brokerId, Int.MaxValue,
Map(tp -> new LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, false)).asJava,
@@ -468,7 +474,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.DELETE_RECORDS -> deleteRecordsRequest,
ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
- ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest
+ ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
+ ApiKeys.DELETE_GROUPS -> deleteGroupsRequest
)
for ((key, request) <- requestKeyToRequest) {
@@ -495,7 +502,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
def testFetchFollowerRequest() {
val key = ApiKeys.FETCH
val request = createFetchFollowerRequest
-
+
removeAllAcls()
val resources = Set(topicResource.resourceType, Resource.ClusterResource.resourceType)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
@@ -503,7 +510,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val readAcls = topicReadAcl.get(topicResource).get
addAndVerifyAcls(readAcls, topicResource)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
-
+
val clusterAcls = clusterAcl.get(Resource.ClusterResource).get
addAndVerifyAcls(clusterAcls, Resource.ClusterResource)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true)
@@ -961,6 +968,33 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
@Test
+ def testDeleteGroupApiWithDeleteGroupAcl() { // with a Delete ACL on the group, deletion is expected to return Errors.NONE
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)), groupResource)
+ this.consumers.head.assign(List(tp).asJava)
+ this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava) // presumably creates the group on the coordinator by committing an offset — confirm
+ val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group)) // NOTE(review): this AdminClient is never closed — confirm whether it needs an explicit close
+ assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NONE))
+ }
+
+ @Test
+ def testDeleteGroupApiWithNoDeleteGroupAcl() { // only Read ACLs are granted, so deletion is expected to fail authorization
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ this.consumers.head.assign(List(tp).asJava)
+ this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava) // presumably creates the group on the coordinator by committing an offset — confirm
+ val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group)) // NOTE(review): this AdminClient is never closed — confirm whether it needs an explicit close
+ assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
+ }
+
+ @Test
+ def testDeleteGroupApiWithNoDeleteGroupAcl2() { // adds no ACLs and commits no offsets; the delete is still expected to fail authorization
+ val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group)) // NOTE(review): this AdminClient is never closed — confirm whether it needs an explicit close
+ assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
+ }
+
+ @Test
def testUnauthorizedDeleteTopicsWithoutDescribe() {
val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
val version = ApiKeys.DELETE_TOPICS.latestVersion
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
new file mode 100644
index 0000000..cc236d5
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.admin
+
+import kafka.admin.ConsumerGroupCommandTest
+import kafka.utils.TestUtils
+import org.apache.kafka.common.protocol.Errors
+import org.junit.Assert._
+import org.junit.Test
+
+/**
+ * Tests for deleting consumer groups through the consumer group command with --bootstrap-server.
+ * Most scenarios come in pairs: the "Cmd" variant checks the text printed by deleteGroups(),
+ * the other variant checks the per-group error map it returns.
+ */
+class DeleteConsumerGroupTest extends ConsumerGroupCommandTest {
+
+  // NOTE(review): "--topic" is passed without a value, so the expected OptionException may come
+  // from the missing argument rather than from the delete/topic exclusivity check — confirm.
+  @Test(expected = classOf[joptsimple.OptionException])
+  def testDeleteWithTopicOption() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group, "--topic")
+    getConsumerGroupService(cgcArgs)
+    fail("Expected an error due to presence of mutually exclusive options")
+  }
+
+  @Test
+  def testDeleteCmdNonExistingGroup() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+    val missingGroup = "missing.group"
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", missingGroup)
+    val service = getConsumerGroupService(cgcArgs)
+
+    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
+    assertTrue(s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected while deleting consumer group",
+      output.contains(s"Group '$missingGroup' could not be deleted due to: ${Errors.GROUP_ID_NOT_FOUND.toString}"))
+  }
+
+  @Test
+  def testDeleteNonExistingGroup() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+    val missingGroup = "missing.group"
+
+    // note the group to be deleted is a different (non-existing) group
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", missingGroup)
+    val service = getConsumerGroupService(cgcArgs)
+
+    val result = service.deleteGroups()
+    assertTrue(s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected while deleting consumer group",
+      result.size == 1 && result.keySet.contains(missingGroup) && result.get(missingGroup).contains(Errors.GROUP_ID_NOT_FOUND))
+  }
+
+  @Test
+  def testDeleteCmdInvalidGroupId() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+    val invalidGroupId = ""
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId)
+    val service = getConsumerGroupService(cgcArgs)
+
+    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
+    assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group",
+      output.contains(s"Group '$invalidGroupId' could not be deleted due to: ${Errors.INVALID_GROUP_ID.toString}"))
+  }
+
+  @Test
+  def testDeleteInvalidGroupId() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+    val invalidGroupId = ""
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId)
+    val service = getConsumerGroupService(cgcArgs)
+
+    val result = service.deleteGroups()
+    assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group",
+      result.size == 1 && result.keySet.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID))
+  }
+
+  @Test
+  def testDeleteCmdNonEmptyGroup() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+
+    // run one consumer in the group
+    addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      service.listGroups().contains(group)
+    }, "The group did not initialize as expected.")
+
+    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
+    assertTrue(s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while deleting consumer group",
+      output.contains(s"Group '$group' could not be deleted due to: ${Errors.NON_EMPTY_GROUP}"))
+  }
+
+  @Test
+  def testDeleteNonEmptyGroup() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+
+    // run one consumer in the group
+    addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      service.listGroups().contains(group)
+    }, "The group did not initialize as expected.")
+
+    val result = service.deleteGroups()
+    assertTrue(s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while deleting consumer group",
+      result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NON_EMPTY_GROUP))
+  }
+
+  @Test
+  def testDeleteCmdEmptyGroup() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+
+    // run one consumer in the group
+    val executor = addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      service.listGroups().contains(group)
+    }, "The group did not initialize as expected.")
+
+    // stop the only consumer and wait for the group to report the Empty state
+    executor.shutdown()
+
+    TestUtils.waitUntilTrue(() => {
+      service.collectGroupState().state == "Empty"
+    }, "The group did not become empty as expected.")
+
+    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
+    assertTrue(s"The consumer group could not be deleted as expected",
+      output.contains(s"Deletion of requested consumer groups ('$group') was successful."))
+  }
+
+  @Test
+  def testDeleteEmptyGroup() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+
+    // run one consumer in the group
+    val executor = addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      service.listGroups().contains(group)
+    }, "The group did not initialize as expected.")
+
+    // stop the only consumer and wait for the group to report the Empty state
+    executor.shutdown()
+
+    TestUtils.waitUntilTrue(() => {
+      service.collectGroupState().state == "Empty"
+    }, "The group did not become empty as expected.")
+
+    val result = service.deleteGroups()
+    assertTrue(s"The consumer group could not be deleted as expected",
+      result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NONE))
+  }
+
+  @Test
+  def testDeleteCmdWithMixOfSuccessAndError() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+    val missingGroup = "missing.group"
+
+    // run one consumer in the group
+    val executor = addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      service.listGroups().contains(group)
+    }, "The group did not initialize as expected.")
+
+    // stop the only consumer and wait for the group to report the Empty state
+    executor.shutdown()
+
+    TestUtils.waitUntilTrue(() => {
+      service.collectGroupState().state == "Empty"
+    }, "The group did not become empty as expected.")
+
+    // second service asks for both the existing (now empty) group and a missing one
+    val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup))
+    val output = TestUtils.grabConsoleOutput(service2.deleteGroups())
+    assertTrue(s"The consumer group deletion did not work as expected",
+      output.contains(s"Group '$missingGroup' could not be deleted due to: ${Errors.GROUP_ID_NOT_FOUND}") &&
+        output.contains(s"These consumer groups were deleted successfully: '$group'"))
+  }
+
+  @Test
+  def testDeleteWithMixOfSuccessAndError() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+    val missingGroup = "missing.group"
+
+    // run one consumer in the group
+    val executor = addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      service.listGroups().contains(group)
+    }, "The group did not initialize as expected.")
+
+    // stop the only consumer and wait for the group to report the Empty state
+    executor.shutdown()
+
+    TestUtils.waitUntilTrue(() => {
+      service.collectGroupState().state == "Empty"
+    }, "The group did not become empty as expected.")
+
+    // second service asks for both the existing (now empty) group and a missing one
+    val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup))
+    val result = service2.deleteGroups()
+    assertTrue(s"The consumer group deletion did not work as expected",
+      result.size == 2 &&
+        result.keySet.contains(group) && result.get(group).contains(Errors.NONE) &&
+        result.keySet.contains(missingGroup) && result.get(missingGroup).contains(Errors.GROUP_ID_NOT_FOUND))
+  }
+
+  @Test
+  def testDeleteCmdWithShortInitialization() {
+    // run one consumer in the group; the offsets topic is deliberately not created in this test
+    addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
+    assertTrue(s"The consumer group deletion did not work as expected",
+      output.contains(s"Group '$group' could not be deleted due to: ${Errors.COORDINATOR_NOT_AVAILABLE}"))
+  }
+
+  @Test
+  def testDeleteWithShortInitialization() {
+    // run one consumer in the group; the offsets topic is deliberately not created in this test
+    addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    val result = service.deleteGroups()
+    assertTrue(s"The consumer group deletion did not work as expected",
+      result.size == 1 &&
+        result.keySet.contains(group) && result.get(group).contains(Errors.COORDINATOR_NOT_AVAILABLE))
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index a62e7aa..2c9e81d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -30,6 +30,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
+import kafka.cluster.Partition
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.internals.Topic
import org.junit.Assert._
@@ -1271,6 +1272,93 @@ class GroupCoordinatorTest extends JUnitSuite {
}
@Test
+ def testDeleteNonEmptyGroup() {
+   // A member joins first, so the group is not Empty when the delete is attempted.
+   joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+
+   val result = groupCoordinator.handleDeleteGroups(Set(groupId))
+   // assertEquals reports the actual map on failure, unlike a compound boolean assert
+   assertEquals(Map(groupId -> Errors.NON_EMPTY_GROUP), result)
+ }
+
+ @Test
+ def testDeleteGroupWithInvalidGroupId() {
+   // The empty string is rejected as a group id.
+   val invalidGroupId = ""
+   val result = groupCoordinator.handleDeleteGroups(Set(invalidGroupId))
+   assertEquals(Map(invalidGroupId -> Errors.INVALID_GROUP_ID), result)
+ }
+
+ @Test
+ def testDeleteGroupWithWrongCoordinator() {
+   // otherGroupId is expected to be answered with NOT_COORDINATOR by this coordinator.
+   val result = groupCoordinator.handleDeleteGroups(Set(otherGroupId))
+   assertEquals(Map(otherGroupId -> Errors.NOT_COORDINATOR), result)
+ }
+
+ @Test
+ def testDeleteEmptyGroup() {
+   // Join and then leave with the only member, so a delete is attempted on a group with no members.
+   val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+   val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+
+   EasyMock.reset(replicaManager)
+   val leaveGroupResult = leaveGroup(groupId, joinGroupResult.memberId)
+   assertEquals(Errors.NONE, leaveGroupResult)
+
+   val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+   val partition = EasyMock.niceMock(classOf[Partition])
+
+   // Stub the replica manager lookups for the group metadata partition before deleting.
+   EasyMock.reset(replicaManager)
+   EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
+   EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+   EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
+   EasyMock.replay(replicaManager, partition)
+
+   val result = groupCoordinator.handleDeleteGroups(Set(groupId))
+   assertEquals(Map(groupId -> Errors.NONE), result)
+ }
+
+ @Test
+ def testDeleteEmptyGroupWithStoredOffsets() {
+   // Join, sync and commit an offset, then leave: the delete is attempted on a group that has
+   // no members but still has a committed offset.
+   val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+   val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+   val assignedMemberId = joinGroupResult.memberId
+   val joinGroupError = joinGroupResult.error
+   assertEquals(Errors.NONE, joinGroupError)
+
+   EasyMock.reset(replicaManager)
+   val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
+   val syncGroupError = syncGroupResult._2
+   assertEquals(Errors.NONE, syncGroupError)
+
+   EasyMock.reset(replicaManager)
+   val tp = new TopicPartition("topic", 0)
+   val offset = OffsetAndMetadata(0)
+   val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, immutable.Map(tp -> offset))
+   assertEquals(Errors.NONE, commitOffsetResult(tp))
+
+   val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId)
+   assertEquals(Stable.toString, describeGroupResult._2.state)
+   assertEquals(assignedMemberId, describeGroupResult._2.members.head.memberId)
+
+   EasyMock.reset(replicaManager)
+   val leaveGroupResult = leaveGroup(groupId, assignedMemberId)
+   assertEquals(Errors.NONE, leaveGroupResult)
+
+   val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+   val partition = EasyMock.niceMock(classOf[Partition])
+
+   // Stub the replica manager lookups for the group metadata partition before deleting.
+   EasyMock.reset(replicaManager)
+   EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
+   EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+   EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
+   EasyMock.replay(replicaManager, partition)
+
+   val result = groupCoordinator.handleDeleteGroups(Set(groupId))
+   assertEquals(Map(groupId -> Errors.NONE), result)
+
+   // After a successful delete the group is described as Dead.
+   assertEquals(Dead.toString, groupCoordinator.handleDescribeGroup(groupId)._2.state)
+ }
+
+ @Test
def shouldDelayInitialRebalanceByGroupInitialRebalanceDelayOnEmptyGroup() {
val firstJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
timer.advanceClock(GroupInitialRebalanceDelay / 2)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 62ebf29..b358c4e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -497,6 +497,26 @@ class GroupMetadataManagerTest {
}
}
+ @Test
+ def testGroupNotExists() { // checks groupNotExists for: partition not owned, owned without a group, owned with an Empty group, owned with a Dead group
+ // group is not owned
+ assertFalse(groupMetadataManager.groupNotExists(groupId))
+
+ groupMetadataManager.addPartitionOwnership(groupPartitionId)
+ // group is owned but does not exist yet
+ assertTrue(groupMetadataManager.groupNotExists(groupId))
+
+ val group = new GroupMetadata(groupId, initialState = Empty)
+ groupMetadataManager.addGroup(group)
+
+ // group is owned but not Dead
+ assertFalse(groupMetadataManager.groupNotExists(groupId))
+
+ group.transitionTo(Dead)
+ // group is owned and Dead
+ assertTrue(groupMetadataManager.groupNotExists(groupId))
+ }
+
private def appendConsumerOffsetCommit(buffer: ByteBuffer, baseOffset: Long, offsets: Map[TopicPartition, Long]) = {
val builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, baseOffset)
val commitRecords = createCommittedOffsetRecords(offsets)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index a90fa64..bfbae2b 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -312,12 +312,15 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.EXPIRE_DELEGATION_TOKEN =>
new ExpireDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
- case ApiKeys.DESCRIBE_DELEGATION_TOKEN=>
+ case ApiKeys.DESCRIBE_DELEGATION_TOKEN =>
new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")))
- case ApiKeys.RENEW_DELEGATION_TOKEN=>
+ case ApiKeys.RENEW_DELEGATION_TOKEN =>
new RenewDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
+ case ApiKeys.DELETE_GROUPS =>
+ new DeleteGroupsRequest.Builder(Collections.singleton("test-group"))
+
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}
@@ -416,6 +419,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response).throttleTimeMs
case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response).throttleTimeMs
case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs
+ case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs
case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
}
}
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.