You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/23 22:22:24 UTC
[kafka] branch trunk updated: KAFKA-8179: Part 2,
ConsumerCoordinator Algorithm (#6778)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e5c4ebd KAFKA-8179: Part 2, ConsumerCoordinator Algorithm (#6778)
e5c4ebd is described below
commit e5c4ebdd7433d746a46d6962ee04ff2d782d892b
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Sun Jun 23 15:22:07 2019 -0700
KAFKA-8179: Part 2, ConsumerCoordinator Algorithm (#6778)
1. In ConsumerCoordinator, select the protocol as the common protocol from all configured assignor instances' supported protocols with the highest number.
1.a. In onJoinPrepare: only call onPartitionsRevoked with EAGER.
1.b. In onJoinComplete: call onPartitionsAssigned with EAGER; with COOPERATIVE, call onPartitionsAssigned followed by onPartitionsRevoked, and then request a re-join if the assignment error indicates so.
1.c. In performAssignment: with COOPERATIVE, update the assignments returned by the user's assignor by excluding all partitions that are still owned by some other members.
2. I've refactored the Subscription / Assignment classes such that the assigned partitions, error code, and group instance id are no longer final and can instead be updated. The last one is not directly related to the logic of this PR, but I felt it was more convenient to change it together with the other fields.
3. Testing: primarily in ConsumerCoordinatorTest, make it parameterized with protocol, and add necessary scenarios for COOPERATIVE protocol.
I intentionally omitted the documentation change since there are some behavioral updates that need to be finalized in later PRs; hence I will only add the docs in those later PRs.
Reviewers: Bill Bejeck <bb...@gmail.com>, Boyang Chen <bo...@confluent.io>, Sophie Blee-Goldman <so...@confluent.io>
---
.../consumer/internals/ConsumerCoordinator.java | 181 +++++-
.../consumer/internals/ConsumerProtocol.java | 58 +-
.../consumer/internals/PartitionAssignor.java | 48 +-
.../java/org/apache/kafka/clients/MockClient.java | 2 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 2 +-
.../internals/ConsumerCoordinatorTest.java | 623 ++++++++++++---------
.../consumer/internals/ConsumerProtocolTest.java | 47 +-
.../consumer/internals/MockPartitionAssignor.java | 11 +
.../test/java/org/apache/kafka/test/TestUtils.java | 5 +
9 files changed, 624 insertions(+), 353 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 477f24b..48d0c95 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor.RebalanceProtocol;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
@@ -119,6 +120,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
}
+ private final RebalanceProtocol protocol;
+
/**
* Initialize the coordination manager.
*/
@@ -158,6 +161,32 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (autoCommitEnabled)
this.nextAutoCommitTimer = time.timer(autoCommitIntervalMs);
+ // select the rebalance protocol such that:
+ // 1. only consider protocols that are supported by all the assignors. If there is no common protocols supported
+ // across all the assignors, throw an exception.
+ // 2. if there are multiple protocols that are commonly supported, select the one with the highest id (i.e. the
+ // id number indicates how advanced the protocol is).
+ // we know there are at least one assignor in the list, no need to double check for NPE
+ if (!assignors.isEmpty()) {
+ List<RebalanceProtocol> supportedProtocols = new ArrayList<>(assignors.get(0).supportedProtocols());
+
+ for (PartitionAssignor assignor : assignors) {
+ supportedProtocols.retainAll(assignor.supportedProtocols());
+ }
+
+ if (supportedProtocols.isEmpty()) {
+ throw new IllegalArgumentException("Specified assignors " +
+ assignors.stream().map(PartitionAssignor::name).collect(Collectors.toSet()) +
+ " do not have commonly supported rebalance protocol");
+ }
+
+ Collections.sort(supportedProtocols);
+
+ protocol = supportedProtocols.get(supportedProtocols.size() - 1);
+ } else {
+ protocol = null;
+ }
+
this.metadata.requestUpdate();
}
@@ -236,6 +265,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
+ Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+
Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
if (!subscriptions.assignFromSubscribed(assignment.partitions())) {
log.warn("We received an assignment {} that doesn't match our current subscription {}; it is likely " +
@@ -262,14 +293,66 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// execute the user's callback after rebalance
ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
- log.info("Setting newly assigned partitions: {}", Utils.join(assignedPartitions, ", "));
+
+ switch (protocol) {
+ case EAGER:
+ if (!ownedPartitions.isEmpty()) {
+ log.info("Coordinator has owned partitions {} that are not revoked with {} protocol, " +
+ "it is likely client is woken up before a previous pending rebalance completes its callback", ownedPartitions, protocol);
+ }
+
+ log.info("Setting newly assigned partitions: {}", Utils.join(assignedPartitions, ", "));
+ try {
+ listener.onPartitionsAssigned(assignedPartitions);
+ } catch (WakeupException | InterruptException e) {
+ throw e;
+ } catch (Exception e) {
+ log.error("User provided listener {} failed on partition assignment", listener.getClass().getName(), e);
+ }
+ break;
+
+ case COOPERATIVE:
+ assignAndRevoke(listener, assignedPartitions, ownedPartitions);
+
+ if (assignment.error() == ConsumerProtocol.AssignmentError.NEED_REJOIN) {
+ requestRejoin();
+ }
+
+ break;
+ }
+
+ }
+
+ private void assignAndRevoke(final ConsumerRebalanceListener listener,
+ final Set<TopicPartition> assignedPartitions,
+ final Set<TopicPartition> ownedPartitions) { // notifies the listener of partitions gained, then of partitions lost
+ Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions);
+ Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
+ addedPartitions.removeAll(ownedPartitions); // added = assigned minus previously owned
+ revokedPartitions.removeAll(assignedPartitions); // revoked = previously owned minus assigned
+ // log all four sets so the computed delta can be checked against its inputs
+ log.info("Updating with newly assigned partitions: {}, compare with already owned partitions: {}, " +
+ "newly added partitions: {}, revoking partitions: {}",
+ Utils.join(assignedPartitions, ", "),
+ Utils.join(ownedPartitions, ", "),
+ Utils.join(addedPartitions, ", "),
+ Utils.join(revokedPartitions, ", "));
+ // first callback: only the added partitions; WakeupException/InterruptException propagate, any other listener failure is logged and ignored
try {
- listener.onPartitionsAssigned(assignedPartitions);
+ listener.onPartitionsAssigned(addedPartitions);
} catch (WakeupException | InterruptException e) {
throw e;
} catch (Exception e) {
log.error("User provided listener {} failed on partition assignment", listener.getClass().getName(), e);
}
+ // second callback: the revoked partitions, with the same exception policy; it is invoked even when the set is empty
+ try {
+ listener.onPartitionsRevoked(revokedPartitions);
+ } catch (WakeupException | InterruptException e) {
+ throw e;
+ } catch (Exception e) {
+ log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e);
+ }
}
void maybeUpdateSubscriptionMetadata() {
@@ -302,6 +385,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
invokeCompletedOffsetCommitCallbacks();
if (subscriptions.partitionsAutoAssigned()) {
+ if (protocol == null) {
+ throw new IllegalStateException("User confingure ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG to empty " +
+ "while trying to subscribe for group protocol to auto assign partitions");
+ }
// Always update the heartbeat last poll time so that the heartbeat thread does not leave the
// group proactively due to application inactivity even if (say) the coordinator cannot be found.
pollHeartbeat(timer.currentTimeMs());
@@ -389,11 +476,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, Subscription> subscriptions = new HashMap<>();
+ // collect all the owned partitions
+ Map<TopicPartition, String> ownedPartitions = new HashMap<>();
for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {
- Subscription subscription = ConsumerProtocol.buildSubscription(ByteBuffer.wrap(memberSubscription.metadata()),
- Optional.ofNullable(memberSubscription.groupInstanceId()));
+ Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()));
+ subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId()));
subscriptions.put(memberSubscription.memberId(), subscription);
allSubscribedTopics.addAll(subscription.topics());
+ ownedPartitions.putAll(subscription.ownedPartitions().stream().collect(Collectors.toMap(item -> item, item -> memberSubscription.memberId())));
}
// the leader will begin watching for changes to any of the topics the group is interested in,
@@ -404,7 +494,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
log.debug("Performing assignment using strategy {} with subscriptions {}", assignor.name(), subscriptions);
- Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
+ Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), subscriptions);
+
+ switch (protocol) {
+ case EAGER:
+ break;
+
+ case COOPERATIVE:
+ adjustAssignment(ownedPartitions, assignments);
+ break;
+ }
// user-customized assignor may have created some topics that are not in the subscription list
// and assign their partitions to the members; in this case we would like to update the leader's
@@ -414,7 +513,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// TODO: this is a hack and not something we want to support long-term unless we push regex into the protocol
// we may need to modify the PartitionAssignor API to better support this case.
Set<String> assignedTopics = new HashSet<>();
- for (Assignment assigned : assignment.values()) {
+ for (Assignment assigned : assignments.values()) {
for (TopicPartition tp : assigned.partitions())
assignedTopics.add(tp.topic());
}
@@ -437,10 +536,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
assignmentSnapshot = metadataSnapshot;
- log.debug("Finished assignment for group: {}", assignment);
+ log.debug("Finished assignment for group: {}", assignments);
Map<String, ByteBuffer> groupAssignment = new HashMap<>();
- for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
+ for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
groupAssignment.put(assignmentEntry.getKey(), buffer);
}
@@ -448,6 +547,40 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
return groupAssignment;
}
+ /** COOPERATIVE only: stop partitions changing owner in this round; ownedPartitions maps partition -> owning member id, assignments (member id -> assignment) is modified in place. */
+ private void adjustAssignment(final Map<TopicPartition, String> ownedPartitions,
+ final Map<String, Assignment> assignments) {
+ boolean revocationsNeeded = false;
+ Set<TopicPartition> assignedPartitions = new HashSet<>();
+ for (final Map.Entry<String, Assignment> entry : assignments.entrySet()) {
+ final Assignment assignment = entry.getValue();
+ assignedPartitions.addAll(assignment.partitions());
+ // keep a partition only if it is unowned or already owned by this member; one owned by another member must be revoked there first
+ List<TopicPartition> updatedPartitions = assignment.partitions().stream()
+ .filter(tp -> !ownedPartitions.containsKey(tp) || entry.getKey().equals(ownedPartitions.get(tp)))
+ .collect(Collectors.toList());
+ if (!updatedPartitions.equals(assignment.partitions())) {
+ assignment.updatePartitions(updatedPartitions);
+ revocationsNeeded = true;
+ }
+ }
+ // give every owned partition the assignor handed to nobody back to its owner; copy first because the assignor's list may be immutable
+ for (final Map.Entry<TopicPartition, String> entry : ownedPartitions.entrySet()) {
+ if (!assignedPartitions.contains(entry.getKey())) {
+ final Assignment ownerAssignment = assignments.get(entry.getValue());
+ final List<TopicPartition> ownerPartitions = new ArrayList<>(ownerAssignment.partitions());
+ ownerPartitions.add(entry.getKey());
+ ownerAssignment.updatePartitions(ownerPartitions);
+ }
+ }
+ // if revocations are triggered, tell everyone to re-join immediately.
+ if (revocationsNeeded) {
+ for (final Assignment assignment : assignments.values()) {
+ assignment.setError(ConsumerProtocol.AssignmentError.NEED_REJOIN);
+ }
+ }
+ }
+
@Override
protected void onJoinPrepare(int generation, String memberId) {
// commit offsets prior to rebalance if auto-commit enabled
@@ -455,14 +588,26 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// execute the user's callback before rebalance
ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
- Set<TopicPartition> revoked = subscriptions.assignedPartitions();
- log.info("Revoking previously assigned partitions {}", revoked);
- try {
- listener.onPartitionsRevoked(revoked);
- } catch (WakeupException | InterruptException e) {
- throw e;
- } catch (Exception e) {
- log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e);
+
+ switch (protocol) {
+ case EAGER:
+ Set<TopicPartition> revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+ log.info("Revoking previously assigned partitions {}", revokedPartitions);
+ try {
+ listener.onPartitionsRevoked(revokedPartitions);
+ } catch (WakeupException | InterruptException e) {
+ throw e;
+ } catch (Exception e) {
+ log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e);
+ }
+
+ // also clear the assigned partitions since all have been revoked
+ subscriptions.assignFromSubscribed(Collections.emptySet());
+
+ break;
+
+ case COOPERATIVE:
+ break;
}
isLeader = false;
@@ -1047,4 +1192,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
}
+ /* test-only methods below */
+ RebalanceProtocol getProtocol() { // the protocol chosen in the constructor; null when the assignor list was empty
+ return protocol;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index 9e7961e..d05d5b0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -27,12 +27,8 @@ import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
/**
* ConsumerProtocol contains the schemas for consumer subscriptions and assignments for use with
@@ -56,7 +52,22 @@ import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
* ErrorCode => [int16]
* </pre>
*
- * Older versioned formats can be inferred by reading the code below.
+ * Version 0 format:
+ *
+ * <pre>
+ * Subscription => Version Topics UserData
+ * Version => Int16
+ * Topics => [String]
+ * UserData => Bytes
+ *
+ * Assignment => Version AssignedPartitions UserData
+ * Version => int16
+ * AssignedPartitions => [Topic Partitions]
+ * Topic => String
+ * Partitions => [int32]
+ * UserData => Bytes
+ * </pre>
+ *
*
* The current implementation assumes that future versions will not break compatibility. When
* it encounters a newer version, it parses it using the current format. This basically means
@@ -74,6 +85,8 @@ public class ConsumerProtocol {
public static final String TOPIC_PARTITIONS_KEY_NAME = "topic_partitions";
public static final String USER_DATA_KEY_NAME = "user_data";
+ public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", "Assignment error code");
+
public static final short CONSUMER_PROTOCOL_V0 = 0;
public static final short CONSUMER_PROTOCOL_V1 = 1;
@@ -106,13 +119,13 @@ public class ConsumerProtocol {
new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES),
ERROR_CODE);
- public enum Errors {
+ public enum AssignmentError {
NONE(0),
NEED_REJOIN(1);
private final short code;
- Errors(final int code) {
+ AssignmentError(final int code) {
this.code = (short) code;
}
@@ -120,7 +133,7 @@ public class ConsumerProtocol {
return code;
}
- public static Errors fromCode(final short code) {
+ public static AssignmentError fromCode(final short code) {
switch (code) {
case 0:
return NONE;
@@ -179,20 +192,17 @@ public class ConsumerProtocol {
}
}
- public static PartitionAssignor.Subscription buildSubscriptionV0(ByteBuffer buffer, Optional<String> groupInstanceId) {
+ public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer) { // reads the topics and user data of a version-0 subscription
Struct struct = SUBSCRIPTION_V0.read(buffer);
ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
List<String> topics = new ArrayList<>();
for (Object topicObj : struct.getArray(TOPICS_KEY_NAME))
topics.add((String) topicObj);
- return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0,
- topics,
- userData,
- Collections.emptyList(),
- groupInstanceId);
+ // the three-argument constructor supplies an empty owned-partitions list; the group instance id is no longer passed in here
+ return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0, topics, userData);
}
- public static PartitionAssignor.Subscription buildSubscriptionV1(ByteBuffer buffer, Optional<String> groupInstanceId) {
+ public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer) {
Struct struct = SUBSCRIPTION_V1.read(buffer);
ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
List<String> topics = new ArrayList<>();
@@ -208,14 +218,10 @@ public class ConsumerProtocol {
}
}
- return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1,
- topics,
- userData,
- ownedPartitions,
- groupInstanceId);
+ return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
}
- public static PartitionAssignor.Subscription buildSubscription(ByteBuffer buffer, Optional<String> groupInstanceId) {
+ public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer) {
Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
Short version = header.getShort(VERSION_KEY_NAME);
@@ -224,14 +230,14 @@ public class ConsumerProtocol {
switch (version) {
case CONSUMER_PROTOCOL_V0:
- return buildSubscriptionV0(buffer, groupInstanceId);
+ return deserializeSubscriptionV0(buffer);
case CONSUMER_PROTOCOL_V1:
- return buildSubscriptionV1(buffer, groupInstanceId);
+ return deserializeSubscriptionV1(buffer);
// assume all higher versions can be parsed as V1
default:
- return buildSubscriptionV1(buffer, groupInstanceId);
+ return deserializeSubscriptionV1(buffer);
}
}
@@ -267,7 +273,7 @@ public class ConsumerProtocol {
topicAssignments.add(topicAssignment);
}
struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
- struct.set(ERROR_CODE.name, assignment.error().code);
+ struct.set(ERROR_CODE, assignment.error().code);
ByteBuffer buffer = ByteBuffer.allocate(CONSUMER_PROTOCOL_HEADER_V1.sizeOf() + ASSIGNMENT_V1.sizeOf(struct));
CONSUMER_PROTOCOL_HEADER_V1.writeTo(buffer);
@@ -316,7 +322,7 @@ public class ConsumerProtocol {
}
}
- Errors error = Errors.fromCode(struct.get(ERROR_CODE));
+ AssignmentError error = AssignmentError.fromCode(struct.get(ERROR_CODE));
return new PartitionAssignor.Assignment(CONSUMER_PROTOCOL_V1, partitions, userData, error);
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
index 921a55b..c26f684 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
@@ -80,7 +80,7 @@ public interface PartitionAssignor {
}
/**
- * Indicate which rebalance protocol this assignor can would work with;
+ * Indicate which rebalance protocol this assignor works with;
* By default it should always work with {@link RebalanceProtocol#EAGER}.
*/
default List<RebalanceProtocol> supportedProtocols() {
@@ -88,7 +88,7 @@ public interface PartitionAssignor {
}
/**
- * Return the version of the assignor which indicate how the user metadata encodings
+ * Return the version of the assignor which indicates how the user metadata encodings
* and the assignment algorithm gets evolved.
*/
default short version() {
@@ -131,18 +131,17 @@ public interface PartitionAssignor {
private final List<String> topics;
private final ByteBuffer userData;
private final List<TopicPartition> ownedPartitions;
- private final Optional<String> groupInstanceId;
+ private Optional<String> groupInstanceId;
Subscription(Short version,
List<String> topics,
ByteBuffer userData,
- List<TopicPartition> ownedPartitions,
- Optional<String> groupInstanceId) {
+ List<TopicPartition> ownedPartitions) {
this.version = version;
this.topics = topics;
this.userData = userData;
this.ownedPartitions = ownedPartitions;
- this.groupInstanceId = groupInstanceId;
+ this.groupInstanceId = Optional.empty();
if (version < CONSUMER_PROTOCOL_V0)
throw new SchemaException("Unsupported subscription version: " + version);
@@ -152,14 +151,11 @@ public interface PartitionAssignor {
}
Subscription(Short version, List<String> topics, ByteBuffer userData) {
- this(version, topics, userData, Collections.emptyList(), Optional.empty());
+ this(version, topics, userData, Collections.emptyList()); // no owned partitions; the delegate initialises groupInstanceId to Optional.empty()
}
- public Subscription(List<String> topics,
- ByteBuffer userData,
- List<TopicPartition> ownedPartitions,
- Optional<String> groupInstanceId) {
- this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions, groupInstanceId);
+ public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) { // group instance id is now supplied later through setGroupInstanceId
+ this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions); // always built with the V1 protocol version
}
public Subscription(List<String> topics, ByteBuffer userData) {
@@ -186,6 +182,10 @@ public interface PartitionAssignor {
return userData;
}
+ public void setGroupInstanceId(Optional<String> groupInstanceId) { // overwrites the Optional.empty() default set by the constructor
+ this.groupInstanceId = groupInstanceId;
+ }
+
public Optional<String> groupInstanceId() {
return groupInstanceId;
}
@@ -202,11 +202,11 @@ public interface PartitionAssignor {
class Assignment {
private final Short version;
- private final List<TopicPartition> partitions;
+ private List<TopicPartition> partitions;
private final ByteBuffer userData;
- private final ConsumerProtocol.Errors error;
+ private ConsumerProtocol.AssignmentError error;
- Assignment(Short version, List<TopicPartition> partitions, ByteBuffer userData, ConsumerProtocol.Errors error) {
+ Assignment(Short version, List<TopicPartition> partitions, ByteBuffer userData, ConsumerProtocol.AssignmentError error) {
this.version = version;
this.partitions = partitions;
this.userData = userData;
@@ -215,16 +215,12 @@ public interface PartitionAssignor {
if (version < CONSUMER_PROTOCOL_V0)
throw new SchemaException("Unsupported subscription version: " + version);
- if (version < CONSUMER_PROTOCOL_V1 && error != ConsumerProtocol.Errors.NONE)
+ if (version < CONSUMER_PROTOCOL_V1 && error != ConsumerProtocol.AssignmentError.NONE)
throw new IllegalArgumentException("Assignment version smaller than 1 should not have error code.");
}
Assignment(Short version, List<TopicPartition> partitions, ByteBuffer userData) {
- this(version, partitions, userData, ConsumerProtocol.Errors.NONE);
- }
-
- public Assignment(List<TopicPartition> partitions, ByteBuffer userData, ConsumerProtocol.Errors error) {
- this(CONSUMER_PROTOCOL_V1, partitions, userData, error);
+ this(version, partitions, userData, ConsumerProtocol.AssignmentError.NONE); // an assignment starts without an error; setError can change it afterwards
}
public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
@@ -243,10 +239,18 @@ public interface PartitionAssignor {
return partitions;
}
- public ConsumerProtocol.Errors error() {
+ public ConsumerProtocol.AssignmentError error() { // returns the current value of the error field
return error;
}
+ public void updatePartitions(List<TopicPartition> partitions) { // replaces the partition list; the given list is stored as-is, without a defensive copy
+ this.partitions = partitions;
+ }
+
+ public void setError(ConsumerProtocol.AssignmentError error) { // NOTE(review): unlike the constructor, this does not reject a non-NONE error on a version < 1 assignment
+ this.error = error;
+ }
+
public ByteBuffer userData() {
return userData;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index b23005e..650f670 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -221,7 +221,7 @@ public class MockClient implements KafkaClient {
builder.latestAllowedVersion());
AbstractRequest abstractRequest = request.requestBuilder().build(version);
if (!futureResp.requestMatcher.matches(abstractRequest))
- throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest);
+ throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest + " with prepared response " + futureResp.responseBody);
UnsupportedVersionException unsupportedVersionException = null;
if (futureResp.isUnsupportedRequest)
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 0f9b956..c1adf19 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1692,7 +1692,7 @@ public class KafkaConsumerTest {
assertTrue(protocolIterator.hasNext());
ByteBuffer protocolMetadata = ByteBuffer.wrap(protocolIterator.next().metadata());
- PartitionAssignor.Subscription subscription = ConsumerProtocol.buildSubscription(protocolMetadata, Optional.empty());
+ PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata);
return subscribedTopics.equals(new HashSet<>(subscription.topics()));
}
}, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 1dc1780..a0878fb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -71,6 +71,8 @@ import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -97,6 +99,7 @@ import java.util.regex.Pattern;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
+import static org.apache.kafka.test.TestUtils.toSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -105,6 +108,7 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+@RunWith(value = Parameterized.class)
public class ConsumerCoordinatorTest {
private final String topic1 = "test1";
private final String topic2 = "test2";
@@ -121,8 +125,9 @@ public class ConsumerCoordinatorTest {
private final MockTime time = new MockTime();
private GroupRebalanceConfig rebalanceConfig;
- private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
- private List<PartitionAssignor> assignors = Collections.singletonList(partitionAssignor);
+ private final PartitionAssignor.RebalanceProtocol protocol;
+ private final MockPartitionAssignor partitionAssignor;
+ private final List<PartitionAssignor> assignors;
private MockClient client;
private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
{
@@ -139,6 +144,21 @@ public class ConsumerCoordinatorTest {
private MockCommitCallback mockOffsetCommitCallback;
private ConsumerCoordinator coordinator;
+ public ConsumerCoordinatorTest(final PartitionAssignor.RebalanceProtocol protocol) { // receives one protocol per parameterized run (see data())
+ this.protocol = protocol;
+ this.partitionAssignor = new MockPartitionAssignor(Collections.singletonList(protocol)); // the mock assignor supports exactly this one protocol
+ this.assignors = Collections.singletonList(partitionAssignor);
+ }
+
+ @Parameterized.Parameters(name = "rebalance protocol = {0}")
+ public static Collection<Object[]> data() { // one single-element parameter array per RebalanceProtocol constant
+ final List<Object[]> values = new ArrayList<>();
+ for (final PartitionAssignor.RebalanceProtocol protocol: PartitionAssignor.RebalanceProtocol.values()) {
+ values.add(new Object[]{protocol}); // the single element is passed to the test constructor
+ }
+ return values;
+ }
+
@Before
public void setup() {
LogContext logContext = new LogContext();
@@ -177,6 +197,25 @@ public class ConsumerCoordinatorTest {
}
@Test
+ public void testSelectRebalanceProtcol() { // checks which protocol a coordinator built via buildCoordinator ends up with
+ List<PartitionAssignor> assignors = new ArrayList<>(); // local list; shadows the field of the same name
+ assignors.add(new MockPartitionAssignor(Collections.singletonList(PartitionAssignor.RebalanceProtocol.EAGER)));
+ assignors.add(new MockPartitionAssignor(Collections.singletonList(PartitionAssignor.RebalanceProtocol.COOPERATIVE)));
+ // case 1: one assignor supports only EAGER and the other only COOPERATIVE
+ // no commonly supported protocol, so building the coordinator must throw
+ assertThrows(IllegalArgumentException.class, () -> buildCoordinator(rebalanceConfig, new Metrics(), assignors, false));
+ // case 2: both assignors support EAGER and COOPERATIVE
+ assignors.clear();
+ assignors.add(new MockPartitionAssignor(Arrays.asList(PartitionAssignor.RebalanceProtocol.EAGER, PartitionAssignor.RebalanceProtocol.COOPERATIVE)));
+ assignors.add(new MockPartitionAssignor(Arrays.asList(PartitionAssignor.RebalanceProtocol.EAGER, PartitionAssignor.RebalanceProtocol.COOPERATIVE)));
+ // with two common protocols the expected result is COOPERATIVE
+ // select higher indexed (more advanced) protocols
+ try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) {
+ assertEquals(PartitionAssignor.RebalanceProtocol.COOPERATIVE, coordinator.getProtocol());
+ }
+ }
+
+ @Test
public void testNormalHeartbeat() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -416,6 +455,9 @@ public class ConsumerCoordinatorTest {
@Test
public void testNormalJoinGroupLeader() {
final String consumerId = "leader";
+ final Set<String> subscription = singleton(topic1);
+ final List<TopicPartition> owned = Collections.emptyList();
+ final List<TopicPartition> assigned = Arrays.asList(t1p);
subscriptions.subscribe(singleton(topic1), rebalanceListener);
@@ -427,7 +469,7 @@ public class ConsumerCoordinatorTest {
// normal join group
Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
- partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+ partitionAssignor.prepare(singletonMap(consumerId, assigned));
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(new MockClient.RequestMatcher() {
@@ -438,34 +480,39 @@ public class ConsumerCoordinatorTest {
sync.data.generationId() == 1 &&
sync.groupAssignments().containsKey(consumerId);
}
- }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+ }, syncGroupResponse(assigned, Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
assertFalse(coordinator.rejoinNeededOrPending());
- assertEquals(singleton(t1p), subscriptions.assignedPartitions());
- assertEquals(singleton(topic1), subscriptions.groupSubscription());
+ assertEquals(toSet(assigned), subscriptions.assignedPartitions());
+ assertEquals(subscription, subscriptions.groupSubscription());
assertEquals(1, rebalanceListener.revokedCount);
- assertEquals(Collections.emptySet(), rebalanceListener.revoked);
+ assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
- assertEquals(singleton(t1p), rebalanceListener.assigned);
+ assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
}
@Test
public void testOutdatedCoordinatorAssignment() {
final String consumerId = "outdated_assignment";
+ final List<TopicPartition> owned = Collections.emptyList();
+ final List<String> oldSubscription = singletonList(topic2);
+ final List<TopicPartition> oldAssignment = Arrays.asList(t2p);
+ final List<String> newSubscription = singletonList(topic1);
+ final List<TopicPartition> newAssignment = Arrays.asList(t1p);
- subscriptions.subscribe(singleton(topic2), rebalanceListener);
+ subscriptions.subscribe(toSet(oldSubscription), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
// Test coordinator returning unsubscribed partitions
- partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+ partitionAssignor.prepare(singletonMap(consumerId, newAssignment));
// First incorrect assignment for subscription
client.prepareResponse(
joinGroupLeaderResponse(
- 1, consumerId, singletonMap(consumerId, singletonList(topic2)), Errors.NONE));
+ 1, consumerId, singletonMap(consumerId, oldSubscription), Errors.NONE));
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
@@ -474,12 +521,12 @@ public class ConsumerCoordinatorTest {
sync.data.generationId() == 1 &&
sync.groupAssignments().containsKey(consumerId);
}
- }, syncGroupResponse(Arrays.asList(t2p), Errors.NONE));
+ }, syncGroupResponse(oldAssignment, Errors.NONE));
// Second correct assignment for subscription
client.prepareResponse(
joinGroupLeaderResponse(
- 1, consumerId, singletonMap(consumerId, singletonList(topic1)), Errors.NONE));
+ 1, consumerId, singletonMap(consumerId, newSubscription), Errors.NONE));
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
@@ -488,29 +535,42 @@ public class ConsumerCoordinatorTest {
sync.data.generationId() == 1 &&
sync.groupAssignments().containsKey(consumerId);
}
- }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+ }, syncGroupResponse(newAssignment, Errors.NONE));
// Poll once so that the join group future gets created and complete
coordinator.poll(time.timer(0));
// Before the sync group response gets completed change the subscription
- subscriptions.subscribe(singleton(topic1), rebalanceListener);
+ subscriptions.subscribe(toSet(newSubscription), rebalanceListener);
coordinator.poll(time.timer(0));
coordinator.poll(time.timer(Long.MAX_VALUE));
+ final Collection<TopicPartition> revoked = getRevoked(owned, newAssignment);
+ final Collection<TopicPartition> assigned = getAdded(owned, newAssignment);
+
+ int revokeCount = 1;
+ final int addCount = 1;
+
+ // with eager protocol we will call revoke on the old assignment as well
+ if (protocol == PartitionAssignor.RebalanceProtocol.EAGER) {
+ revokeCount += 1;
+ }
+
assertFalse(coordinator.rejoinNeededOrPending());
- assertEquals(singleton(t1p), subscriptions.assignedPartitions());
- assertEquals(singleton(topic1), subscriptions.groupSubscription());
- assertEquals(2, rebalanceListener.revokedCount);
- assertEquals(Collections.emptySet(), rebalanceListener.revoked);
- assertEquals(1, rebalanceListener.assignedCount);
- assertEquals(singleton(t1p), rebalanceListener.assigned);
+ assertEquals(toSet(newAssignment), subscriptions.assignedPartitions());
+ assertEquals(toSet(newSubscription), subscriptions.groupSubscription());
+ assertEquals(revokeCount, rebalanceListener.revokedCount);
+ assertEquals(revoked, rebalanceListener.revoked);
+ assertEquals(addCount, rebalanceListener.assignedCount);
+ assertEquals(assigned, rebalanceListener.assigned);
}
@Test
public void testPatternJoinGroupLeader() {
final String consumerId = "leader";
+ final List<TopicPartition> assigned = Arrays.asList(t1p, t2p);
+ final List<TopicPartition> owned = Collections.emptyList();
subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);
@@ -523,7 +583,7 @@ public class ConsumerCoordinatorTest {
// normal join group
Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
- partitionAssignor.prepare(singletonMap(consumerId, Arrays.asList(t1p, t2p)));
+ partitionAssignor.prepare(singletonMap(consumerId, assigned));
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(new MockClient.RequestMatcher() {
@@ -534,7 +594,7 @@ public class ConsumerCoordinatorTest {
sync.data.generationId() == 1 &&
sync.groupAssignments().containsKey(consumerId);
}
- }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
+ }, syncGroupResponse(assigned, Errors.NONE));
// expect client to force updating the metadata, if yes gives it both topics
client.prepareMetadataUpdate(metadataResponse);
@@ -545,14 +605,17 @@ public class ConsumerCoordinatorTest {
assertEquals(2, subscriptions.groupSubscription().size());
assertEquals(2, subscriptions.subscription().size());
assertEquals(1, rebalanceListener.revokedCount);
- assertEquals(Collections.emptySet(), rebalanceListener.revoked);
+ assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
- assertEquals(2, rebalanceListener.assigned.size());
+ assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
}
@Test
public void testMetadataRefreshDuringRebalance() {
final String consumerId = "leader";
+ final List<TopicPartition> owned = Collections.emptyList();
+ final List<TopicPartition> oldAssigned = Arrays.asList(t1p);
+
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
@@ -564,11 +627,10 @@ public class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
Map<String, List<String>> initialSubscription = singletonMap(consumerId, singletonList(topic1));
- partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+ partitionAssignor.prepare(singletonMap(consumerId, oldAssigned));
// the metadata will be updated in flight with a new topic added
final List<String> updatedSubscription = Arrays.asList(topic1, topic2);
- final Set<String> updatedSubscriptionSet = new HashSet<>(updatedSubscription);
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE));
client.prepareResponse(new MockClient.RequestMatcher() {
@@ -580,14 +642,22 @@ public class ConsumerCoordinatorTest {
client.updateMetadata(TestUtils.metadataUpdateWith(1, updatedPartitions));
return true;
}
- }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+ }, syncGroupResponse(oldAssigned, Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
- List<TopicPartition> newAssignment = Arrays.asList(t1p, t2p);
- Set<TopicPartition> newAssignmentSet = new HashSet<>(newAssignment);
+ // rejoin will only be set in the next poll call
+ assertFalse(coordinator.rejoinNeededOrPending());
+ assertEquals(singleton(topic1), subscriptions.subscription());
+ assertEquals(toSet(oldAssigned), subscriptions.assignedPartitions());
+ assertEquals(1, rebalanceListener.revokedCount);
+ assertEquals(getRevoked(owned, oldAssigned), rebalanceListener.revoked);
+ assertEquals(1, rebalanceListener.assignedCount);
+ assertEquals(getAdded(owned, oldAssigned), rebalanceListener.assigned);
+
+ List<TopicPartition> newAssigned = Arrays.asList(t1p, t2p);
Map<String, List<String>> updatedSubscriptions = singletonMap(consumerId, Arrays.asList(topic1, topic2));
- partitionAssignor.prepare(singletonMap(consumerId, newAssignment));
+ partitionAssignor.prepare(singletonMap(consumerId, newAssigned));
// we expect to see a second rebalance with the new-found topics
client.prepareResponse(new MockClient.RequestMatcher() {
@@ -600,22 +670,22 @@ public class ConsumerCoordinatorTest {
JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next();
ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
- PartitionAssignor.Subscription subscription = ConsumerProtocol.buildSubscription(metadata, Optional.empty());
+ PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata);
metadata.rewind();
- return subscription.topics().containsAll(updatedSubscriptionSet);
+ return subscription.topics().containsAll(updatedSubscription);
}
}, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE));
- client.prepareResponse(syncGroupResponse(newAssignment, Errors.NONE));
+ client.prepareResponse(syncGroupResponse(newAssigned, Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
assertFalse(coordinator.rejoinNeededOrPending());
- assertEquals(updatedSubscriptionSet, subscriptions.subscription());
- assertEquals(newAssignmentSet, subscriptions.assignedPartitions());
+ assertEquals(toSet(updatedSubscription), subscriptions.subscription());
+ assertEquals(toSet(newAssigned), subscriptions.assignedPartitions());
assertEquals(2, rebalanceListener.revokedCount);
- assertEquals(singleton(t1p), rebalanceListener.revoked);
+ assertEquals(getRevoked(oldAssigned, newAssigned), rebalanceListener.revoked);
assertEquals(2, rebalanceListener.assignedCount);
- assertEquals(newAssignmentSet, rebalanceListener.assigned);
+ assertEquals(getAdded(oldAssigned, newAssigned), rebalanceListener.assigned);
}
@Test
@@ -665,6 +735,8 @@ public class ConsumerCoordinatorTest {
@Test
public void testWakeupDuringJoin() {
final String consumerId = "leader";
+ final List<TopicPartition> owned = Collections.emptyList();
+ final List<TopicPartition> assigned = Arrays.asList(t1p);
subscriptions.subscribe(singleton(topic1), rebalanceListener);
@@ -675,7 +747,7 @@ public class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
- partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+ partitionAssignor.prepare(singletonMap(consumerId, assigned));
// prepare only the first half of the join and then trigger the wakeup
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
@@ -688,22 +760,25 @@ public class ConsumerCoordinatorTest {
}
// now complete the second half
- client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+ client.prepareResponse(syncGroupResponse(assigned, Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
assertFalse(coordinator.rejoinNeededOrPending());
- assertEquals(singleton(t1p), subscriptions.assignedPartitions());
+ assertEquals(toSet(assigned), subscriptions.assignedPartitions());
assertEquals(1, rebalanceListener.revokedCount);
- assertEquals(Collections.emptySet(), rebalanceListener.revoked);
+ assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
- assertEquals(singleton(t1p), rebalanceListener.assigned);
+ assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
}
@Test
public void testNormalJoinGroupFollower() {
final String consumerId = "consumer";
+ final Set<String> subscription = singleton(topic1);
+ final List<TopicPartition> owned = Collections.emptyList();
+ final List<TopicPartition> assigned = Arrays.asList(t1p);
- subscriptions.subscribe(singleton(topic1), rebalanceListener);
+ subscriptions.subscribe(subscription, rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -718,17 +793,17 @@ public class ConsumerCoordinatorTest {
sync.data.generationId() == 1 &&
sync.groupAssignments().isEmpty();
}
- }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+ }, syncGroupResponse(assigned, Errors.NONE));
coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
assertFalse(coordinator.rejoinNeededOrPending());
- assertEquals(singleton(t1p), subscriptions.assignedPartitions());
- assertEquals(singleton(topic1), subscriptions.groupSubscription());
+ assertEquals(toSet(assigned), subscriptions.assignedPartitions());
+ assertEquals(subscription, subscriptions.groupSubscription());
assertEquals(1, rebalanceListener.revokedCount);
- assertEquals(Collections.emptySet(), rebalanceListener.revoked);
+ assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
- assertEquals(singleton(t1p), rebalanceListener.assigned);
+ assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
}
@Test
@@ -763,6 +838,9 @@ public class ConsumerCoordinatorTest {
@Test
public void testPatternJoinGroupFollower() {
final String consumerId = "consumer";
+ final Set<String> subscription = Utils.mkSet(topic1, topic2);
+ final List<TopicPartition> owned = Collections.emptyList();
+ final List<TopicPartition> assigned = Arrays.asList(t1p, t2p);
subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);
@@ -783,18 +861,19 @@ public class ConsumerCoordinatorTest {
sync.data.generationId() == 1 &&
sync.groupAssignments().isEmpty();
}
- }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
+ }, syncGroupResponse(assigned, Errors.NONE));
// expect client to force updating the metadata, if yes gives it both topics
client.prepareMetadataUpdate(metadataResponse);
coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
assertFalse(coordinator.rejoinNeededOrPending());
- assertEquals(2, subscriptions.numAssignedPartitions());
- assertEquals(2, subscriptions.subscription().size());
+ assertEquals(assigned.size(), subscriptions.numAssignedPartitions());
+ assertEquals(subscription, subscriptions.subscription());
assertEquals(1, rebalanceListener.revokedCount);
+ assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
- assertEquals(2, rebalanceListener.assigned.size());
+ assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
}
@Test
@@ -985,7 +1064,7 @@ public class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
- partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+ partitionAssignor.prepare(singletonMap(consumerId, Arrays.asList(t1p)));
// the leader is responsible for picking up metadata changes and forcing a group rebalance
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
@@ -1023,7 +1102,7 @@ public class ConsumerCoordinatorTest {
// prepare initial rebalance
Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, topics);
- partitionAssignor.prepare(singletonMap(consumerId, Collections.singletonList(tp1)));
+ partitionAssignor.prepare(singletonMap(consumerId, Arrays.asList(tp1)));
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(new MockClient.RequestMatcher() {
@@ -1086,7 +1165,6 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
client.prepareResponse(syncGroupResponse(Collections.singletonList(partition), Errors.NONE));
-
// The first call to poll should raise the exception from the rebalance listener
try {
coordinator.poll(time.timer(Long.MAX_VALUE));
@@ -1172,56 +1250,58 @@ public class ConsumerCoordinatorTest {
metadata = new ConsumerMetadata(0, Long.MAX_VALUE, includeInternalTopics,
false, subscriptions, new LogContext(), new ClusterResourceListeners());
client = new MockClient(time, metadata);
- coordinator = buildCoordinator(rebalanceConfig,
- new Metrics(),
- assignors,
- false);
-
- subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
-
- Node node = new Node(0, "localhost", 9999);
- MetadataResponse.PartitionMetadata partitionMetadata =
+ try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) {
+ subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
+ Node node = new Node(0, "localhost", 9999);
+ MetadataResponse.PartitionMetadata partitionMetadata =
new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node, Optional.empty(),
- singletonList(node), singletonList(node), singletonList(node));
- MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE,
+ singletonList(node), singletonList(node), singletonList(node));
+ MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE,
Topic.GROUP_METADATA_TOPIC_NAME, true, singletonList(partitionMetadata));
- client.updateMetadata(MetadataResponse.prepareResponse(singletonList(node), "clusterId", node.id(),
+ client.updateMetadata(MetadataResponse.prepareResponse(singletonList(node), "clusterId", node.id(),
singletonList(topicMetadata)));
- coordinator.maybeUpdateSubscriptionMetadata();
+ coordinator.maybeUpdateSubscriptionMetadata();
- assertEquals(includeInternalTopics, subscriptions.subscription().contains(Topic.GROUP_METADATA_TOPIC_NAME));
+ assertEquals(includeInternalTopics, subscriptions.subscription().contains(Topic.GROUP_METADATA_TOPIC_NAME));
+ }
}
@Test
public void testRejoinGroup() {
String otherTopic = "otherTopic";
+ final List<TopicPartition> owned = Collections.emptyList();
+ final List<TopicPartition> assigned = Arrays.asList(t1p);
subscriptions.subscribe(singleton(topic1), rebalanceListener);
// join the group once
- joinAsFollowerAndReceiveAssignment("consumer", coordinator, singletonList(t1p));
+ joinAsFollowerAndReceiveAssignment("consumer", coordinator, assigned);
assertEquals(1, rebalanceListener.revokedCount);
- assertTrue(rebalanceListener.revoked.isEmpty());
+ assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
- assertEquals(singleton(t1p), rebalanceListener.assigned);
+ assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
// and join the group again
+ rebalanceListener.revoked = null;
+ rebalanceListener.assigned = null;
subscriptions.subscribe(new HashSet<>(Arrays.asList(topic1, otherTopic)), rebalanceListener);
client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
- client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+ client.prepareResponse(syncGroupResponse(assigned, Errors.NONE));
coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
assertEquals(2, rebalanceListener.revokedCount);
- assertEquals(singleton(t1p), rebalanceListener.revoked);
+ assertEquals(getRevoked(assigned, assigned), rebalanceListener.revoked);
assertEquals(2, rebalanceListener.assignedCount);
- assertEquals(singleton(t1p), rebalanceListener.assigned);
+ assertEquals(getAdded(assigned, assigned), rebalanceListener.assigned);
}
@Test
public void testDisconnectInJoin() {
subscriptions.subscribe(singleton(topic1), rebalanceListener);
+ final List<TopicPartition> owned = Collections.emptyList();
+ final List<TopicPartition> assigned = Arrays.asList(t1p);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -1230,14 +1310,15 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE), true);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
- client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+ client.prepareResponse(syncGroupResponse(assigned, Errors.NONE));
coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
assertFalse(coordinator.rejoinNeededOrPending());
- assertEquals(singleton(t1p), subscriptions.assignedPartitions());
+ assertEquals(toSet(assigned), subscriptions.assignedPartitions());
assertEquals(1, rebalanceListener.revokedCount);
+ assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
- assertEquals(singleton(t1p), rebalanceListener.assigned);
+ assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
}
@Test(expected = ApiException.class)
@@ -1303,170 +1384,156 @@ public class ConsumerCoordinatorTest {
public void testAutoCommitDynamicAssignment() {
final String consumerId = "consumer";
- ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
- new Metrics(),
- assignors,
- true);
-
- subscriptions.subscribe(singleton(topic1), rebalanceListener);
- joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
- subscriptions.seek(t1p, 100);
-
- prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
- time.sleep(autoCommitIntervalMs);
- coordinator.poll(time.timer(Long.MAX_VALUE));
- assertFalse(client.hasPendingResponses());
+ try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors,
+ true)
+ ) {
+ subscriptions.subscribe(singleton(topic1), rebalanceListener);
+ joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
+ subscriptions.seek(t1p, 100);
+ prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+ time.sleep(autoCommitIntervalMs);
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertFalse(client.hasPendingResponses());
+ }
}
@Test
public void testAutoCommitRetryBackoff() {
final String consumerId = "consumer";
- ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
- new Metrics(),
- assignors,
- true);
- subscriptions.subscribe(singleton(topic1), rebalanceListener);
- joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
- subscriptions.seek(t1p, 100);
- time.sleep(autoCommitIntervalMs);
+ try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors,
+ true)) {
+ subscriptions.subscribe(singleton(topic1), rebalanceListener);
+ joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
- // Send an offset commit, but let it fail with a retriable error
- prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR);
- coordinator.poll(time.timer(Long.MAX_VALUE));
- assertTrue(coordinator.coordinatorUnknown());
+ subscriptions.seek(t1p, 100);
+ time.sleep(autoCommitIntervalMs);
- // After the disconnect, we should rediscover the coordinator
- client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
- coordinator.poll(time.timer(Long.MAX_VALUE));
+ // Send an offset commit, but let it fail with a retriable error
+ prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR);
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertTrue(coordinator.coordinatorUnknown());
- subscriptions.seek(t1p, 200);
+ // After the disconnect, we should rediscover the coordinator
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.poll(time.timer(Long.MAX_VALUE));
- // Until the retry backoff has expired, we should not retry the offset commit
- time.sleep(retryBackoffMs / 2);
- coordinator.poll(time.timer(Long.MAX_VALUE));
- assertEquals(0, client.inFlightRequestCount());
+ subscriptions.seek(t1p, 200);
- // Once the backoff expires, we should retry
- time.sleep(retryBackoffMs / 2);
- coordinator.poll(time.timer(Long.MAX_VALUE));
- assertEquals(1, client.inFlightRequestCount());
- respondToOffsetCommitRequest(singletonMap(t1p, 200L), Errors.NONE);
+ // Until the retry backoff has expired, we should not retry the offset commit
+ time.sleep(retryBackoffMs / 2);
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertEquals(0, client.inFlightRequestCount());
+
+ // Once the backoff expires, we should retry
+ time.sleep(retryBackoffMs / 2);
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertEquals(1, client.inFlightRequestCount());
+ respondToOffsetCommitRequest(singletonMap(t1p, 200L), Errors.NONE);
+ }
}
@Test
public void testAutoCommitAwaitsInterval() {
final String consumerId = "consumer";
- ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
- new Metrics(),
- assignors,
- true);
- subscriptions.subscribe(singleton(topic1), rebalanceListener);
- joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
+ try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+ subscriptions.subscribe(singleton(topic1), rebalanceListener);
+ joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
- subscriptions.seek(t1p, 100);
- time.sleep(autoCommitIntervalMs);
+ subscriptions.seek(t1p, 100);
+ time.sleep(autoCommitIntervalMs);
- // Send the offset commit request, but do not respond
- coordinator.poll(time.timer(Long.MAX_VALUE));
- assertEquals(1, client.inFlightRequestCount());
+ // Send the offset commit request, but do not respond
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertEquals(1, client.inFlightRequestCount());
- time.sleep(autoCommitIntervalMs / 2);
+ time.sleep(autoCommitIntervalMs / 2);
- // Ensure that no additional offset commit is sent
- coordinator.poll(time.timer(Long.MAX_VALUE));
- assertEquals(1, client.inFlightRequestCount());
+ // Ensure that no additional offset commit is sent
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertEquals(1, client.inFlightRequestCount());
- respondToOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
- coordinator.poll(time.timer(Long.MAX_VALUE));
- assertEquals(0, client.inFlightRequestCount());
+ respondToOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertEquals(0, client.inFlightRequestCount());
- subscriptions.seek(t1p, 200);
+ subscriptions.seek(t1p, 200);
- // If we poll again before the auto-commit interval, there should be no new sends
- coordinator.poll(time.timer(Long.MAX_VALUE));
- assertEquals(0, client.inFlightRequestCount());
+ // If we poll again before the auto-commit interval, there should be no new sends
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertEquals(0, client.inFlightRequestCount());
- // After the remainder of the interval passes, we send a new offset commit
- time.sleep(autoCommitIntervalMs / 2);
- coordinator.poll(time.timer(Long.MAX_VALUE));
- assertEquals(1, client.inFlightRequestCount());
- respondToOffsetCommitRequest(singletonMap(t1p, 200L), Errors.NONE);
+ // After the remainder of the interval passes, we send a new offset commit
+ time.sleep(autoCommitIntervalMs / 2);
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertEquals(1, client.inFlightRequestCount());
+ respondToOffsetCommitRequest(singletonMap(t1p, 200L), Errors.NONE);
+ }
}
@Test
public void testAutoCommitDynamicAssignmentRebalance() {
final String consumerId = "consumer";
- ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
- new Metrics(),
- assignors,
- true);
-
- subscriptions.subscribe(singleton(topic1), rebalanceListener);
-
- client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
- coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+ try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors,
+ true)) {
+ subscriptions.subscribe(singleton(topic1), rebalanceListener);
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
- // haven't joined, so should not cause a commit
- time.sleep(autoCommitIntervalMs);
- consumerClient.poll(time.timer(0));
+ // haven't joined, so should not cause a commit
+ time.sleep(autoCommitIntervalMs);
+ consumerClient.poll(time.timer(0));
- client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
- client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
- coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+ client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+ client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+ coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
- subscriptions.seek(t1p, 100);
+ subscriptions.seek(t1p, 100);
- prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
- time.sleep(autoCommitIntervalMs);
- coordinator.poll(time.timer(Long.MAX_VALUE));
- assertFalse(client.hasPendingResponses());
+ prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+ time.sleep(autoCommitIntervalMs);
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertFalse(client.hasPendingResponses());
+ }
}
@Test
public void testAutoCommitManualAssignment() {
- ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
- new Metrics(),
- assignors,
- true);
-
- subscriptions.assignFromUser(singleton(t1p));
- subscriptions.seek(t1p, 100);
-
- client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
- coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+ try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+ subscriptions.assignFromUser(singleton(t1p));
+ subscriptions.seek(t1p, 100);
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
- prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
- time.sleep(autoCommitIntervalMs);
- coordinator.poll(time.timer(Long.MAX_VALUE));
- assertFalse(client.hasPendingResponses());
+ prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+ time.sleep(autoCommitIntervalMs);
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertFalse(client.hasPendingResponses());
+ }
}
@Test
public void testAutoCommitManualAssignmentCoordinatorUnknown() {
- ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
- new Metrics(),
- assignors,
- true);
-
- subscriptions.assignFromUser(singleton(t1p));
- subscriptions.seek(t1p, 100);
+ try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+ subscriptions.assignFromUser(singleton(t1p));
+ subscriptions.seek(t1p, 100);
- // no commit initially since coordinator is unknown
- consumerClient.poll(time.timer(0));
- time.sleep(autoCommitIntervalMs);
- consumerClient.poll(time.timer(0));
+ // no commit initially since coordinator is unknown
+ consumerClient.poll(time.timer(0));
+ time.sleep(autoCommitIntervalMs);
+ consumerClient.poll(time.timer(0));
- // now find the coordinator
- client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
- coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+ // now find the coordinator
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
- // sleep only for the retry backoff
- time.sleep(retryBackoffMs);
- prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
- coordinator.poll(time.timer(Long.MAX_VALUE));
- assertFalse(client.hasPendingResponses());
+ // sleep only for the retry backoff
+ time.sleep(retryBackoffMs);
+ prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertFalse(client.hasPendingResponses());
+ }
}
@Test
@@ -2001,115 +2068,125 @@ public class ConsumerCoordinatorTest {
@Test
public void testCloseDynamicAssignment() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty());
- gracefulCloseTest(coordinator, true);
+ try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty())) {
+ gracefulCloseTest(coordinator, true);
+ }
}
@Test
public void testCloseManualAssignment() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, Optional.empty());
- gracefulCloseTest(coordinator, false);
+ try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, Optional.empty())) {
+ gracefulCloseTest(coordinator, false);
+ }
}
@Test
public void testCloseCoordinatorNotKnownManualAssignment() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, Optional.empty());
- makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
- time.sleep(autoCommitIntervalMs);
- closeVerifyTimeout(coordinator, 1000, 1000, 1000);
+ try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, Optional.empty())) {
+ makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
+ time.sleep(autoCommitIntervalMs);
+ closeVerifyTimeout(coordinator, 1000, 1000, 1000);
+ }
}
@Test
public void testCloseCoordinatorNotKnownNoCommits() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty());
- makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
- closeVerifyTimeout(coordinator, 1000, 0, 0);
+ try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty())) {
+ makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
+ closeVerifyTimeout(coordinator, 1000, 0, 0);
+ }
}
@Test
public void testCloseCoordinatorNotKnownWithCommits() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
- makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
- time.sleep(autoCommitIntervalMs);
- closeVerifyTimeout(coordinator, 1000, 1000, 1000);
+ try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId)) {
+ makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
+ time.sleep(autoCommitIntervalMs);
+ closeVerifyTimeout(coordinator, 1000, 1000, 1000);
+ }
}
@Test
public void testCloseCoordinatorUnavailableNoCommits() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty());
- makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
- closeVerifyTimeout(coordinator, 1000, 0, 0);
+ try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty())) {
+ makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
+ closeVerifyTimeout(coordinator, 1000, 0, 0);
+ }
}
@Test
public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
- makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
- time.sleep(autoCommitIntervalMs);
- closeVerifyTimeout(coordinator, 1000, 1000, 1000);
+ try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId)) {
+ makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
+ time.sleep(autoCommitIntervalMs);
+ closeVerifyTimeout(coordinator, 1000, 1000, 1000);
+ }
}
@Test
public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
- makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
- time.sleep(autoCommitIntervalMs);
- closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
+ try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId)) {
+ makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
+ time.sleep(autoCommitIntervalMs);
+ closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
+ }
}
@Test
public void testCloseNoResponseForCommit() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
- time.sleep(autoCommitIntervalMs);
- closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
+ try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId)) {
+ time.sleep(autoCommitIntervalMs);
+ closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
+ }
}
@Test
public void testCloseNoResponseForLeaveGroup() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty());
- closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
+ try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty())) {
+ closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
+ }
}
@Test
public void testCloseNoWait() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
- time.sleep(autoCommitIntervalMs);
- closeVerifyTimeout(coordinator, 0, 0, 0);
+ try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId)) {
+ time.sleep(autoCommitIntervalMs);
+ closeVerifyTimeout(coordinator, 0, 0, 0);
+ }
}
@Test
public void testHeartbeatThreadClose() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
- coordinator.ensureActiveGroup();
- time.sleep(heartbeatIntervalMs + 100);
- Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat
- closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
- Thread[] threads = new Thread[Thread.activeCount()];
- int threadCount = Thread.enumerate(threads);
- for (int i = 0; i < threadCount; i++)
- assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId));
+ try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId)) {
+ coordinator.ensureActiveGroup();
+ time.sleep(heartbeatIntervalMs + 100);
+ Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat
+ closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
+ Thread[] threads = new Thread[Thread.activeCount()];
+ int threadCount = Thread.enumerate(threads);
+ for (int i = 0; i < threadCount; i++) {
+ assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId));
+ }
+ }
}
@Test
public void testAutoCommitAfterCoordinatorBackToService() {
- ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
- new Metrics(),
- assignors,
- true);
-
- subscriptions.assignFromUser(Collections.singleton(t1p));
- subscriptions.seek(t1p, 100L);
-
- coordinator.markCoordinatorUnknown();
- assertTrue(coordinator.coordinatorUnknown());
- client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
- prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
-
- // async commit offset should find coordinator
- time.sleep(autoCommitIntervalMs); // sleep for a while to ensure auto commit does happen
- coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
- assertFalse(coordinator.coordinatorUnknown());
- assertEquals(100L, subscriptions.position(t1p).offset);
+ try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+ subscriptions.assignFromUser(Collections.singleton(t1p));
+ subscriptions.seek(t1p, 100L);
+
+ coordinator.markCoordinatorUnknown();
+ assertTrue(coordinator.coordinatorUnknown());
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+
+ // async commit offset should find coordinator
+ time.sleep(autoCommitIntervalMs); // sleep for a while to ensure auto commit does happen
+ coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
+ assertFalse(coordinator.coordinatorUnknown());
+ assertEquals(100L, subscriptions.position(t1p).offset);
+ }
}
@Test(expected = FencedInstanceIdException.class)
@@ -2266,6 +2343,34 @@ public class ConsumerCoordinatorTest {
null);
}
+ private Collection<TopicPartition> getRevoked(final List<TopicPartition> owned,
+ final List<TopicPartition> assigned) {
+ switch (protocol) {
+ case EAGER:
+ return toSet(owned);
+ case COOPERATIVE:
+ final List<TopicPartition> revoked = new ArrayList<>(owned);
+ revoked.removeAll(assigned);
+ return toSet(revoked);
+ default:
+ throw new IllegalStateException("This should not happen");
+ }
+ }
+
+ private Collection<TopicPartition> getAdded(final List<TopicPartition> owned,
+ final List<TopicPartition> assigned) {
+ switch (protocol) {
+ case EAGER:
+ return toSet(assigned);
+ case COOPERATIVE:
+ final List<TopicPartition> added = new ArrayList<>(assigned);
+ added.removeAll(owned);
+ return toSet(added);
+ default:
+ throw new IllegalStateException("This should not happen");
+ }
+ }
+
private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
return FindCoordinatorResponse.prepareResponse(error, node);
}
@@ -2294,7 +2399,7 @@ public class ConsumerCoordinatorTest {
.setProtocolName(partitionAssignor.name())
.setLeader(memberId)
.setMemberId(memberId)
- .setMembers(Collections.emptyList())
+ .setMembers(metadata)
);
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index d89c5a7..9e601b0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.clients.consumer.internals.ConsumerProtocol.Errors;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.common.TopicPartition;
@@ -29,12 +28,9 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_HEADER_SCHEMA;
import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.OWNED_PARTITIONS_KEY_NAME;
@@ -44,6 +40,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.TOPIC
import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.USER_DATA_KEY_NAME;
import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.VERSION_KEY_NAME;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.test.TestUtils.toSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -59,7 +56,7 @@ public class ConsumerProtocolTest {
public void serializeDeserializeMetadata() {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
- Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, Optional.empty());
+ Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
assertEquals(subscription.topics(), parsedSubscription.topics());
assertEquals(0, parsedSubscription.userData().limit());
assertFalse(parsedSubscription.groupInstanceId().isPresent());
@@ -70,8 +67,10 @@ public class ConsumerProtocolTest {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
- Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, groupInstanceId);
+ Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
+ parsedSubscription.setGroupInstanceId(groupInstanceId);
assertEquals(subscription.topics(), parsedSubscription.topics());
+ assertEquals(0, parsedSubscription.userData().limit());
assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
}
@@ -79,34 +78,29 @@ public class ConsumerProtocolTest {
public void serializeDeserializeNullSubscriptionUserData() {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null);
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
- Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, Optional.empty());
+ Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
assertEquals(subscription.topics(), parsedSubscription.topics());
assertNull(parsedSubscription.userData());
- assertFalse(parsedSubscription.groupInstanceId().isPresent());
}
@Test
public void deserializeOldSubscriptionVersion() {
Subscription subscription = new Subscription((short) 0, Arrays.asList("foo", "bar"), null);
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
- Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, groupInstanceId);
+ Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
- assertEquals(parsedSubscription.topics(), parsedSubscription.topics());
+ assertEquals(subscription.topics(), parsedSubscription.topics());
assertNull(parsedSubscription.userData());
assertTrue(parsedSubscription.ownedPartitions().isEmpty());
- assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
}
@Test
public void deserializeNewSubscriptionWithOldVersion() {
- Subscription subscription = new Subscription((short) 1,
- Arrays.asList("foo", "bar"),
- null, Collections.singletonList(tp2),
- Optional.empty());
+ Subscription subscription = new Subscription((short) 1, Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2));
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
// ignore the version assuming it is the old byte code, as it will blindly deserialize as V0
Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
header.getShort(VERSION_KEY_NAME);
- Subscription parsedSubscription = ConsumerProtocol.buildSubscriptionV0(buffer, Optional.empty());
+ Subscription parsedSubscription = ConsumerProtocol.deserializeSubscriptionV0(buffer);
assertEquals(subscription.topics(), parsedSubscription.topics());
assertNull(parsedSubscription.userData());
assertTrue(parsedSubscription.ownedPartitions().isEmpty());
@@ -141,10 +135,11 @@ public class ConsumerProtocolTest {
buffer.flip();
- Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, groupInstanceId);
- assertEquals(Collections.singletonList("topic"), parsedSubscription.topics());
- assertEquals(Collections.singletonList(tp2), parsedSubscription.ownedPartitions());
- assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
+ Subscription subscription = ConsumerProtocol.deserializeSubscription(buffer);
+ subscription.setGroupInstanceId(groupInstanceId);
+ assertEquals(Collections.singletonList("topic"), subscription.topics());
+ assertEquals(Collections.singletonList(tp2), subscription.ownedPartitions());
+ assertEquals(groupInstanceId, subscription.groupInstanceId());
}
@Test
@@ -172,20 +167,20 @@ public class ConsumerProtocolTest {
Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer);
assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
assertNull(parsedAssignment.userData());
- assertEquals(Errors.NONE, parsedAssignment.error());
+ assertEquals(ConsumerProtocol.AssignmentError.NONE, parsedAssignment.error());
}
@Test
public void deserializeNewAssignmentWithOldVersion() {
List<TopicPartition> partitions = Collections.singletonList(tp1);
- ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment((short) 1, partitions, null, Errors.NEED_REJOIN));
+ ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment((short) 1, partitions, null, ConsumerProtocol.AssignmentError.NEED_REJOIN));
// ignore the version assuming it is the old byte code, as it will blindly deserialize as 0
Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
header.getShort(VERSION_KEY_NAME);
Assignment parsedAssignment = ConsumerProtocol.deserializeAssignmentV0(buffer);
assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
assertNull(parsedAssignment.userData());
- assertEquals(Errors.NONE, parsedAssignment.error());
+ assertEquals(ConsumerProtocol.AssignmentError.NONE, parsedAssignment.error());
}
@Test
@@ -205,7 +200,7 @@ public class ConsumerProtocolTest {
.set(ConsumerProtocol.TOPIC_KEY_NAME, tp1.topic())
.set(ConsumerProtocol.PARTITIONS_KEY_NAME, new Object[]{tp1.partition()})});
assignmentV100.set(USER_DATA_KEY_NAME, ByteBuffer.wrap(new byte[0]));
- assignmentV100.set(ERROR_CODE.name, Errors.NEED_REJOIN.code());
+ assignmentV100.set(ERROR_CODE.name, ConsumerProtocol.AssignmentError.NEED_REJOIN.code());
assignmentV100.set("foo", "bar");
Struct headerV100 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA);
@@ -219,10 +214,6 @@ public class ConsumerProtocolTest {
PartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer);
assertEquals(toSet(Collections.singletonList(tp1)), toSet(assignment.partitions()));
- assertEquals(Errors.NEED_REJOIN, assignment.error());
- }
-
- private static <T> Set<T> toSet(Collection<T> collection) {
- return new HashSet<>(collection);
+ assertEquals(ConsumerProtocol.AssignmentError.NEED_REJOIN, assignment.error());
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
index 609c773..ca7cb34 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
@@ -23,8 +23,14 @@ import java.util.Map;
public class MockPartitionAssignor extends AbstractPartitionAssignor {
+ private final List<RebalanceProtocol> supportedProtocols;
+
private Map<String, List<TopicPartition>> result = null;
+ MockPartitionAssignor(final List<RebalanceProtocol> supportedProtocols) {
+ this.supportedProtocols = supportedProtocols;
+ }
+
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
@@ -38,6 +44,11 @@ public class MockPartitionAssignor extends AbstractPartitionAssignor {
return "consumer-mock-assignor";
}
+ @Override
+ public List<RebalanceProtocol> supportedProtocols() {
+ return supportedProtocols;
+ }
+
public void clear() {
this.result = null;
}
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 250ebc0..4d35f06 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -41,6 +41,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Base64;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -434,6 +435,10 @@ public class TestUtils {
return list;
}
+ public static <T> Set<T> toSet(Collection<T> collection) {
+ return new HashSet<>(collection);
+ }
+
public static ByteBuffer toBuffer(Struct struct) {
ByteBuffer buffer = ByteBuffer.allocate(struct.sizeOf());
struct.writeTo(buffer);