You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/05/25 00:20:39 UTC
[kafka] branch trunk updated: KAFKA-8341. Retry Consumer group operation for NOT_COORDINATOR error (#6723)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 46a02f3 KAFKA-8341. Retry Consumer group operation for NOT_COORDINATOR error (#6723)
46a02f3 is described below
commit 46a02f3231cd6d340c622636159b9f59b4b3cb6e
Author: soondenana <50...@users.noreply.github.com>
AuthorDate: Fri May 24 17:20:22 2019 -0700
KAFKA-8341. Retry Consumer group operation for NOT_COORDINATOR error (#6723)
An API call for consumer groups must send a FindCoordinatorRequest to find the consumer group coordinator, and then send a follow-up request to that node. However, the coordinator might move after the FindCoordinatorRequest is answered but before the follow-up request is sent. In that case, the operation currently fails.
This change fixes that by detecting the resulting NOT_COORDINATOR error and retrying, starting again with a new FindCoordinatorRequest. This fixes listConsumerGroupOffsets, deleteConsumerGroups, and describeConsumerGroups.
Reviewers: Colin P. McCabe <cm...@apache.org>, Boyang Chen <bc...@outlook.com>
---
.../kafka/clients/admin/KafkaAdminClient.java | 478 ++++++++++++---------
.../kafka/clients/admin/KafkaAdminClientTest.java | 124 +++++-
2 files changed, 383 insertions(+), 219 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f0e6635..e612593 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -166,6 +166,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.kafka.common.requests.MetadataRequest.convertToMetadataRequestTopic;
@@ -2510,21 +2511,93 @@ public class KafkaAdminClient extends AdminClient {
return new DescribeDelegationTokenResult(tokensFuture);
}
+ /**
+ * Context class to encapsulate parameters of a call to find and use a consumer group coordinator.
+ * Some of the parameters are provided at construction and are immutable whereas others are provided
+ * as each "Call" completes and its values become available, like the node id of the coordinator.
+ *
+ * @param <T> The type of return value of the KafkaFuture
+ * @param <O> The type of configuration option. Different for different consumer group commands.
+ */
+ private final static class ConsumerGroupOperationContext<T, O extends AbstractOptions<O>> {
+ final private String groupId;
+ final private O options;
+ final private long deadline;
+ final private KafkaFutureImpl<T> future;
+ private Optional<Node> node;
+
+ public ConsumerGroupOperationContext(String groupId,
+ O options,
+ long deadline,
+ KafkaFutureImpl<T> future) {
+ this.groupId = groupId;
+ this.options = options;
+ this.deadline = deadline;
+ this.future = future;
+ this.node = Optional.empty();
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public O getOptions() {
+ return options;
+ }
+
+ public long getDeadline() {
+ return deadline;
+ }
+
+ public KafkaFutureImpl<T> getFuture() {
+ return future;
+ }
+
+ public Optional<Node> getNode() {
+ return node;
+ }
+
+ public void setNode(Node node) {
+ this.node = Optional.ofNullable(node);
+ }
+
+ public boolean hasCoordinatorMoved(AbstractResponse response) {
+ return response.errorCounts().keySet()
+ .stream()
+ .anyMatch(error -> error == Errors.NOT_COORDINATOR);
+ }
+ }
+
+ private void rescheduleTask(ConsumerGroupOperationContext<?, ?> context, Supplier<Call> nextCall) {
+ log.info("Node {} is no longer the Coordinator. Retrying with new coordinator.",
+ context.getNode().orElse(null));
+ // Requeue the task so that we can try with new coordinator
+ context.setNode(null);
+ Call findCoordinatorCall = getFindCoordinatorCall(context, nextCall);
+ runnable.call(findCoordinatorCall, time.milliseconds());
+ }
+
+ private static <T> Map<String, KafkaFutureImpl<T>> createFutures(Collection<String> groupIds) {
+ return new HashSet<>(groupIds).stream().collect(
+ Collectors.toMap(groupId -> groupId,
+ groupId -> {
+ if (groupIdIsUnrepresentable(groupId)) {
+ KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
+ future.completeExceptionally(new InvalidGroupIdException("The given group id '" +
+ groupId + "' cannot be represented in a request."));
+ return future;
+ } else {
+ return new KafkaFutureImpl<>();
+ }
+ }
+ ));
+ }
+
@Override
public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<String> groupIds,
final DescribeConsumerGroupsOptions options) {
- final Map<String, KafkaFutureImpl<ConsumerGroupDescription>> futures = new HashMap<>(groupIds.size());
- for (String groupId: groupIds) {
- if (groupIdIsUnrepresentable(groupId)) {
- KafkaFutureImpl<ConsumerGroupDescription> future = new KafkaFutureImpl<>();
- future.completeExceptionally(new InvalidGroupIdException("The given group id '" +
- groupId + "' cannot be represented in a request."));
- futures.put(groupId, future);
- } else if (!futures.containsKey(groupId)) {
- futures.put(groupId, new KafkaFutureImpl<>());
- }
- }
+ final Map<String, KafkaFutureImpl<ConsumerGroupDescription>> futures = createFutures(groupIds);
// TODO: KAFKA-6788, we should consider grouping the request per coordinator and send one request with a list of
// all consumer groups this coordinator host
@@ -2537,97 +2610,134 @@ public class KafkaAdminClient extends AdminClient {
final long startFindCoordinatorMs = time.milliseconds();
final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
+ ConsumerGroupOperationContext<ConsumerGroupDescription, DescribeConsumerGroupsOptions> context =
+ new ConsumerGroupOperationContext<>(groupId, options, deadline, futures.get(groupId));
+ Call findCoordinatorCall = getFindCoordinatorCall(context,
+ () -> KafkaAdminClient.this.getDescribeConsumerGroupsCall(context));
+ runnable.call(findCoordinatorCall, startFindCoordinatorMs);
+ }
- runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
- @Override
- FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
- return new FindCoordinatorRequest.Builder(
- new FindCoordinatorRequestData()
+ return new DescribeConsumerGroupsResult(new HashMap<>(futures));
+ }
+
+ /**
+ * Returns a {@code Call} object to fetch the coordinator for a consumer group id. Takes another Call
+ * parameter to schedule the action that needs to be taken using the coordinator. The param is a Supplier
+ * so that the next Call is created lazily and can therefore use the results of the find coordinator call
+ * in its construction.
+ *
+ * @param <T> The type of return value of the KafkaFuture, like ConsumerGroupDescription, Void etc.
+ * @param <O> The type of configuration option, like DescribeConsumerGroupsOptions, ListConsumerGroupsOptions etc
+ */
+ private <T, O extends AbstractOptions<O>> Call getFindCoordinatorCall(ConsumerGroupOperationContext<T, O> context,
+ Supplier<Call> nextCall) {
+ return new Call("findCoordinator", context.getDeadline(), new LeastLoadedNodeProvider()) {
+ @Override
+ FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
+ return new FindCoordinatorRequest.Builder(
+ new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.GROUP.id())
- .setKey(groupId));
- }
+ .setKey(context.getGroupId()));
+ }
- @Override
- void handleResponse(AbstractResponse abstractResponse) {
- final FindCoordinatorResponse fcResponse = (FindCoordinatorResponse) abstractResponse;
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
- if (handleGroupRequestError(fcResponse.error(), futures.get(groupId)))
- return;
+ if (handleGroupRequestError(response.error(), context.getFuture()))
+ return;
- final long nowDescribeConsumerGroups = time.milliseconds();
- final int nodeId = fcResponse.node().id();
- runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
- @Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
- return new DescribeGroupsRequest.Builder(
- new DescribeGroupsRequestData()
- .setGroups(Collections.singletonList(groupId))
- .setIncludeAuthorizedOperations(options.includeAuthorizedOperations()));
- }
+ context.setNode(response.node());
- @Override
- void handleResponse(AbstractResponse abstractResponse) {
- final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
+ runnable.call(nextCall.get(), time.milliseconds());
+ }
- KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
- final DescribedGroup describedGroup = response.data()
- .groups()
- .stream()
- .filter(group -> groupId.equals(group.groupId()))
- .findFirst().get();
+ @Override
+ void handleFailure(Throwable throwable) {
+ context.getFuture().completeExceptionally(throwable);
+ }
+ };
+ }
- final Errors groupError = Errors.forCode(describedGroup.errorCode());
+ private Call getDescribeConsumerGroupsCall(
+ ConsumerGroupOperationContext<ConsumerGroupDescription, DescribeConsumerGroupsOptions> context) {
+ return new Call("describeConsumerGroups",
+ context.getDeadline(),
+ new ConstantNodeIdProvider(context.getNode().get().id())) {
+ @Override
+ AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new DescribeGroupsRequest.Builder(
+ new DescribeGroupsRequestData()
+ .setGroups(Collections.singletonList(context.getGroupId()))
+ .setIncludeAuthorizedOperations(context.getOptions().includeAuthorizedOperations()));
+ }
- if (handleGroupRequestError(groupError, future))
- return;
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
- final String protocolType = describedGroup.protocolType();
- if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
- final List<DescribedGroupMember> members = describedGroup.members();
- final List<MemberDescription> memberDescriptions = new ArrayList<>(members.size());
- final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
- for (DescribedGroupMember groupMember : members) {
- Set<TopicPartition> partitions = Collections.emptySet();
- if (groupMember.memberAssignment().length > 0) {
- final PartitionAssignor.Assignment assignment = ConsumerProtocol.
- deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
- partitions = new HashSet<>(assignment.partitions());
- }
- final MemberDescription memberDescription =
- new MemberDescription(groupMember.memberId(),
- groupMember.clientId(),
- groupMember.clientHost(),
- new MemberAssignment(partitions));
- memberDescriptions.add(memberDescription);
- }
- final ConsumerGroupDescription consumerGroupDescription =
- new ConsumerGroupDescription(groupId, protocolType.isEmpty(),
- memberDescriptions,
- describedGroup.protocolData(),
- ConsumerGroupState.parse(describedGroup.groupState()),
- fcResponse.node(),
- authorizedOperations);
- future.complete(consumerGroupDescription);
- }
- }
+ List<DescribedGroup> describedGroups = response.data().groups();
+ if (describedGroups.isEmpty()) {
+ context.getFuture().completeExceptionally(
+ new InvalidGroupIdException("No consumer group found for GroupId: " + context.getGroupId()));
+ return;
+ }
- @Override
- void handleFailure(Throwable throwable) {
- KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
- future.completeExceptionally(throwable);
- }
- }, nowDescribeConsumerGroups);
+ if (describedGroups.size() > 1 ||
+ !describedGroups.get(0).groupId().equals(context.getGroupId())) {
+ String ids = Arrays.toString(describedGroups.stream().map(DescribedGroup::groupId).toArray());
+ context.getFuture().completeExceptionally(new InvalidGroupIdException(
+ "DescribeConsumerGroup request for GroupId: " + context.getGroupId() + " returned " + ids));
+ return;
}
- @Override
- void handleFailure(Throwable throwable) {
- KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
- future.completeExceptionally(throwable);
+ final DescribedGroup describedGroup = describedGroups.get(0);
+
+ // If coordinator changed since we fetched it, retry
+ if (context.hasCoordinatorMoved(response)) {
+ rescheduleTask(context, () -> getDescribeConsumerGroupsCall(context));
+ return;
}
- }, startFindCoordinatorMs);
- }
- return new DescribeConsumerGroupsResult(new HashMap<>(futures));
+ final Errors groupError = Errors.forCode(describedGroup.errorCode());
+ if (handleGroupRequestError(groupError, context.getFuture()))
+ return;
+
+ final String protocolType = describedGroup.protocolType();
+ if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
+ final List<DescribedGroupMember> members = describedGroup.members();
+ final List<MemberDescription> memberDescriptions = new ArrayList<>(members.size());
+ final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
+ for (DescribedGroupMember groupMember : members) {
+ Set<TopicPartition> partitions = Collections.emptySet();
+ if (groupMember.memberAssignment().length > 0) {
+ final PartitionAssignor.Assignment assignment = ConsumerProtocol.
+ deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
+ partitions = new HashSet<>(assignment.partitions());
+ }
+ final MemberDescription memberDescription =
+ new MemberDescription(groupMember.memberId(),
+ groupMember.clientId(),
+ groupMember.clientHost(),
+ new MemberAssignment(partitions));
+ memberDescriptions.add(memberDescription);
+ }
+ final ConsumerGroupDescription consumerGroupDescription =
+ new ConsumerGroupDescription(context.getGroupId(), protocolType.isEmpty(),
+ memberDescriptions,
+ describedGroup.protocolData(),
+ ConsumerGroupState.parse(describedGroup.groupState()),
+ context.getNode().get(),
+ authorizedOperations);
+ context.getFuture().complete(consumerGroupDescription);
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ context.getFuture().completeExceptionally(throwable);
+ }
+ };
}
private Set<AclOperation> validAclOperations(final int authorizedOperations) {
@@ -2776,162 +2886,125 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
- public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
+ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId,
+ final ListConsumerGroupOffsetsOptions options) {
final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> groupOffsetListingFuture = new KafkaFutureImpl<>();
-
final long startFindCoordinatorMs = time.milliseconds();
final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
- runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
+ ConsumerGroupOperationContext<Map<TopicPartition, OffsetAndMetadata>, ListConsumerGroupOffsetsOptions> context =
+ new ConsumerGroupOperationContext<>(groupId, options, deadline, groupOffsetListingFuture);
+
+ Call findCoordinatorCall = getFindCoordinatorCall(context,
+ () -> KafkaAdminClient.this.getListConsumerGroupOffsetsCall(context));
+ runnable.call(findCoordinatorCall, startFindCoordinatorMs);
+
+ return new ListConsumerGroupOffsetsResult(groupOffsetListingFuture);
+ }
+
+ private Call getListConsumerGroupOffsetsCall(ConsumerGroupOperationContext<Map<TopicPartition, OffsetAndMetadata>,
+ ListConsumerGroupOffsetsOptions> context) {
+ return new Call("listConsumerGroupOffsets", context.getDeadline(),
+ new ConstantNodeIdProvider(context.getNode().get().id())) {
@Override
- FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
- return new FindCoordinatorRequest.Builder(
- new FindCoordinatorRequestData()
- .setKeyType(CoordinatorType.GROUP.id())
- .setKey(groupId));
+ AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new OffsetFetchRequest.Builder(context.getGroupId(), context.getOptions().topicPartitions());
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
- final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
+ final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
+ final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
- if (handleGroupRequestError(response.error(), groupOffsetListingFuture))
+ // If coordinator changed since we fetched it, retry
+ if (context.hasCoordinatorMoved(response)) {
+ rescheduleTask(context, () -> getListConsumerGroupOffsetsCall(context));
return;
+ }
- final long nowListConsumerGroupOffsets = time.milliseconds();
-
- final int nodeId = response.node().id();
-
- runnable.call(new Call("listConsumerGroupOffsets", deadline, new ConstantNodeIdProvider(nodeId)) {
- @Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
- return new OffsetFetchRequest.Builder(groupId, options.topicPartitions());
- }
-
- @Override
- void handleResponse(AbstractResponse abstractResponse) {
- final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
- final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
-
- if (handleGroupRequestError(response.error(), groupOffsetListingFuture))
- return;
-
- for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
- response.responseData().entrySet()) {
- final TopicPartition topicPartition = entry.getKey();
- OffsetFetchResponse.PartitionData partitionData = entry.getValue();
- final Errors error = partitionData.error;
-
- if (error == Errors.NONE) {
- final Long offset = partitionData.offset;
- final String metadata = partitionData.metadata;
- final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
- groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
- } else {
- log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
- }
- }
- groupOffsetListingFuture.complete(groupOffsetsListing);
- }
+ if (handleGroupRequestError(response.error(), context.getFuture()))
+ return;
- @Override
- void handleFailure(Throwable throwable) {
- groupOffsetListingFuture.completeExceptionally(throwable);
+ for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
+ response.responseData().entrySet()) {
+ final TopicPartition topicPartition = entry.getKey();
+ OffsetFetchResponse.PartitionData partitionData = entry.getValue();
+ final Errors error = partitionData.error;
+
+ if (error == Errors.NONE) {
+ final Long offset = partitionData.offset;
+ final String metadata = partitionData.metadata;
+ final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
+ groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
+ } else {
+ log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
}
- }, nowListConsumerGroupOffsets);
+ }
+ context.getFuture().complete(groupOffsetsListing);
}
@Override
void handleFailure(Throwable throwable) {
- groupOffsetListingFuture.completeExceptionally(throwable);
+ context.getFuture().completeExceptionally(throwable);
}
- }, startFindCoordinatorMs);
-
- return new ListConsumerGroupOffsetsResult(groupOffsetListingFuture);
+ };
}
@Override
public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) {
- final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(groupIds.size());
- for (String groupId: groupIds) {
- if (groupIdIsUnrepresentable(groupId)) {
- KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
- future.completeExceptionally(new ApiException("The given group id '" +
- groupId + "' cannot be represented in a request."));
- futures.put(groupId, future);
- } else if (!futures.containsKey(groupId)) {
- futures.put(groupId, new KafkaFutureImpl<>());
- }
- }
+ final Map<String, KafkaFutureImpl<Void>> futures = createFutures(groupIds);
// TODO: KAFKA-6788, we should consider grouping the request per coordinator and send one request with a list of
// all consumer groups this coordinator host
for (final String groupId : groupIds) {
// skip sending request for those futures that already failed.
- if (futures.get(groupId).isCompletedExceptionally())
+ final KafkaFutureImpl<Void> future = futures.get(groupId);
+ if (future.isCompletedExceptionally())
continue;
final long startFindCoordinatorMs = time.milliseconds();
final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
+ ConsumerGroupOperationContext<Void, DeleteConsumerGroupsOptions> context =
+ new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
+ Call findCoordinatorCall = getFindCoordinatorCall(context,
+ () -> KafkaAdminClient.this.getDeleteConsumerGroupsCall(context));
+ runnable.call(findCoordinatorCall, startFindCoordinatorMs);
+ }
- runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
- @Override
- FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
- return new FindCoordinatorRequest.Builder(
- new FindCoordinatorRequestData()
- .setKeyType(CoordinatorType.GROUP.id())
- .setKey(groupId));
- }
-
- @Override
- void handleResponse(AbstractResponse abstractResponse) {
- final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
-
- if (handleGroupRequestError(response.error(), futures.get(groupId)))
- return;
-
- final long nowDeleteConsumerGroups = time.milliseconds();
-
- final int nodeId = response.node().id();
-
- runnable.call(new Call("deleteConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
-
- @Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
- return new DeleteGroupsRequest.Builder(Collections.singleton(groupId));
- }
-
- @Override
- void handleResponse(AbstractResponse abstractResponse) {
- final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
+ return new DeleteConsumerGroupsResult(new HashMap<>(futures));
+ }
- KafkaFutureImpl<Void> future = futures.get(groupId);
- final Errors groupError = response.get(groupId);
+ private Call getDeleteConsumerGroupsCall(ConsumerGroupOperationContext<Void, DeleteConsumerGroupsOptions> context) {
+ return new Call("deleteConsumerGroups", context.getDeadline(), new ConstantNodeIdProvider(context.getNode().get().id())) {
- if (handleGroupRequestError(groupError, future))
- return;
+ @Override
+ AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new DeleteGroupsRequest.Builder(Collections.singleton(context.getGroupId()));
+ }
- future.complete(null);
- }
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
- @Override
- void handleFailure(Throwable throwable) {
- KafkaFutureImpl<Void> future = futures.get(groupId);
- future.completeExceptionally(throwable);
- }
- }, nowDeleteConsumerGroups);
+ // If coordinator changed since we fetched it, retry
+ if (context.hasCoordinatorMoved(response)) {
+ rescheduleTask(context, () -> getDeleteConsumerGroupsCall(context));
+ return;
}
- @Override
- void handleFailure(Throwable throwable) {
- KafkaFutureImpl<Void> future = futures.get(groupId);
- future.completeExceptionally(throwable);
- }
- }, startFindCoordinatorMs);
- }
+ final Errors groupError = response.get(context.getGroupId());
+ if (handleGroupRequestError(groupError, context.getFuture()))
+ return;
- return new DeleteConsumerGroupsResult(new HashMap<>(futures));
+ context.getFuture().complete(null);
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ context.getFuture().completeExceptionally(throwable);
+ }
+ };
}
@Override
@@ -2968,5 +3041,4 @@ public class KafkaAdminClient extends AdminClient {
}, now);
return new ElectPreferredLeadersResult(electionFuture, partitionSet);
}
-
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 6aaa75b..567d578 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -1078,10 +1078,10 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
//Retriable FindCoordinatorResponse errors should be retried
- env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
- env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
- env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
DescribeGroupsResponseData data = new DescribeGroupsResponseData();
@@ -1107,6 +1107,23 @@ public class KafkaAdminClientTest {
Collections.emptySet()));
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+ /*
+ * We need to return two responses here, one with NOT_COORDINATOR error when calling describe consumer group
+ * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
+ * FindCoordinatorResponse.
+ */
+ data = new DescribeGroupsResponseData();
+ data.groups().add(DescribeGroupsResponse.groupMetadata(
+ "group-0",
+ Errors.NOT_COORDINATOR,
+ "",
+ "",
+ "",
+ Collections.emptyList(),
+ Collections.emptySet()));
+ env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
data = new DescribeGroupsResponseData();
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
@@ -1133,26 +1150,83 @@ public class KafkaAdminClientTest {
),
Collections.emptySet()));
- data.groups().add(DescribeGroupsResponse.groupMetadata(
+ env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+
+ final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0"));
+ final ConsumerGroupDescription groupDescription = result.describedGroups().get("group-0").get();
+
+ assertEquals(1, result.describedGroups().size());
+ assertEquals("group-0", groupDescription.groupId());
+ assertEquals(2, groupDescription.members().size());
+ }
+ }
+
+ @Test
+ public void testDescribeMultipleConsumerGroups() throws Exception {
+ final HashMap<Integer, Node> nodes = new HashMap<>();
+ nodes.put(0, new Node(0, "localhost", 8121));
+
+ final Cluster cluster =
+ new Cluster(
+ "mockClusterId",
+ nodes.values(),
+ Collections.<PartitionInfo>emptyList(),
+ Collections.<String>emptySet(),
+ Collections.<String>emptySet(), nodes.get(0));
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+ TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
+ TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
+ TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
+
+ final List<TopicPartition> topicPartitions = new ArrayList<>();
+ topicPartitions.add(0, myTopicPartition0);
+ topicPartitions.add(1, myTopicPartition1);
+ topicPartitions.add(2, myTopicPartition2);
+
+ final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions));
+ byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
+ memberAssignment.get(memberAssignmentBytes);
+
+ DescribeGroupsResponseData group0Data = new DescribeGroupsResponseData();
+ group0Data.groups().add(DescribeGroupsResponse.groupMetadata(
+ "group-0",
+ Errors.NONE,
+ "",
+ ConsumerProtocol.PROTOCOL_TYPE,
+ "",
+ asList(
+ DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null),
+ DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null)
+ ),
+ Collections.emptySet()));
+
+ DescribeGroupsResponseData groupConnectData = new DescribeGroupsResponseData();
+ group0Data.groups().add(DescribeGroupsResponse.groupMetadata(
"group-connect-0",
Errors.NONE,
"",
"connect",
"",
asList(
- DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null),
- DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null)
+ DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null),
+ DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null)
),
Collections.emptySet()));
- env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+ env.kafkaClient().prepareResponse(new DescribeGroupsResponse(group0Data));
+ env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupConnectData));
- final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0"));
- final ConsumerGroupDescription groupDescription = result.describedGroups().get("group-0").get();
-
- assertEquals(1, result.describedGroups().size());
- assertEquals("group-0", groupDescription.groupId());
- assertEquals(2, groupDescription.members().size());
+ Collection<String> groups = new HashSet<>();
+ groups.add("group-0");
+ groups.add("group-connect-0");
+ final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(groups);
+ assertEquals(2, result.describedGroups().size());
+ assertEquals(groups, result.describedGroups().keySet());
}
}
@@ -1173,14 +1247,22 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
//Retriable FindCoordinatorResponse errors should be retried
- env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
- env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
//Retriable errors should be retried
env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
+ /*
+ * We need to return two responses here, one for NOT_COORDINATOR error when calling list consumer group offsets
+ * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
+ * FindCoordinatorResponse.
+ */
+ env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
@@ -1206,7 +1288,7 @@ public class KafkaAdminClientTest {
@Test
public void testDeleteConsumerGroups() throws Exception {
- final HashMap<Integer, Node> nodes = new HashMap<>();
+ final Map<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
@@ -1254,6 +1336,16 @@ public class KafkaAdminClientTest {
errorResponse2.put("group-0", Errors.COORDINATOR_LOAD_IN_PROGRESS);
env.kafkaClient().prepareResponse(new DeleteGroupsResponse(errorResponse2));
+ /*
+ * We need to return two responses here, one with a NOT_COORDINATOR error when calling the delete consumer group
+ * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
+ * FindCoordinatorResponse.
+ */
+ Map<String, Errors> coordinatorMoved = new HashMap<>();
+ coordinatorMoved.put("UnitTestError", Errors.NOT_COORDINATOR);
+ env.kafkaClient().prepareResponse(new DeleteGroupsResponse(coordinatorMoved));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
env.kafkaClient().prepareResponse(new DeleteGroupsResponse(validResponse));
final DeleteConsumerGroupsResult errorResult1 = env.adminClient().deleteConsumerGroups(groupIds);