You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/12/05 23:06:11 UTC

kafka git commit: MINOR: backport fix to partition assignor order (KAFKA-2931)

Repository: kafka
Updated Branches:
  refs/heads/0.9.0 7b46a99d6 -> 8687c099c


MINOR: backport fix to partition assignor order (KAFKA-2931)

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang, Gwen Shapira

Closes #630 from hachikuji/KAFKA-2931-0.9


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8687c099
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8687c099
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8687c099

Branch: refs/heads/0.9.0
Commit: 8687c099c4ef082a8f35a050d3b319c7721921c2
Parents: 7b46a99
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sat Dec 5 14:05:59 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Sat Dec 5 14:05:59 2015 -0800

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java | 14 ++---
 .../consumer/internals/ConsumerCoordinator.java | 32 ++++++----
 .../kafka/common/requests/JoinGroupRequest.java | 14 ++---
 .../internals/ConsumerCoordinatorTest.java      | 62 ++++++++++++++------
 .../common/requests/RequestResponseTest.java    |  4 +-
 .../runtime/distributed/WorkerCoordinator.java  | 10 ++--
 .../runtime/distributed/WorkerGroupMember.java  |  2 -
 .../distributed/WorkerCoordinatorTest.java      | 10 ++--
 .../kafka/api/AuthorizerIntegrationTest.scala   |  2 +-
 9 files changed, 88 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 322de5c..33886ed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.LeaveGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
@@ -48,9 +49,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -132,14 +131,14 @@ public abstract class AbstractCoordinator implements Closeable {
 
     /**
      * Get the current list of protocols and their associated metadata supported
-     * by the local member. The order of the protocols in the map indicates the preference
+     * by the local member. The order of the protocols in the list indicates the preference
      * of the protocol (the first entry is the most preferred). The coordinator takes this
      * preference into account when selecting the generation protocol (generally more preferred
      * protocols will be selected as long as all members support them and there is no disagreement
      * on the preference).
      * @return Non-empty map of supported protocols and metadata
      */
-    protected abstract LinkedHashMap<String, ByteBuffer> metadata();
+    protected abstract List<ProtocolMetadata> metadata();
 
     /**
      * Invoked prior to each group join or rejoin. This is typically used to perform any
@@ -308,17 +307,12 @@ public abstract class AbstractCoordinator implements Closeable {
 
         // send a join group request to the coordinator
         log.debug("(Re-)joining group {}", groupId);
-
-        List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>();
-        for (Map.Entry<String, ByteBuffer> metadataEntry : metadata().entrySet())
-            protocols.add(new JoinGroupRequest.GroupProtocol(metadataEntry.getKey(), metadataEntry.getValue()));
-
         JoinGroupRequest request = new JoinGroupRequest(
                 groupId,
                 this.sessionTimeoutMs,
                 this.memberId,
                 protocolType(),
-                protocols);
+                metadata());
 
         // create the request for the coordinator
         log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.coordinator.id());

http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 8453c7b..4ac05a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
@@ -49,7 +50,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -61,7 +61,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class);
 
-    private final Map<String, PartitionAssignor> protocolMap;
+    private final List<PartitionAssignor> assignors;
     private final org.apache.kafka.clients.Metadata metadata;
     private final MetadataSnapshot metadataSnapshot;
     private final ConsumerCoordinatorMetrics sensors;
@@ -104,10 +104,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         this.subscriptions = subscriptions;
         this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
         this.autoCommitEnabled = autoCommitEnabled;
-
-        this.protocolMap = new HashMap<>();
-        for (PartitionAssignor assignor : assignors)
-            this.protocolMap.put(assignor.name(), assignor);
+        this.assignors = assignors;
 
         addMetadataListener();
 
@@ -121,13 +118,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     }
 
     @Override
-    public LinkedHashMap<String, ByteBuffer> metadata() {
-        LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
-        for (PartitionAssignor assignor : protocolMap.values()) {
+    public List<ProtocolMetadata> metadata() {
+        List<ProtocolMetadata> metadataList = new ArrayList<>();
+        for (PartitionAssignor assignor : assignors) {
             Subscription subscription = assignor.subscription(subscriptions.subscription());
-            metadata.put(assignor.name(), ConsumerProtocol.serializeSubscription(subscription));
+            ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
+            metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
         }
-        return metadata;
+        return metadataList;
     }
 
     private void addMetadataListener() {
@@ -156,12 +154,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         });
     }
 
+    private PartitionAssignor lookupAssignor(String name) {
+        for (PartitionAssignor assignor : this.assignors) {
+            if (assignor.name().equals(name))
+                return assignor;
+        }
+        return null;
+    }
+
     @Override
     protected void onJoinComplete(int generation,
                                   String memberId,
                                   String assignmentStrategy,
                                   ByteBuffer assignmentBuffer) {
-        PartitionAssignor assignor = protocolMap.get(assignmentStrategy);
+        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
 
@@ -198,7 +204,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                         String assignmentStrategy,
                                                         Map<String, ByteBuffer> allSubscriptions) {
-        PartitionAssignor assignor = protocolMap.get(protocol);
+        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 91a698c..cae07bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -40,13 +40,13 @@ public class JoinGroupRequest extends AbstractRequest {
     private final int sessionTimeout;
     private final String memberId;
     private final String protocolType;
-    private final List<GroupProtocol> groupProtocols;
+    private final List<ProtocolMetadata> groupProtocols;
 
-    public static class GroupProtocol {
+    public static class ProtocolMetadata {
         private final String name;
         private final ByteBuffer metadata;
 
-        public GroupProtocol(String name, ByteBuffer metadata) {
+        public ProtocolMetadata(String name, ByteBuffer metadata) {
             this.name = name;
             this.metadata = metadata;
         }
@@ -64,7 +64,7 @@ public class JoinGroupRequest extends AbstractRequest {
                             int sessionTimeout,
                             String memberId,
                             String protocolType,
-                            List<GroupProtocol> groupProtocols) {
+                            List<ProtocolMetadata> groupProtocols) {
         super(new Struct(CURRENT_SCHEMA));
         struct.set(GROUP_ID_KEY_NAME, groupId);
         struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
@@ -72,7 +72,7 @@ public class JoinGroupRequest extends AbstractRequest {
         struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType);
 
         List<Struct> groupProtocolsList = new ArrayList<>();
-        for (GroupProtocol protocol : groupProtocols) {
+        for (ProtocolMetadata protocol : groupProtocols) {
             Struct protocolStruct = struct.instance(GROUP_PROTOCOLS_KEY_NAME);
             protocolStruct.set(PROTOCOL_NAME_KEY_NAME, protocol.name);
             protocolStruct.set(PROTOCOL_METADATA_KEY_NAME, protocol.metadata);
@@ -99,7 +99,7 @@ public class JoinGroupRequest extends AbstractRequest {
             Struct groupProtocolStruct = (Struct) groupProtocolObj;
             String name = groupProtocolStruct.getString(PROTOCOL_NAME_KEY_NAME);
             ByteBuffer metadata = groupProtocolStruct.getBytes(PROTOCOL_METADATA_KEY_NAME);
-            groupProtocols.add(new GroupProtocol(name, metadata));
+            groupProtocols.add(new ProtocolMetadata(name, metadata));
         }
     }
 
@@ -132,7 +132,7 @@ public class JoinGroupRequest extends AbstractRequest {
         return memberId;
     }
 
-    public List<GroupProtocol> groupProtocols() {
+    public List<ProtocolMetadata> groupProtocols() {
         return groupProtocols;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 500aaed..9f9682a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -38,6 +40,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.LeaveGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
@@ -75,7 +78,6 @@ public class ConsumerCoordinatorTest {
     private int sessionTimeoutMs = 10;
     private int heartbeatIntervalMs = 2;
     private long retryBackoffMs = 100;
-    private long requestTimeoutMs = 5000;
     private boolean autoCommitEnabled = false;
     private long autoCommitIntervalMs = 5000;
     private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
@@ -108,22 +110,7 @@ public class ConsumerCoordinatorTest {
 
         client.setNode(node);
 
-        this.coordinator = new ConsumerCoordinator(
-                consumerClient,
-                groupId,
-                sessionTimeoutMs,
-                heartbeatIntervalMs,
-                assignors,
-                metadata,
-                subscriptions,
-                metrics,
-                "consumer" + groupId,
-                metricTags,
-                time,
-                retryBackoffMs,
-                defaultOffsetCommitCallback,
-                autoCommitEnabled,
-                autoCommitIntervalMs);
+        this.coordinator = buildCoordinator(metrics, assignors);
     }
 
     @After
@@ -892,6 +879,47 @@ public class ConsumerCoordinatorTest {
         assertEquals(null, subscriptions.committed(tp));
     }
 
+    @Test
+    public void testProtocolMetadataOrder() {
+        RoundRobinAssignor roundRobin = new RoundRobinAssignor();
+        RangeAssignor range = new RangeAssignor();
+
+        try (Metrics metrics = new Metrics(time)) {
+            ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range));
+            List<ProtocolMetadata> metadata = coordinator.metadata();
+            assertEquals(2, metadata.size());
+            assertEquals(roundRobin.name(), metadata.get(0).name());
+            assertEquals(range.name(), metadata.get(1).name());
+        }
+
+        try (Metrics metrics = new Metrics(time)) {
+            ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin));
+            List<ProtocolMetadata> metadata = coordinator.metadata();
+            assertEquals(2, metadata.size());
+            assertEquals(range.name(), metadata.get(0).name());
+            assertEquals(roundRobin.name(), metadata.get(1).name());
+        }
+    }
+
+    private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors) {
+        return new ConsumerCoordinator(
+                consumerClient,
+                groupId,
+                sessionTimeoutMs,
+                heartbeatIntervalMs,
+                assignors,
+                metadata,
+                subscriptions,
+                metrics,
+                "consumer" + groupId,
+                metricTags,
+                time,
+                retryBackoffMs,
+                defaultOffsetCommitCallback,
+                autoCommitEnabled,
+                autoCommitIntervalMs);
+    }
+
     private Struct consumerMetadataResponse(Node node, short error) {
         GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
         return response.toStruct();

http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 5ee11d2..ab18817 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -188,8 +188,8 @@ public class RequestResponseTest {
 
     private AbstractRequest createJoinGroupRequest() {
         ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
-        List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>();
-        protocols.add(new JoinGroupRequest.GroupProtocol("consumer-range", metadata));
+        List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();
+        protocols.add(new JoinGroupRequest.ProtocolMetadata("consumer-range", metadata));
         return new JoinGroupRequest("group1", 30000, "consumer1", "consumer", protocols);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 082e235..6275636 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.utils.CircularIterator;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.storage.KafkaConfigStorage;
@@ -35,7 +36,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -69,7 +69,6 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
                              String metricGrpPrefix,
                              Map<String, String> metricTags,
                              Time time,
-                             long requestTimeoutMs,
                              long retryBackoffMs,
                              String restUrl,
                              KafkaConfigStorage configStorage,
@@ -101,12 +100,11 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
     @Override
-    public LinkedHashMap<String, ByteBuffer> metadata() {
-        LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
+    public List<ProtocolMetadata> metadata() {
         configSnapshot = configStorage.snapshot();
         ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(restUrl, configSnapshot.offset());
-        metadata.put(DEFAULT_SUBPROTOCOL, ConnectProtocol.serializeMetadata(workerState));
-        return metadata;
+        ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
+        return Collections.singletonList(new ProtocolMetadata(DEFAULT_SUBPROTOCOL, metadata));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index c72e3ef..a36608a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
@@ -105,7 +104,6 @@ public class WorkerGroupMember {
                     metricGrpPrefix,
                     metricsTags,
                     this.time,
-                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                     retryBackoffMs,
                     restUrl,
                     configStorage,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 38c1aeb..f47a9f9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
@@ -68,7 +69,6 @@ public class WorkerCoordinatorTest {
     private int sessionTimeoutMs = 10;
     private int heartbeatIntervalMs = 2;
     private long retryBackoffMs = 100;
-    private long requestTimeoutMs = 5000;
     private MockTime time;
     private MockClient client;
     private Cluster cluster = TestUtils.singletonCluster("topic", 1);
@@ -105,7 +105,6 @@ public class WorkerCoordinatorTest {
                 "consumer" + groupId,
                 metricTags,
                 time,
-                requestTimeoutMs,
                 retryBackoffMs,
                 LEADER_URL,
                 configStorage,
@@ -149,9 +148,12 @@ public class WorkerCoordinatorTest {
 
         PowerMock.replayAll();
 
-        LinkedHashMap<String, ByteBuffer> serialized = coordinator.metadata();
+        List<ProtocolMetadata> serialized = coordinator.metadata();
         assertEquals(1, serialized.size());
-        ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(serialized.get(WorkerCoordinator.DEFAULT_SUBPROTOCOL));
+
+        ProtocolMetadata defaultMetadata = serialized.get(0);
+        assertEquals(WorkerCoordinator.DEFAULT_SUBPROTOCOL, defaultMetadata.name());
+        ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(defaultMetadata.metadata());
         assertEquals(1, state.offset());
 
         PowerMock.verifyAll();

http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 26ab885..4aa8438 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -191,7 +191,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
 
   private def createJoinGroupRequest = {
     new JoinGroupRequest(group, 30000, "", "consumer",
-      List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
+      List( new JoinGroupRequest.ProtocolMetadata("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
   }
 
   private def createSyncGroupRequest = {