You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/04 06:17:55 UTC

kafka git commit: KAFKA-2931: add system test for consumer rolling upgrades

Repository: kafka
Updated Branches:
  refs/heads/trunk 80df43500 -> cd54fc881


KAFKA-2931: add system test for consumer rolling upgrades

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma, Guozhang Wang

Closes #619 from hachikuji/KAFKA-2931


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cd54fc88
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cd54fc88
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cd54fc88

Branch: refs/heads/trunk
Commit: cd54fc8816964f5a56469075c75c567e777b9656
Parents: 80df435
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Dec 3 21:17:51 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Dec 3 21:17:51 2015 -0800

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java | 14 +--
 .../consumer/internals/ConsumerCoordinator.java | 32 ++++---
 .../kafka/common/requests/JoinGroupRequest.java | 14 +--
 .../internals/ConsumerCoordinatorTest.java      | 62 +++++++++----
 .../common/requests/RequestResponseTest.java    |  4 +-
 .../runtime/distributed/WorkerCoordinator.java  | 10 +--
 .../runtime/distributed/WorkerGroupMember.java  |  2 -
 .../distributed/WorkerCoordinatorTest.java      | 10 ++-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  2 +-
 tests/kafkatest/services/verifiable_consumer.py |  7 +-
 .../kafkatest/tests/consumer_rolling_upgrade.py | 94 ++++++++++++++++++++
 .../apache/kafka/tools/VerifiableConsumer.java  | 11 +++
 12 files changed, 198 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 322de5c..33886ed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.LeaveGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
@@ -48,9 +49,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -132,14 +131,14 @@ public abstract class AbstractCoordinator implements Closeable {
 
     /**
      * Get the current list of protocols and their associated metadata supported
-     * by the local member. The order of the protocols in the map indicates the preference
+     * by the local member. The order of the protocols in the list indicates the preference
      * of the protocol (the first entry is the most preferred). The coordinator takes this
      * preference into account when selecting the generation protocol (generally more preferred
      * protocols will be selected as long as all members support them and there is no disagreement
      * on the preference).
      * @return Non-empty map of supported protocols and metadata
      */
-    protected abstract LinkedHashMap<String, ByteBuffer> metadata();
+    protected abstract List<ProtocolMetadata> metadata();
 
     /**
      * Invoked prior to each group join or rejoin. This is typically used to perform any
@@ -308,17 +307,12 @@ public abstract class AbstractCoordinator implements Closeable {
 
         // send a join group request to the coordinator
         log.debug("(Re-)joining group {}", groupId);
-
-        List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>();
-        for (Map.Entry<String, ByteBuffer> metadataEntry : metadata().entrySet())
-            protocols.add(new JoinGroupRequest.GroupProtocol(metadataEntry.getKey(), metadataEntry.getValue()));
-
         JoinGroupRequest request = new JoinGroupRequest(
                 groupId,
                 this.sessionTimeoutMs,
                 this.memberId,
                 protocolType(),
-                protocols);
+                metadata());
 
         // create the request for the coordinator
         log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.coordinator.id());

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 8453c7b..4ac05a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
@@ -49,7 +50,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -61,7 +61,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class);
 
-    private final Map<String, PartitionAssignor> protocolMap;
+    private final List<PartitionAssignor> assignors;
     private final org.apache.kafka.clients.Metadata metadata;
     private final MetadataSnapshot metadataSnapshot;
     private final ConsumerCoordinatorMetrics sensors;
@@ -104,10 +104,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         this.subscriptions = subscriptions;
         this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
         this.autoCommitEnabled = autoCommitEnabled;
-
-        this.protocolMap = new HashMap<>();
-        for (PartitionAssignor assignor : assignors)
-            this.protocolMap.put(assignor.name(), assignor);
+        this.assignors = assignors;
 
         addMetadataListener();
 
@@ -121,13 +118,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     }
 
     @Override
-    public LinkedHashMap<String, ByteBuffer> metadata() {
-        LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
-        for (PartitionAssignor assignor : protocolMap.values()) {
+    public List<ProtocolMetadata> metadata() {
+        List<ProtocolMetadata> metadataList = new ArrayList<>();
+        for (PartitionAssignor assignor : assignors) {
             Subscription subscription = assignor.subscription(subscriptions.subscription());
-            metadata.put(assignor.name(), ConsumerProtocol.serializeSubscription(subscription));
+            ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
+            metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
         }
-        return metadata;
+        return metadataList;
     }
 
     private void addMetadataListener() {
@@ -156,12 +154,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         });
     }
 
+    private PartitionAssignor lookupAssignor(String name) {
+        for (PartitionAssignor assignor : this.assignors) {
+            if (assignor.name().equals(name))
+                return assignor;
+        }
+        return null;
+    }
+
     @Override
     protected void onJoinComplete(int generation,
                                   String memberId,
                                   String assignmentStrategy,
                                   ByteBuffer assignmentBuffer) {
-        PartitionAssignor assignor = protocolMap.get(assignmentStrategy);
+        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
 
@@ -198,7 +204,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                         String assignmentStrategy,
                                                         Map<String, ByteBuffer> allSubscriptions) {
-        PartitionAssignor assignor = protocolMap.get(protocol);
+        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 91a698c..cae07bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -40,13 +40,13 @@ public class JoinGroupRequest extends AbstractRequest {
     private final int sessionTimeout;
     private final String memberId;
     private final String protocolType;
-    private final List<GroupProtocol> groupProtocols;
+    private final List<ProtocolMetadata> groupProtocols;
 
-    public static class GroupProtocol {
+    public static class ProtocolMetadata {
         private final String name;
         private final ByteBuffer metadata;
 
-        public GroupProtocol(String name, ByteBuffer metadata) {
+        public ProtocolMetadata(String name, ByteBuffer metadata) {
             this.name = name;
             this.metadata = metadata;
         }
@@ -64,7 +64,7 @@ public class JoinGroupRequest extends AbstractRequest {
                             int sessionTimeout,
                             String memberId,
                             String protocolType,
-                            List<GroupProtocol> groupProtocols) {
+                            List<ProtocolMetadata> groupProtocols) {
         super(new Struct(CURRENT_SCHEMA));
         struct.set(GROUP_ID_KEY_NAME, groupId);
         struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
@@ -72,7 +72,7 @@ public class JoinGroupRequest extends AbstractRequest {
         struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType);
 
         List<Struct> groupProtocolsList = new ArrayList<>();
-        for (GroupProtocol protocol : groupProtocols) {
+        for (ProtocolMetadata protocol : groupProtocols) {
             Struct protocolStruct = struct.instance(GROUP_PROTOCOLS_KEY_NAME);
             protocolStruct.set(PROTOCOL_NAME_KEY_NAME, protocol.name);
             protocolStruct.set(PROTOCOL_METADATA_KEY_NAME, protocol.metadata);
@@ -99,7 +99,7 @@ public class JoinGroupRequest extends AbstractRequest {
             Struct groupProtocolStruct = (Struct) groupProtocolObj;
             String name = groupProtocolStruct.getString(PROTOCOL_NAME_KEY_NAME);
             ByteBuffer metadata = groupProtocolStruct.getBytes(PROTOCOL_METADATA_KEY_NAME);
-            groupProtocols.add(new GroupProtocol(name, metadata));
+            groupProtocols.add(new ProtocolMetadata(name, metadata));
         }
     }
 
@@ -132,7 +132,7 @@ public class JoinGroupRequest extends AbstractRequest {
         return memberId;
     }
 
-    public List<GroupProtocol> groupProtocols() {
+    public List<ProtocolMetadata> groupProtocols() {
         return groupProtocols;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 500aaed..9f9682a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -38,6 +40,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.LeaveGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
@@ -75,7 +78,6 @@ public class ConsumerCoordinatorTest {
     private int sessionTimeoutMs = 10;
     private int heartbeatIntervalMs = 2;
     private long retryBackoffMs = 100;
-    private long requestTimeoutMs = 5000;
     private boolean autoCommitEnabled = false;
     private long autoCommitIntervalMs = 5000;
     private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
@@ -108,22 +110,7 @@ public class ConsumerCoordinatorTest {
 
         client.setNode(node);
 
-        this.coordinator = new ConsumerCoordinator(
-                consumerClient,
-                groupId,
-                sessionTimeoutMs,
-                heartbeatIntervalMs,
-                assignors,
-                metadata,
-                subscriptions,
-                metrics,
-                "consumer" + groupId,
-                metricTags,
-                time,
-                retryBackoffMs,
-                defaultOffsetCommitCallback,
-                autoCommitEnabled,
-                autoCommitIntervalMs);
+        this.coordinator = buildCoordinator(metrics, assignors);
     }
 
     @After
@@ -892,6 +879,47 @@ public class ConsumerCoordinatorTest {
         assertEquals(null, subscriptions.committed(tp));
     }
 
+    @Test
+    public void testProtocolMetadataOrder() {
+        RoundRobinAssignor roundRobin = new RoundRobinAssignor();
+        RangeAssignor range = new RangeAssignor();
+
+        try (Metrics metrics = new Metrics(time)) {
+            ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range));
+            List<ProtocolMetadata> metadata = coordinator.metadata();
+            assertEquals(2, metadata.size());
+            assertEquals(roundRobin.name(), metadata.get(0).name());
+            assertEquals(range.name(), metadata.get(1).name());
+        }
+
+        try (Metrics metrics = new Metrics(time)) {
+            ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin));
+            List<ProtocolMetadata> metadata = coordinator.metadata();
+            assertEquals(2, metadata.size());
+            assertEquals(range.name(), metadata.get(0).name());
+            assertEquals(roundRobin.name(), metadata.get(1).name());
+        }
+    }
+
+    private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors) {
+        return new ConsumerCoordinator(
+                consumerClient,
+                groupId,
+                sessionTimeoutMs,
+                heartbeatIntervalMs,
+                assignors,
+                metadata,
+                subscriptions,
+                metrics,
+                "consumer" + groupId,
+                metricTags,
+                time,
+                retryBackoffMs,
+                defaultOffsetCommitCallback,
+                autoCommitEnabled,
+                autoCommitIntervalMs);
+    }
+
     private Struct consumerMetadataResponse(Node node, short error) {
         GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
         return response.toStruct();

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 5ee11d2..ab18817 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -188,8 +188,8 @@ public class RequestResponseTest {
 
     private AbstractRequest createJoinGroupRequest() {
         ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
-        List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>();
-        protocols.add(new JoinGroupRequest.GroupProtocol("consumer-range", metadata));
+        List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();
+        protocols.add(new JoinGroupRequest.ProtocolMetadata("consumer-range", metadata));
         return new JoinGroupRequest("group1", 30000, "consumer1", "consumer", protocols);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 082e235..6275636 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.utils.CircularIterator;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.storage.KafkaConfigStorage;
@@ -35,7 +36,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -69,7 +69,6 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
                              String metricGrpPrefix,
                              Map<String, String> metricTags,
                              Time time,
-                             long requestTimeoutMs,
                              long retryBackoffMs,
                              String restUrl,
                              KafkaConfigStorage configStorage,
@@ -101,12 +100,11 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
     @Override
-    public LinkedHashMap<String, ByteBuffer> metadata() {
-        LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
+    public List<ProtocolMetadata> metadata() {
         configSnapshot = configStorage.snapshot();
         ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(restUrl, configSnapshot.offset());
-        metadata.put(DEFAULT_SUBPROTOCOL, ConnectProtocol.serializeMetadata(workerState));
-        return metadata;
+        ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
+        return Collections.singletonList(new ProtocolMetadata(DEFAULT_SUBPROTOCOL, metadata));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index c72e3ef..a36608a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
@@ -105,7 +104,6 @@ public class WorkerGroupMember {
                     metricGrpPrefix,
                     metricsTags,
                     this.time,
-                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                     retryBackoffMs,
                     restUrl,
                     configStorage,

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 38c1aeb..f47a9f9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
@@ -68,7 +69,6 @@ public class WorkerCoordinatorTest {
     private int sessionTimeoutMs = 10;
     private int heartbeatIntervalMs = 2;
     private long retryBackoffMs = 100;
-    private long requestTimeoutMs = 5000;
     private MockTime time;
     private MockClient client;
     private Cluster cluster = TestUtils.singletonCluster("topic", 1);
@@ -105,7 +105,6 @@ public class WorkerCoordinatorTest {
                 "consumer" + groupId,
                 metricTags,
                 time,
-                requestTimeoutMs,
                 retryBackoffMs,
                 LEADER_URL,
                 configStorage,
@@ -149,9 +148,12 @@ public class WorkerCoordinatorTest {
 
         PowerMock.replayAll();
 
-        LinkedHashMap<String, ByteBuffer> serialized = coordinator.metadata();
+        List<ProtocolMetadata> serialized = coordinator.metadata();
         assertEquals(1, serialized.size());
-        ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(serialized.get(WorkerCoordinator.DEFAULT_SUBPROTOCOL));
+
+        ProtocolMetadata defaultMetadata = serialized.get(0);
+        assertEquals(WorkerCoordinator.DEFAULT_SUBPROTOCOL, defaultMetadata.name());
+        ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(defaultMetadata.metadata());
         assertEquals(1, state.offset());
 
         PowerMock.verifyAll();

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 26ab885..4aa8438 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -191,7 +191,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
 
   private def createJoinGroupRequest = {
     new JoinGroupRequest(group, 30000, "", "consumer",
-      List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
+      List( new JoinGroupRequest.ProtocolMetadata("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
   }
 
   private def createSyncGroupRequest = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 338646b..955dd5d 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -134,6 +134,7 @@ class VerifiableConsumer(BackgroundThreadService):
 
     def __init__(self, context, num_nodes, kafka, topic, group_id,
                  max_messages=-1, session_timeout=30000, enable_autocommit=False,
+                 assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor",
                  version=TRUNK):
         super(VerifiableConsumer, self).__init__(context, num_nodes)
         self.log_level = "TRACE"
@@ -144,6 +145,7 @@ class VerifiableConsumer(BackgroundThreadService):
         self.max_messages = max_messages
         self.session_timeout = session_timeout
         self.enable_autocommit = enable_autocommit
+        self.assignment_strategy = assignment_strategy
         self.prop_file = ""
         self.security_config = kafka.security_config.client_config(self.prop_file)
         self.prop_file += str(self.security_config)
@@ -223,9 +225,10 @@ class VerifiableConsumer(BackgroundThreadService):
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG
         cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer" \
-              " --group-id %s --topic %s --broker-list %s --session-timeout %s %s" % \
+              " --group-id %s --topic %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \
               (self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol), self.session_timeout,
-               "--enable-autocommit" if self.enable_autocommit else "")
+               self.assignment_strategy, "--enable-autocommit" if self.enable_autocommit else "")
+
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/tests/kafkatest/tests/consumer_rolling_upgrade.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/consumer_rolling_upgrade.py b/tests/kafkatest/tests/consumer_rolling_upgrade.py
new file mode 100644
index 0000000..f00937c
--- /dev/null
+++ b/tests/kafkatest/tests/consumer_rolling_upgrade.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
+from kafkatest.services.kafka import TopicPartition
+
+class ConsumerRollingUpgradeTest(KafkaTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 4
+    GROUP_ID = "test_group_id"
+    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+
+    def __init__(self, test_context):
+        super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
+        })
+        self.num_consumers = 2
+        self.session_timeout = 10000
+
+    def min_cluster_size(self):
+        return super(ConsumerRollingUpgradeTest, self).min_cluster_size() + self.num_consumers
+
+    def _await_all_members(self, consumer):
+        # Wait until all members have joined the group
+        wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers, timeout_sec=self.session_timeout+5,
+                   err_msg="Consumers failed to join in a reasonable amount of time")
+
+    def _verify_range_assignment(self, consumer):
+        # range assignment should give us two partition sets: (0, 1) and (2, 3)
+        assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
+        assert assignment == set([
+            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
+            frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
+
+    def _verify_roundrobin_assignment(self, consumer):
+        assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
+        assert assignment == set([
+            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
+            frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
+
+    def rolling_update_test(self):
+        """
+        Verify rolling updates of partition assignment strategies works correctly. In this
+        test, we use a rolling restart to change the group's assignment strategy from "range" 
+        to "roundrobin." We verify after every restart that all members are still in the group
+        and that the correct assignment strategy was used.
+        """
+
+        # initialize the consumer using range assignment
+        consumer = VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
+                                      self.TOPIC, self.GROUP_ID, session_timeout=self.session_timeout,
+                                      assignment_strategy=self.RANGE)
+        consumer.start()
+        self._await_all_members(consumer)
+        self._verify_range_assignment(consumer)
+
+        # change consumer configuration to prefer round-robin assignment, but still support range assignment
+        consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
+
+        # restart one of the nodes and verify that we are still using range assignment
+        consumer.stop_node(consumer.nodes[0])
+        consumer.start_node(consumer.nodes[0])
+        self._await_all_members(consumer)
+        self._verify_range_assignment(consumer)
+        
+        # now restart the other node and verify that we have switched to round-robin
+        consumer.stop_node(consumer.nodes[1])
+        consumer.start_node(consumer.nodes[1])
+        self._await_all_members(consumer)
+        self._verify_roundrobin_assignment(consumer)
+
+        # if we want, we can now drop support for range assignment
+        consumer.assignment_strategy = self.ROUND_ROBIN
+        for node in consumer.nodes:
+            consumer.stop_node(node)
+            consumer.start_node(node)
+            self._await_all_members(consumer)
+            self._verify_roundrobin_assignment(consumer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 93c0bc6..25b87bd 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -35,6 +35,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -538,6 +540,14 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
                 .dest("resetPolicy")
                 .help("Set reset policy (must be either 'earliest', 'latest', or 'none'");
 
+        parser.addArgument("--assignment-strategy")
+                .action(store())
+                .required(false)
+                .setDefault(RangeAssignor.class.getName())
+                .type(String.class)
+                .dest("assignmentStrategy")
+                .help("Set assignment strategy (e.g. " + RoundRobinAssignor.class.getName() + ")");
+
         parser.addArgument("--consumer.config")
                 .action(store())
                 .required(false)
@@ -571,6 +581,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
         consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));
         consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout")));
+        consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy"));
 
         StringDeserializer deserializer = new StringDeserializer();
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);