You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/12/05 23:06:11 UTC
kafka git commit: MINOR: backport fix to partition assignor order (KAFKA-2931)
Repository: kafka
Updated Branches:
refs/heads/0.9.0 7b46a99d6 -> 8687c099c
MINOR: backport fix to partition assignor order (KAFKA-2931)
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang, Gwen Shapira
Closes #630 from hachikuji/KAFKA-2931-0.9
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8687c099
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8687c099
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8687c099
Branch: refs/heads/0.9.0
Commit: 8687c099c4ef082a8f35a050d3b319c7721921c2
Parents: 7b46a99
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sat Dec 5 14:05:59 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Sat Dec 5 14:05:59 2015 -0800
----------------------------------------------------------------------
.../consumer/internals/AbstractCoordinator.java | 14 ++---
.../consumer/internals/ConsumerCoordinator.java | 32 ++++++----
.../kafka/common/requests/JoinGroupRequest.java | 14 ++---
.../internals/ConsumerCoordinatorTest.java | 62 ++++++++++++++------
.../common/requests/RequestResponseTest.java | 4 +-
.../runtime/distributed/WorkerCoordinator.java | 10 ++--
.../runtime/distributed/WorkerGroupMember.java | 2 -
.../distributed/WorkerCoordinatorTest.java | 10 ++--
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
9 files changed, 88 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 322de5c..33886ed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
@@ -48,9 +49,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -132,14 +131,14 @@ public abstract class AbstractCoordinator implements Closeable {
/**
* Get the current list of protocols and their associated metadata supported
- * by the local member. The order of the protocols in the map indicates the preference
+ * by the local member. The order of the protocols in the list indicates the preference
* of the protocol (the first entry is the most preferred). The coordinator takes this
* preference into account when selecting the generation protocol (generally more preferred
* protocols will be selected as long as all members support them and there is no disagreement
* on the preference).
* @return Non-empty list of supported protocols and metadata
*/
- protected abstract LinkedHashMap<String, ByteBuffer> metadata();
+ protected abstract List<ProtocolMetadata> metadata();
/**
* Invoked prior to each group join or rejoin. This is typically used to perform any
@@ -308,17 +307,12 @@ public abstract class AbstractCoordinator implements Closeable {
// send a join group request to the coordinator
log.debug("(Re-)joining group {}", groupId);
-
- List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>();
- for (Map.Entry<String, ByteBuffer> metadataEntry : metadata().entrySet())
- protocols.add(new JoinGroupRequest.GroupProtocol(metadataEntry.getKey(), metadataEntry.getValue()));
-
JoinGroupRequest request = new JoinGroupRequest(
groupId,
this.sessionTimeoutMs,
this.memberId,
protocolType(),
- protocols);
+ metadata());
// create the request for the coordinator
log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.coordinator.id());
http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 8453c7b..4ac05a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
@@ -49,7 +50,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -61,7 +61,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class);
- private final Map<String, PartitionAssignor> protocolMap;
+ private final List<PartitionAssignor> assignors;
private final org.apache.kafka.clients.Metadata metadata;
private final MetadataSnapshot metadataSnapshot;
private final ConsumerCoordinatorMetrics sensors;
@@ -104,10 +104,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
this.subscriptions = subscriptions;
this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
this.autoCommitEnabled = autoCommitEnabled;
-
- this.protocolMap = new HashMap<>();
- for (PartitionAssignor assignor : assignors)
- this.protocolMap.put(assignor.name(), assignor);
+ this.assignors = assignors;
addMetadataListener();
@@ -121,13 +118,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
@Override
- public LinkedHashMap<String, ByteBuffer> metadata() {
- LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
- for (PartitionAssignor assignor : protocolMap.values()) {
+ public List<ProtocolMetadata> metadata() {
+ List<ProtocolMetadata> metadataList = new ArrayList<>();
+ for (PartitionAssignor assignor : assignors) {
Subscription subscription = assignor.subscription(subscriptions.subscription());
- metadata.put(assignor.name(), ConsumerProtocol.serializeSubscription(subscription));
+ ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
+ metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
}
- return metadata;
+ return metadataList;
}
private void addMetadataListener() {
@@ -156,12 +154,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
});
}
+ private PartitionAssignor lookupAssignor(String name) {
+ for (PartitionAssignor assignor : this.assignors) {
+ if (assignor.name().equals(name))
+ return assignor;
+ }
+ return null;
+ }
+
@Override
protected void onJoinComplete(int generation,
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer) {
- PartitionAssignor assignor = protocolMap.get(assignmentStrategy);
+ PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
@@ -198,7 +204,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
protected Map<String, ByteBuffer> performAssignment(String leaderId,
String assignmentStrategy,
Map<String, ByteBuffer> allSubscriptions) {
- PartitionAssignor assignor = protocolMap.get(protocol);
+ PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 91a698c..cae07bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -40,13 +40,13 @@ public class JoinGroupRequest extends AbstractRequest {
private final int sessionTimeout;
private final String memberId;
private final String protocolType;
- private final List<GroupProtocol> groupProtocols;
+ private final List<ProtocolMetadata> groupProtocols;
- public static class GroupProtocol {
+ public static class ProtocolMetadata {
private final String name;
private final ByteBuffer metadata;
- public GroupProtocol(String name, ByteBuffer metadata) {
+ public ProtocolMetadata(String name, ByteBuffer metadata) {
this.name = name;
this.metadata = metadata;
}
@@ -64,7 +64,7 @@ public class JoinGroupRequest extends AbstractRequest {
int sessionTimeout,
String memberId,
String protocolType,
- List<GroupProtocol> groupProtocols) {
+ List<ProtocolMetadata> groupProtocols) {
super(new Struct(CURRENT_SCHEMA));
struct.set(GROUP_ID_KEY_NAME, groupId);
struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
@@ -72,7 +72,7 @@ public class JoinGroupRequest extends AbstractRequest {
struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType);
List<Struct> groupProtocolsList = new ArrayList<>();
- for (GroupProtocol protocol : groupProtocols) {
+ for (ProtocolMetadata protocol : groupProtocols) {
Struct protocolStruct = struct.instance(GROUP_PROTOCOLS_KEY_NAME);
protocolStruct.set(PROTOCOL_NAME_KEY_NAME, protocol.name);
protocolStruct.set(PROTOCOL_METADATA_KEY_NAME, protocol.metadata);
@@ -99,7 +99,7 @@ public class JoinGroupRequest extends AbstractRequest {
Struct groupProtocolStruct = (Struct) groupProtocolObj;
String name = groupProtocolStruct.getString(PROTOCOL_NAME_KEY_NAME);
ByteBuffer metadata = groupProtocolStruct.getBytes(PROTOCOL_METADATA_KEY_NAME);
- groupProtocols.add(new GroupProtocol(name, metadata));
+ groupProtocols.add(new ProtocolMetadata(name, metadata));
}
}
@@ -132,7 +132,7 @@ public class JoinGroupRequest extends AbstractRequest {
return memberId;
}
- public List<GroupProtocol> groupProtocols() {
+ public List<ProtocolMetadata> groupProtocols() {
return groupProtocols;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 500aaed..9f9682a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
@@ -38,6 +40,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
@@ -75,7 +78,6 @@ public class ConsumerCoordinatorTest {
private int sessionTimeoutMs = 10;
private int heartbeatIntervalMs = 2;
private long retryBackoffMs = 100;
- private long requestTimeoutMs = 5000;
private boolean autoCommitEnabled = false;
private long autoCommitIntervalMs = 5000;
private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
@@ -108,22 +110,7 @@ public class ConsumerCoordinatorTest {
client.setNode(node);
- this.coordinator = new ConsumerCoordinator(
- consumerClient,
- groupId,
- sessionTimeoutMs,
- heartbeatIntervalMs,
- assignors,
- metadata,
- subscriptions,
- metrics,
- "consumer" + groupId,
- metricTags,
- time,
- retryBackoffMs,
- defaultOffsetCommitCallback,
- autoCommitEnabled,
- autoCommitIntervalMs);
+ this.coordinator = buildCoordinator(metrics, assignors);
}
@After
@@ -892,6 +879,47 @@ public class ConsumerCoordinatorTest {
assertEquals(null, subscriptions.committed(tp));
}
+ @Test
+ public void testProtocolMetadataOrder() {
+ RoundRobinAssignor roundRobin = new RoundRobinAssignor();
+ RangeAssignor range = new RangeAssignor();
+
+ try (Metrics metrics = new Metrics(time)) {
+ ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range));
+ List<ProtocolMetadata> metadata = coordinator.metadata();
+ assertEquals(2, metadata.size());
+ assertEquals(roundRobin.name(), metadata.get(0).name());
+ assertEquals(range.name(), metadata.get(1).name());
+ }
+
+ try (Metrics metrics = new Metrics(time)) {
+ ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin));
+ List<ProtocolMetadata> metadata = coordinator.metadata();
+ assertEquals(2, metadata.size());
+ assertEquals(range.name(), metadata.get(0).name());
+ assertEquals(roundRobin.name(), metadata.get(1).name());
+ }
+ }
+
+ private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors) {
+ return new ConsumerCoordinator(
+ consumerClient,
+ groupId,
+ sessionTimeoutMs,
+ heartbeatIntervalMs,
+ assignors,
+ metadata,
+ subscriptions,
+ metrics,
+ "consumer" + groupId,
+ metricTags,
+ time,
+ retryBackoffMs,
+ defaultOffsetCommitCallback,
+ autoCommitEnabled,
+ autoCommitIntervalMs);
+ }
+
private Struct consumerMetadataResponse(Node node, short error) {
GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
return response.toStruct();
http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 5ee11d2..ab18817 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -188,8 +188,8 @@ public class RequestResponseTest {
private AbstractRequest createJoinGroupRequest() {
ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
- List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>();
- protocols.add(new JoinGroupRequest.GroupProtocol("consumer-range", metadata));
+ List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();
+ protocols.add(new JoinGroupRequest.ProtocolMetadata("consumer-range", metadata));
return new JoinGroupRequest("group1", 30000, "consumer1", "consumer", protocols);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 082e235..6275636 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.utils.CircularIterator;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.storage.KafkaConfigStorage;
@@ -35,7 +36,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -69,7 +69,6 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
String metricGrpPrefix,
Map<String, String> metricTags,
Time time,
- long requestTimeoutMs,
long retryBackoffMs,
String restUrl,
KafkaConfigStorage configStorage,
@@ -101,12 +100,11 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
}
@Override
- public LinkedHashMap<String, ByteBuffer> metadata() {
- LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
+ public List<ProtocolMetadata> metadata() {
configSnapshot = configStorage.snapshot();
ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(restUrl, configSnapshot.offset());
- metadata.put(DEFAULT_SUBPROTOCOL, ConnectProtocol.serializeMetadata(workerState));
- return metadata;
+ ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
+ return Collections.singletonList(new ProtocolMetadata(DEFAULT_SUBPROTOCOL, metadata));
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index c72e3ef..a36608a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
@@ -105,7 +104,6 @@ public class WorkerGroupMember {
metricGrpPrefix,
metricsTags,
this.time,
- config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
retryBackoffMs,
restUrl,
configStorage,
http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 38c1aeb..f47a9f9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
@@ -68,7 +69,6 @@ public class WorkerCoordinatorTest {
private int sessionTimeoutMs = 10;
private int heartbeatIntervalMs = 2;
private long retryBackoffMs = 100;
- private long requestTimeoutMs = 5000;
private MockTime time;
private MockClient client;
private Cluster cluster = TestUtils.singletonCluster("topic", 1);
@@ -105,7 +105,6 @@ public class WorkerCoordinatorTest {
"consumer" + groupId,
metricTags,
time,
- requestTimeoutMs,
retryBackoffMs,
LEADER_URL,
configStorage,
@@ -149,9 +148,12 @@ public class WorkerCoordinatorTest {
PowerMock.replayAll();
- LinkedHashMap<String, ByteBuffer> serialized = coordinator.metadata();
+ List<ProtocolMetadata> serialized = coordinator.metadata();
assertEquals(1, serialized.size());
- ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(serialized.get(WorkerCoordinator.DEFAULT_SUBPROTOCOL));
+
+ ProtocolMetadata defaultMetadata = serialized.get(0);
+ assertEquals(WorkerCoordinator.DEFAULT_SUBPROTOCOL, defaultMetadata.name());
+ ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(defaultMetadata.metadata());
assertEquals(1, state.offset());
PowerMock.verifyAll();
http://git-wip-us.apache.org/repos/asf/kafka/blob/8687c099/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 26ab885..4aa8438 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -191,7 +191,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
private def createJoinGroupRequest = {
new JoinGroupRequest(group, 30000, "", "consumer",
- List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
+ List( new JoinGroupRequest.ProtocolMetadata("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
}
private def createSyncGroupRequest = {