You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/04 06:17:55 UTC
kafka git commit: KAFKA-2931: add system test for consumer rolling
upgrades
Repository: kafka
Updated Branches:
refs/heads/trunk 80df43500 -> cd54fc881
KAFKA-2931: add system test for consumer rolling upgrades
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma, Guozhang Wang
Closes #619 from hachikuji/KAFKA-2931
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cd54fc88
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cd54fc88
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cd54fc88
Branch: refs/heads/trunk
Commit: cd54fc8816964f5a56469075c75c567e777b9656
Parents: 80df435
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Dec 3 21:17:51 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Dec 3 21:17:51 2015 -0800
----------------------------------------------------------------------
.../consumer/internals/AbstractCoordinator.java | 14 +--
.../consumer/internals/ConsumerCoordinator.java | 32 ++++---
.../kafka/common/requests/JoinGroupRequest.java | 14 +--
.../internals/ConsumerCoordinatorTest.java | 62 +++++++++----
.../common/requests/RequestResponseTest.java | 4 +-
.../runtime/distributed/WorkerCoordinator.java | 10 +--
.../runtime/distributed/WorkerGroupMember.java | 2 -
.../distributed/WorkerCoordinatorTest.java | 10 ++-
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
tests/kafkatest/services/verifiable_consumer.py | 7 +-
.../kafkatest/tests/consumer_rolling_upgrade.py | 94 ++++++++++++++++++++
.../apache/kafka/tools/VerifiableConsumer.java | 11 +++
12 files changed, 198 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 322de5c..33886ed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
@@ -48,9 +49,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -132,14 +131,14 @@ public abstract class AbstractCoordinator implements Closeable {
/**
* Get the current list of protocols and their associated metadata supported
- * by the local member. The order of the protocols in the map indicates the preference
+ * by the local member. The order of the protocols in the list indicates the preference
* of the protocol (the first entry is the most preferred). The coordinator takes this
* preference into account when selecting the generation protocol (generally more preferred
* protocols will be selected as long as all members support them and there is no disagreement
* on the preference).
* @return Non-empty list of supported protocols and metadata
*/
- protected abstract LinkedHashMap<String, ByteBuffer> metadata();
+ protected abstract List<ProtocolMetadata> metadata();
/**
* Invoked prior to each group join or rejoin. This is typically used to perform any
@@ -308,17 +307,12 @@ public abstract class AbstractCoordinator implements Closeable {
// send a join group request to the coordinator
log.debug("(Re-)joining group {}", groupId);
-
- List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>();
- for (Map.Entry<String, ByteBuffer> metadataEntry : metadata().entrySet())
- protocols.add(new JoinGroupRequest.GroupProtocol(metadataEntry.getKey(), metadataEntry.getValue()));
-
JoinGroupRequest request = new JoinGroupRequest(
groupId,
this.sessionTimeoutMs,
this.memberId,
protocolType(),
- protocols);
+ metadata());
// create the request for the coordinator
log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.coordinator.id());
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 8453c7b..4ac05a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
@@ -49,7 +50,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -61,7 +61,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class);
- private final Map<String, PartitionAssignor> protocolMap;
+ private final List<PartitionAssignor> assignors;
private final org.apache.kafka.clients.Metadata metadata;
private final MetadataSnapshot metadataSnapshot;
private final ConsumerCoordinatorMetrics sensors;
@@ -104,10 +104,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
this.subscriptions = subscriptions;
this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
this.autoCommitEnabled = autoCommitEnabled;
-
- this.protocolMap = new HashMap<>();
- for (PartitionAssignor assignor : assignors)
- this.protocolMap.put(assignor.name(), assignor);
+ this.assignors = assignors;
addMetadataListener();
@@ -121,13 +118,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
@Override
- public LinkedHashMap<String, ByteBuffer> metadata() {
- LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
- for (PartitionAssignor assignor : protocolMap.values()) {
+ public List<ProtocolMetadata> metadata() {
+ List<ProtocolMetadata> metadataList = new ArrayList<>();
+ for (PartitionAssignor assignor : assignors) {
Subscription subscription = assignor.subscription(subscriptions.subscription());
- metadata.put(assignor.name(), ConsumerProtocol.serializeSubscription(subscription));
+ ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
+ metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
}
- return metadata;
+ return metadataList;
}
private void addMetadataListener() {
@@ -156,12 +154,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
});
}
+    /**
+     * Find the configured assignor whose protocol name equals {@code name}.
+     * Assignors are examined in the order of the {@code assignors} list and the
+     * first match wins.
+     *
+     * @param name protocol name to look for
+     * @return the matching assignor, or null if no configured assignor has that name
+     */
+    private PartitionAssignor lookupAssignor(String name) {
+        for (PartitionAssignor candidate : this.assignors) {
+            // skip anything that is not the requested protocol
+            if (!candidate.name().equals(name))
+                continue;
+            return candidate;
+        }
+        return null;
+    }
+
@Override
protected void onJoinComplete(int generation,
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer) {
- PartitionAssignor assignor = protocolMap.get(assignmentStrategy);
+ PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
@@ -198,7 +204,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
protected Map<String, ByteBuffer> performAssignment(String leaderId,
String assignmentStrategy,
Map<String, ByteBuffer> allSubscriptions) {
- PartitionAssignor assignor = protocolMap.get(protocol);
+ PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 91a698c..cae07bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -40,13 +40,13 @@ public class JoinGroupRequest extends AbstractRequest {
private final int sessionTimeout;
private final String memberId;
private final String protocolType;
- private final List<GroupProtocol> groupProtocols;
+ private final List<ProtocolMetadata> groupProtocols;
- public static class GroupProtocol {
+ public static class ProtocolMetadata {
private final String name;
private final ByteBuffer metadata;
- public GroupProtocol(String name, ByteBuffer metadata) {
+ public ProtocolMetadata(String name, ByteBuffer metadata) {
this.name = name;
this.metadata = metadata;
}
@@ -64,7 +64,7 @@ public class JoinGroupRequest extends AbstractRequest {
int sessionTimeout,
String memberId,
String protocolType,
- List<GroupProtocol> groupProtocols) {
+ List<ProtocolMetadata> groupProtocols) {
super(new Struct(CURRENT_SCHEMA));
struct.set(GROUP_ID_KEY_NAME, groupId);
struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
@@ -72,7 +72,7 @@ public class JoinGroupRequest extends AbstractRequest {
struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType);
List<Struct> groupProtocolsList = new ArrayList<>();
- for (GroupProtocol protocol : groupProtocols) {
+ for (ProtocolMetadata protocol : groupProtocols) {
Struct protocolStruct = struct.instance(GROUP_PROTOCOLS_KEY_NAME);
protocolStruct.set(PROTOCOL_NAME_KEY_NAME, protocol.name);
protocolStruct.set(PROTOCOL_METADATA_KEY_NAME, protocol.metadata);
@@ -99,7 +99,7 @@ public class JoinGroupRequest extends AbstractRequest {
Struct groupProtocolStruct = (Struct) groupProtocolObj;
String name = groupProtocolStruct.getString(PROTOCOL_NAME_KEY_NAME);
ByteBuffer metadata = groupProtocolStruct.getBytes(PROTOCOL_METADATA_KEY_NAME);
- groupProtocols.add(new GroupProtocol(name, metadata));
+ groupProtocols.add(new ProtocolMetadata(name, metadata));
}
}
@@ -132,7 +132,7 @@ public class JoinGroupRequest extends AbstractRequest {
return memberId;
}
- public List<GroupProtocol> groupProtocols() {
+ public List<ProtocolMetadata> groupProtocols() {
return groupProtocols;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 500aaed..9f9682a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
@@ -38,6 +40,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
@@ -75,7 +78,6 @@ public class ConsumerCoordinatorTest {
private int sessionTimeoutMs = 10;
private int heartbeatIntervalMs = 2;
private long retryBackoffMs = 100;
- private long requestTimeoutMs = 5000;
private boolean autoCommitEnabled = false;
private long autoCommitIntervalMs = 5000;
private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
@@ -108,22 +110,7 @@ public class ConsumerCoordinatorTest {
client.setNode(node);
- this.coordinator = new ConsumerCoordinator(
- consumerClient,
- groupId,
- sessionTimeoutMs,
- heartbeatIntervalMs,
- assignors,
- metadata,
- subscriptions,
- metrics,
- "consumer" + groupId,
- metricTags,
- time,
- retryBackoffMs,
- defaultOffsetCommitCallback,
- autoCommitEnabled,
- autoCommitIntervalMs);
+ this.coordinator = buildCoordinator(metrics, assignors);
}
@After
@@ -892,6 +879,47 @@ public class ConsumerCoordinatorTest {
assertEquals(null, subscriptions.committed(tp));
}
+    /**
+     * Checks that the protocol metadata returned by the coordinator lists the assignors
+     * in the same order in which they were passed to it, for both orderings of the
+     * round-robin and range assignors.
+     */
+    @Test
+    public void testProtocolMetadataOrder() {
+        RoundRobinAssignor roundRobin = new RoundRobinAssignor();
+        RangeAssignor range = new RangeAssignor();
+
+        // round-robin configured ahead of range
+        try (Metrics metrics = new Metrics(time)) {
+            List<PartitionAssignor> ordering = Arrays.<PartitionAssignor>asList(roundRobin, range);
+            ConsumerCoordinator roundRobinFirst = buildCoordinator(metrics, ordering);
+            List<ProtocolMetadata> protocols = roundRobinFirst.metadata();
+            assertEquals(2, protocols.size());
+            assertEquals(roundRobin.name(), protocols.get(0).name());
+            assertEquals(range.name(), protocols.get(1).name());
+        }
+
+        // range configured ahead of round-robin
+        try (Metrics metrics = new Metrics(time)) {
+            List<PartitionAssignor> ordering = Arrays.<PartitionAssignor>asList(range, roundRobin);
+            ConsumerCoordinator rangeFirst = buildCoordinator(metrics, ordering);
+            List<ProtocolMetadata> protocols = rangeFirst.metadata();
+            assertEquals(2, protocols.size());
+            assertEquals(range.name(), protocols.get(0).name());
+            assertEquals(roundRobin.name(), protocols.get(1).name());
+        }
+    }
+
+    /**
+     * Create a ConsumerCoordinator from this test's shared fixture fields, taking only
+     * the metrics instance and the assignor list from the caller.
+     *
+     * @param metrics metrics instance handed to the coordinator
+     * @param assignors assignors handed to the coordinator, in the given order
+     * @return the newly constructed coordinator
+     */
+    private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors) {
+        String metricPrefix = "consumer" + groupId;
+        ConsumerCoordinator built = new ConsumerCoordinator(
+                consumerClient, groupId, sessionTimeoutMs, heartbeatIntervalMs,
+                assignors, metadata, subscriptions,
+                metrics, metricPrefix, metricTags,
+                time, retryBackoffMs,
+                defaultOffsetCommitCallback, autoCommitEnabled, autoCommitIntervalMs);
+        return built;
+    }
+
private Struct consumerMetadataResponse(Node node, short error) {
GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
return response.toStruct();
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 5ee11d2..ab18817 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -188,8 +188,8 @@ public class RequestResponseTest {
private AbstractRequest createJoinGroupRequest() {
ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
- List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>();
- protocols.add(new JoinGroupRequest.GroupProtocol("consumer-range", metadata));
+ List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();
+ protocols.add(new JoinGroupRequest.ProtocolMetadata("consumer-range", metadata));
return new JoinGroupRequest("group1", 30000, "consumer1", "consumer", protocols);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 082e235..6275636 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.utils.CircularIterator;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.storage.KafkaConfigStorage;
@@ -35,7 +36,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -69,7 +69,6 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
String metricGrpPrefix,
Map<String, String> metricTags,
Time time,
- long requestTimeoutMs,
long retryBackoffMs,
String restUrl,
KafkaConfigStorage configStorage,
@@ -101,12 +100,11 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
}
@Override
- public LinkedHashMap<String, ByteBuffer> metadata() {
- LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
+ public List<ProtocolMetadata> metadata() {
configSnapshot = configStorage.snapshot();
ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(restUrl, configSnapshot.offset());
- metadata.put(DEFAULT_SUBPROTOCOL, ConnectProtocol.serializeMetadata(workerState));
- return metadata;
+ ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
+ return Collections.singletonList(new ProtocolMetadata(DEFAULT_SUBPROTOCOL, metadata));
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index c72e3ef..a36608a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
@@ -105,7 +104,6 @@ public class WorkerGroupMember {
metricGrpPrefix,
metricsTags,
this.time,
- config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
retryBackoffMs,
restUrl,
configStorage,
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 38c1aeb..f47a9f9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
@@ -68,7 +69,6 @@ public class WorkerCoordinatorTest {
private int sessionTimeoutMs = 10;
private int heartbeatIntervalMs = 2;
private long retryBackoffMs = 100;
- private long requestTimeoutMs = 5000;
private MockTime time;
private MockClient client;
private Cluster cluster = TestUtils.singletonCluster("topic", 1);
@@ -105,7 +105,6 @@ public class WorkerCoordinatorTest {
"consumer" + groupId,
metricTags,
time,
- requestTimeoutMs,
retryBackoffMs,
LEADER_URL,
configStorage,
@@ -149,9 +148,12 @@ public class WorkerCoordinatorTest {
PowerMock.replayAll();
- LinkedHashMap<String, ByteBuffer> serialized = coordinator.metadata();
+ List<ProtocolMetadata> serialized = coordinator.metadata();
assertEquals(1, serialized.size());
- ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(serialized.get(WorkerCoordinator.DEFAULT_SUBPROTOCOL));
+
+ ProtocolMetadata defaultMetadata = serialized.get(0);
+ assertEquals(WorkerCoordinator.DEFAULT_SUBPROTOCOL, defaultMetadata.name());
+ ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(defaultMetadata.metadata());
assertEquals(1, state.offset());
PowerMock.verifyAll();
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 26ab885..4aa8438 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -191,7 +191,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
private def createJoinGroupRequest = {
new JoinGroupRequest(group, 30000, "", "consumer",
- List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
+ List( new JoinGroupRequest.ProtocolMetadata("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
}
private def createSyncGroupRequest = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 338646b..955dd5d 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -134,6 +134,7 @@ class VerifiableConsumer(BackgroundThreadService):
def __init__(self, context, num_nodes, kafka, topic, group_id,
max_messages=-1, session_timeout=30000, enable_autocommit=False,
+ assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor",
version=TRUNK):
super(VerifiableConsumer, self).__init__(context, num_nodes)
self.log_level = "TRACE"
@@ -144,6 +145,7 @@ class VerifiableConsumer(BackgroundThreadService):
self.max_messages = max_messages
self.session_timeout = session_timeout
self.enable_autocommit = enable_autocommit
+ self.assignment_strategy = assignment_strategy
self.prop_file = ""
self.security_config = kafka.security_config.client_config(self.prop_file)
self.prop_file += str(self.security_config)
@@ -223,9 +225,10 @@ class VerifiableConsumer(BackgroundThreadService):
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG
cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer" \
- " --group-id %s --topic %s --broker-list %s --session-timeout %s %s" % \
+ " --group-id %s --topic %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \
(self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol), self.session_timeout,
- "--enable-autocommit" if self.enable_autocommit else "")
+ self.assignment_strategy, "--enable-autocommit" if self.enable_autocommit else "")
+
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/tests/kafkatest/tests/consumer_rolling_upgrade.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/consumer_rolling_upgrade.py b/tests/kafkatest/tests/consumer_rolling_upgrade.py
new file mode 100644
index 0000000..f00937c
--- /dev/null
+++ b/tests/kafkatest/tests/consumer_rolling_upgrade.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
+from kafkatest.services.kafka import TopicPartition
+
+class ConsumerRollingUpgradeTest(KafkaTest):
+ TOPIC = "test_topic"
+ NUM_PARTITIONS = 4
+ GROUP_ID = "test_group_id"
+ RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+ ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+
+ def __init__(self, test_context):
+ super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+ self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
+ })
+ self.num_consumers = 2
+ self.session_timeout = 10000
+
+ def min_cluster_size(self):
+ return super(ConsumerRollingUpgradeTest, self).min_cluster_size() + self.num_consumers
+
+ def _await_all_members(self, consumer):
+ # Wait until all members have joined the group
+ wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers, timeout_sec=self.session_timeout+5,
+ err_msg="Consumers failed to join in a reasonable amount of time")
+
+ def _verify_range_assignment(self, consumer):
+ # range assignment should give us two partition sets: (0, 1) and (2, 3)
+ assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
+ assert assignment == set([
+ frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
+ frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
+
+ def _verify_roundrobin_assignment(self, consumer):
+ assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
+ assert assignment == set([
+ frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
+ frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
+
+ def rolling_update_test(self):
+ """
+ Verify rolling updates of partition assignment strategies works correctly. In this
+ test, we use a rolling restart to change the group's assignment strategy from "range"
+ to "roundrobin." We verify after every restart that all members are still in the group
+ and that the correct assignment strategy was used.
+ """
+
+ # initialize the consumer using range assignment
+ consumer = VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
+ self.TOPIC, self.GROUP_ID, session_timeout=self.session_timeout,
+ assignment_strategy=self.RANGE)
+ consumer.start()
+ self._await_all_members(consumer)
+ self._verify_range_assignment(consumer)
+
+ # change consumer configuration to prefer round-robin assignment, but still support range assignment
+ consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
+
+ # restart one of the nodes and verify that we are still using range assignment
+ consumer.stop_node(consumer.nodes[0])
+ consumer.start_node(consumer.nodes[0])
+ self._await_all_members(consumer)
+ self._verify_range_assignment(consumer)
+
+ # now restart the other node and verify that we have switched to round-robin
+ consumer.stop_node(consumer.nodes[1])
+ consumer.start_node(consumer.nodes[1])
+ self._await_all_members(consumer)
+ self._verify_roundrobin_assignment(consumer)
+
+ # if we want, we can now drop support for range assignment
+ consumer.assignment_strategy = self.ROUND_ROBIN
+ for node in consumer.nodes:
+ consumer.stop_node(node)
+ consumer.start_node(node)
+ self._await_all_members(consumer)
+ self._verify_roundrobin_assignment(consumer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 93c0bc6..25b87bd 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -35,6 +35,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -538,6 +540,14 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
.dest("resetPolicy")
.help("Set reset policy (must be either 'earliest', 'latest', or 'none'");
+ parser.addArgument("--assignment-strategy")
+ .action(store())
+ .required(false)
+ .setDefault(RangeAssignor.class.getName())
+ .type(String.class)
+ .dest("assignmentStrategy")
+ .help("Set assignment strategy (e.g. " + RoundRobinAssignor.class.getName() + ")");
+
parser.addArgument("--consumer.config")
.action(store())
.required(false)
@@ -571,6 +581,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout")));
+ consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy"));
StringDeserializer deserializer = new StringDeserializer();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);