Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/21 21:08:49 UTC

[5/8] kafka git commit: KAFKA-2464: client-side assignment for new consumer

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
new file mode 100644
index 0000000..93994d7
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -0,0 +1,749 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.GroupMetadataResponse;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.SyncGroupRequest;
+import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class ConsumerCoordinatorTest {
+
+    private String topicName = "test";
+    private String groupId = "test-group";
+    private TopicPartition tp = new TopicPartition(topicName, 0);
+    private int sessionTimeoutMs = 10;
+    private int heartbeatIntervalMs = 2;
+    private long retryBackoffMs = 100;
+    private long requestTimeoutMs = 5000;
+    private boolean autoCommitEnabled = false;
+    private long autoCommitIntervalMs = 5000;
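+    // mock assignor: tests prime the assignment it returns via prepare(); it is reset in setup() via clear()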
+    private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
+    private List<PartitionAssignor> assignors = Arrays.<PartitionAssignor>asList(partitionAssignor);
+    private MockTime time;
+    private MockClient client;
+    private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
+    private Node node = cluster.nodes().get(0);
+    private SubscriptionState subscriptions;
+    private Metadata metadata;
+    private Metrics metrics;
+    private Map<String, String> metricTags = new LinkedHashMap<>();
+    private ConsumerNetworkClient consumerClient;
+    private MockRebalanceListener rebalanceListener;
+    private MockCommitCallback defaultOffsetCommitCallback;
+    private ConsumerCoordinator coordinator;
+
+    @Before
+    public void setup() {
+        this.time = new MockTime();
+        this.client = new MockClient(time);
+        this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
+        this.metadata = new Metadata(0, Long.MAX_VALUE);
+        this.metadata.update(cluster, time.milliseconds());
+        this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+        this.metrics = new Metrics(time);
+        this.rebalanceListener = new MockRebalanceListener();
+        this.defaultOffsetCommitCallback = new MockCommitCallback();
+        this.partitionAssignor.clear();
+
+        client.setNode(node);
+
+        this.coordinator = new ConsumerCoordinator(
+                consumerClient,
+                groupId,
+                sessionTimeoutMs,
+                heartbeatIntervalMs,
+                assignors,
+                metadata,
+                subscriptions,
+                metrics,
+                "consumer" + groupId,
+                metricTags,
+                time,
+                requestTimeoutMs,
+                retryBackoffMs,
+                defaultOffsetCommitCallback,
+                autoCommitEnabled,
+                autoCommitIntervalMs);
+    }
+
+    @After
+    public void teardown() {
+        this.metrics.close();
+    }
+
+    @Test
+    public void testNormalHeartbeat() {
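+        // look up the coordinator first (this discovery pattern opens each of these tests)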
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // normal heartbeat
+        time.sleep(sessionTimeoutMs);
+        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertFalse(future.isDone());
+
+        client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+        consumerClient.poll(0);
+
+        assertTrue(future.isDone());
+        assertTrue(future.succeeded());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // GROUP_COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown
+        time.sleep(sessionTimeoutMs);
+        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertFalse(future.isDone());
+
+        client.prepareResponse(heartbeatResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()));
+        time.sleep(sessionTimeoutMs);
+        consumerClient.poll(0);
+
+        assertTrue(future.isDone());
+        assertTrue(future.failed());
+        assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), future.exception());
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testNotCoordinator() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // not_coordinator will mark coordinator as unknown
+        time.sleep(sessionTimeoutMs);
+        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertFalse(future.isDone());
+
+        client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_GROUP.code()));
+        time.sleep(sessionTimeoutMs);
+        consumerClient.poll(0);
+
+        assertTrue(future.isDone());
+        assertTrue(future.failed());
+        assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.exception(), future.exception());
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testIllegalGeneration() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // illegal_generation should cause the consumer to rejoin the group
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.changePartitionAssignment(Collections.singletonList(tp));
+
+        time.sleep(sessionTimeoutMs);
+        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertFalse(future.isDone());
+
+        client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION.code()));
+        time.sleep(sessionTimeoutMs);
+        consumerClient.poll(0);
+
+        assertTrue(future.isDone());
+        assertTrue(future.failed());
+        assertEquals(Errors.ILLEGAL_GENERATION.exception(), future.exception());
+        assertTrue(coordinator.needRejoin());
+    }
+
+    @Test
+    public void testUnknownConsumerId() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // unknown_member_id should cause the consumer to rejoin the group
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.changePartitionAssignment(Collections.singletonList(tp));
+
+        time.sleep(sessionTimeoutMs);
+        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertFalse(future.isDone());
+
+        client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID.code()));
+        time.sleep(sessionTimeoutMs);
+        consumerClient.poll(0);
+
+        assertTrue(future.isDone());
+        assertTrue(future.failed());
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.exception(), future.exception());
+        assertTrue(coordinator.needRejoin());
+    }
+
+    @Test
+    public void testCoordinatorDisconnect() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // coordinator disconnect will mark coordinator as unknown
+        time.sleep(sessionTimeoutMs);
+        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertFalse(future.isDone());
+
+        client.prepareResponse(heartbeatResponse(Errors.NONE.code()), true); // return disconnected
+        time.sleep(sessionTimeoutMs);
+        consumerClient.poll(0);
+
+        assertTrue(future.isDone());
+        assertTrue(future.failed());
+        assertTrue(future.exception() instanceof DisconnectException);
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testNormalJoinGroupLeader() {
+        final String consumerId = "leader";
+
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        // ensure metadata is up-to-date for leader
+        metadata.setTopics(Arrays.asList(topicName));
+        metadata.update(cluster, time.milliseconds());
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // normal join group
+        Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
+        partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp)));
+
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
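+        // as the group leader, the consumer should include the computed assignment in its SyncGroup request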
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+                return sync.memberId().equals(consumerId) &&
+                        sync.generationId() == 1 &&
+                        sync.groupAssignment().containsKey(consumerId);
+            }
+        }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        coordinator.ensurePartitionAssignment();
+
+        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+        assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(Collections.emptySet(), rebalanceListener.revoked);
+        assertEquals(1, rebalanceListener.assignedCount);
+        assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+    }
+
+    @Test
+    public void testNormalJoinGroupFollower() {
+        final String consumerId = "consumer";
+
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // normal join group
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
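+        // as a follower, the consumer should send a SyncGroup request with an empty assignment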
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+                return sync.memberId().equals(consumerId) &&
+                        sync.generationId() == 1 &&
+                        sync.groupAssignment().isEmpty();
+            }
+        }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+
+        coordinator.ensurePartitionAssignment();
+
+        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+        assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(1, rebalanceListener.assignedCount);
+        assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+    }
+
+    @Test
+    public void testMetadataChangeTriggersRebalance() {
+        final String consumerId = "consumer";
+
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+
+        coordinator.ensurePartitionAssignment();
+
+        assertFalse(subscriptions.partitionAssignmentNeeded());
+
+        // a new partition is added to the topic
+        metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
+
+        // we should detect the change and ask for reassignment
+        assertTrue(subscriptions.partitionAssignmentNeeded());
+    }
+
+    @Test
+    public void testRejoinGroup() {
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // join the group once
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        coordinator.ensurePartitionAssignment();
+
+        assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(1, rebalanceListener.assignedCount);
+
+        // and join the group again
+        subscriptions.needReassignment();
+        client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        coordinator.ensurePartitionAssignment();
+
+        assertEquals(2, rebalanceListener.revokedCount);
+        assertEquals(Collections.singleton(tp), rebalanceListener.revoked);
+        assertEquals(2, rebalanceListener.assignedCount);
+        assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+    }
+
+    @Test
+    public void testDisconnectInJoin() {
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // disconnected from original coordinator will cause re-discover and join again
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()), true);
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        coordinator.ensurePartitionAssignment();
+        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+        assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(1, rebalanceListener.assignedCount);
+        assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+    }
+
+    @Test(expected = ApiException.class)
+    public void testInvalidSessionTimeout() {
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // coordinator doesn't like the session timeout
+        client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT.code()));
+        coordinator.ensurePartitionAssignment();
+    }
+
+    @Test
+    public void testCommitOffsetOnly() {
+        subscriptions.assign(Arrays.asList(tp));
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+
+        AtomicBoolean success = new AtomicBoolean(false);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
+        consumerClient.poll(0);
+        assertTrue(success.get());
+
+        assertEquals(100L, subscriptions.committed(tp).offset());
+    }
+
+    @Test
+    public void testCommitOffsetMetadata() {
+        subscriptions.assign(Arrays.asList(tp));
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+
+        AtomicBoolean success = new AtomicBoolean(false);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "hello")), callback(success));
+        consumerClient.poll(0);
+        assertTrue(success.get());
+
+        assertEquals(100L, subscriptions.committed(tp).offset());
+        assertEquals("hello", subscriptions.committed(tp).metadata());
+    }
+
+    @Test
+    public void testCommitOffsetAsyncWithDefaultCallback() {
+        int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
+        consumerClient.poll(0);
+        assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
+        assertNull(defaultOffsetCommitCallback.exception);
+    }
+
+    @Test
+    public void testResetGeneration() {
+        // enable auto-assignment
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        coordinator.ensurePartitionAssignment();
+
+        // now switch to manual assignment
+        subscriptions.unsubscribe();
+        coordinator.resetGeneration();
+        subscriptions.assign(Arrays.asList(tp));
+
+        // the client should not reuse generation/memberId from auto-subscribed generation
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body());
+                return commitRequest.memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
+                        commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
+            }
+        }, offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+
+        AtomicBoolean success = new AtomicBoolean(false);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
+        consumerClient.poll(0);
+        assertTrue(success.get());
+    }
+
+    @Test
+    public void testCommitOffsetAsyncFailedWithDefaultCallback() {
+        int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
+        consumerClient.poll(0);
+        assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
+        assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), defaultOffsetCommitCallback.exception);
+    }
+
+    @Test
+    public void testCommitOffsetAsyncCoordinatorNotAvailable() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // async commit with coordinator not available
+        MockCommitCallback cb = new MockCommitCallback();
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
+        consumerClient.poll(0);
+
+        assertTrue(coordinator.coordinatorUnknown());
+        assertEquals(1, cb.invoked);
+        assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), cb.exception);
+    }
+
+    @Test
+    public void testCommitOffsetAsyncNotCoordinator() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // async commit failing with not_coordinator should mark the coordinator unknown
+        MockCommitCallback cb = new MockCommitCallback();
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
+        consumerClient.poll(0);
+
+        assertTrue(coordinator.coordinatorUnknown());
+        assertEquals(1, cb.invoked);
+        assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.exception(), cb.exception);
+    }
+
+    @Test
+    public void testCommitOffsetAsyncDisconnected() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // async commit with coordinator disconnected
+        MockCommitCallback cb = new MockCommitCallback();
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
+        consumerClient.poll(0);
+
+        assertTrue(coordinator.coordinatorUnknown());
+        assertEquals(1, cb.invoked);
+        assertTrue(cb.exception instanceof DisconnectException);
+    }
+
+    @Test
+    public void testCommitOffsetSyncNotCoordinator() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // sync commit failing with not_coordinator (should re-discover the coordinator and then retry the commit)
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
+    }
+
+    @Test
+    public void testCommitOffsetSyncCoordinatorNotAvailable() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // sync commit failing with coordinator_not_available (should re-discover the coordinator and then retry the commit)
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
+    }
+
+    @Test
+    public void testCommitOffsetSyncCoordinatorDisconnected() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // sync commit with coordinator disconnected (should re-discover the coordinator and then retry the commit)
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
+    }
+
+    @Test(expected = OffsetMetadataTooLarge.class)
+    public void testCommitOffsetMetadataTooLarge() {
+        // since offset metadata is provided by the user, we have to propagate the exception so they can handle it
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.OFFSET_METADATA_TOO_LARGE.code())));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
+    }
+
+    @Test(expected = ApiException.class)
+    public void testCommitOffsetSyncCallbackWithNonRetriableException() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // a sync commit that fails with a non-retriable (unknown) error should propagate the exception
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN.code())), false);
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
+    }
+
+    @Test
+    public void testRefreshOffset() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.needRefreshCommits();
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+        coordinator.refreshCommittedOffsetsIfNeeded();
+        assertFalse(subscriptions.refreshCommitsNeeded());
+        assertEquals(100L, subscriptions.committed(tp).offset());
+    }
+
+    @Test
+    public void testRefreshOffsetLoadInProgress() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.needRefreshCommits();
+        client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+        coordinator.refreshCommittedOffsetsIfNeeded();
+        assertFalse(subscriptions.refreshCommitsNeeded());
+        assertEquals(100L, subscriptions.committed(tp).offset());
+    }
+
+    @Test
+    public void testRefreshOffsetNotCoordinatorForConsumer() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.needRefreshCommits();
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code(), "", 100L));
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+        coordinator.refreshCommittedOffsetsIfNeeded();
+        assertFalse(subscriptions.refreshCommitsNeeded());
+        assertEquals(100L, subscriptions.committed(tp).offset());
+    }
+
+    @Test
+    public void testRefreshOffsetWithNoFetchableOffsets() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.needRefreshCommits();
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
+        coordinator.refreshCommittedOffsetsIfNeeded();
+        assertFalse(subscriptions.refreshCommitsNeeded());
+        assertEquals(null, subscriptions.committed(tp));
+    }
+
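+    // builds a GroupMetadataResponse struct naming the given node as coordinator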
+    private Struct consumerMetadataResponse(Node node, short error) {
+        GroupMetadataResponse response = new GroupMetadataResponse(error, node);
+        return response.toStruct();
+    }
+
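+    // builds a HeartbeatResponse struct with the given error code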
+    private Struct heartbeatResponse(short error) {
+        HeartbeatResponse response = new HeartbeatResponse(error);
+        return response.toStruct();
+    }
+
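+    // builds a JoinGroupResponse for the leader, carrying each member's serialized subscription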
+    private Struct joinGroupLeaderResponse(int generationId, String memberId,
+                                           Map<String, List<String>> subscriptions,
+                                           short error) {
+        Map<String, ByteBuffer> metadata = new HashMap<>();
+        for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) {
+            PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(subscriptionEntry.getValue());
+            ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription);
+            metadata.put(subscriptionEntry.getKey(), buf);
+        }
+        return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, memberId, metadata).toStruct();
+    }
+
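+    // builds a JoinGroupResponse for a follower, which receives no member metadata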
+    private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
+        return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, leaderId,
+                Collections.<String, ByteBuffer>emptyMap()).toStruct();
+    }
+
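+    // builds a SyncGroupResponse carrying the serialized partition assignment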
+    private Struct syncGroupResponse(List<TopicPartition> partitions, short error) {
+        ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
+        return new SyncGroupResponse(error, buf).toStruct();
+    }
+
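+    // builds an OffsetCommitResponse struct from the given per-partition error codes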
+    private Struct offsetCommitResponse(Map<TopicPartition, Short> responseData) {
+        OffsetCommitResponse response = new OffsetCommitResponse(responseData);
+        return response.toStruct();
+    }
+
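+    // builds an OffsetFetchResponse struct for a single partition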
+    private Struct offsetFetchResponse(TopicPartition tp, Short error, String metadata, long offset) {
+        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, error);
+        OffsetFetchResponse response = new OffsetFetchResponse(Collections.singletonMap(tp, data));
+        return response.toStruct();
+    }
+
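+    // returns a commit callback that records success in the given flag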
+    private OffsetCommitCallback callback(final AtomicBoolean success) {
+        return new OffsetCommitCallback() {
+            @Override
+            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+                if (exception == null)
+                    success.set(true);
+            }
+        };
+    }
+
+    private static class MockCommitCallback implements OffsetCommitCallback {
+        public int invoked = 0;
+        public Exception exception = null;
+
+        @Override
+        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+            invoked++;
+            this.exception = exception;
+        }
+    }
+
+    private static class MockRebalanceListener implements ConsumerRebalanceListener {
+        public Collection<TopicPartition> revoked;
+        public Collection<TopicPartition> assigned;
+        public int revokedCount = 0;
+        public int assignedCount = 0;
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            this.assigned = partitions;
+            assignedCount++;
+        }
+
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            this.revoked = partitions;
+            revokedCount++;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 9de1cee..6a42058 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -106,7 +106,7 @@ public class ConsumerNetworkClientTest {
 
 
     private HeartbeatRequest heartbeatRequest() {
-        return new HeartbeatRequest("group", 1, "consumerId");
+        return new HeartbeatRequest("group", 1, "memberId");
     }
 
     private Struct heartbeatResponse(short error) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
new file mode 100644
index 0000000..8113770
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConsumerProtocolTest {
+
+    @Test
+    public void serializeDeserializeMetadata() {
+        Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
+
+        ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
+        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
+        assertEquals(subscription.topics(), parsedSubscription.topics());
+    }
+
+    @Test
+    public void deserializeNewSubscriptionVersion() {
+        // verify that a new version which adds a field is still parseable
+        short version = 100;
+
+        Schema subscriptionSchemaV100 = new Schema(
+                new Field(ConsumerProtocol.TOPICS_KEY_NAME, new ArrayOf(Type.STRING)),
+                new Field(ConsumerProtocol.USER_DATA_KEY_NAME, Type.BYTES),
+                new Field("foo", Type.STRING));
+
+        Struct subscriptionV100 = new Struct(subscriptionSchemaV100);
+        subscriptionV100.set(ConsumerProtocol.TOPICS_KEY_NAME, new Object[]{"topic"});
+        subscriptionV100.set(ConsumerProtocol.USER_DATA_KEY_NAME, ByteBuffer.wrap(new byte[0]));
+        subscriptionV100.set("foo", "bar");
+
+        Struct headerV100 = new Struct(ConsumerProtocol.CONSUMER_PROTOCOL_HEADER_SCHEMA);
+        headerV100.set(ConsumerProtocol.VERSION_KEY_NAME, version);
+
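+        // serialize as a newer client would: the versioned header first, then the extended body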
+        ByteBuffer buffer = ByteBuffer.allocate(subscriptionV100.sizeOf() + headerV100.sizeOf());
+        headerV100.writeTo(buffer);
+        subscriptionV100.writeTo(buffer);
+
+        buffer.flip();
+
+        Subscription subscription = ConsumerProtocol.deserializeSubscription(buffer);
+        assertEquals(Arrays.asList("topic"), subscription.topics());
+    }
+
+    @Test
+    public void serializeDeserializeAssignment() {
+        List<TopicPartition> partitions = Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("bar", 2));
+        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
+        PartitionAssignor.Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer);
+        assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
+    }
+
+    @Test
+    public void deserializeNewAssignmentVersion() {
+        // verify that a new version which adds a field is still parseable
+        short version = 100;
+
+        Schema assignmentSchemaV100 = new Schema(
+                new Field(ConsumerProtocol.TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(ConsumerProtocol.TOPIC_ASSIGNMENT_V0)),
+                new Field(ConsumerProtocol.USER_DATA_KEY_NAME, Type.BYTES),
+                new Field("foo", Type.STRING));
+
+        Struct assignmentV100 = new Struct(assignmentSchemaV100);
+        assignmentV100.set(ConsumerProtocol.TOPIC_PARTITIONS_KEY_NAME,
+                new Object[]{new Struct(ConsumerProtocol.TOPIC_ASSIGNMENT_V0)
+                        .set(ConsumerProtocol.TOPIC_KEY_NAME, "foo")
+                        .set(ConsumerProtocol.PARTITIONS_KEY_NAME, new Object[]{1})});
+        assignmentV100.set(ConsumerProtocol.USER_DATA_KEY_NAME, ByteBuffer.wrap(new byte[0]));
+        assignmentV100.set("foo", "bar");
+
+        Struct headerV100 = new Struct(ConsumerProtocol.CONSUMER_PROTOCOL_HEADER_SCHEMA);
+        headerV100.set(ConsumerProtocol.VERSION_KEY_NAME, version);
+
+        ByteBuffer buffer = ByteBuffer.allocate(assignmentV100.sizeOf() + headerV100.sizeOf());
+        headerV100.writeTo(buffer);
+        assignmentV100.writeTo(buffer);
+
+        buffer.flip();
+
+        PartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer);
+        assertEquals(toSet(Arrays.asList(new TopicPartition("foo", 1))), toSet(assignment.partitions()));
+    }
+
+    private static <T> Set<T> toSet(Collection<T> collection) {
+        return new HashSet<>(collection);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
deleted file mode 100644
index 66b2e32..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ /dev/null
@@ -1,635 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.kafka.clients.ClientRequest;
-import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ApiException;
-import org.apache.kafka.common.errors.DisconnectException;
-import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.ConsumerMetadataResponse;
-import org.apache.kafka.common.requests.HeartbeatResponse;
-import org.apache.kafka.common.requests.JoinGroupResponse;
-import org.apache.kafka.common.requests.OffsetCommitRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
-import org.apache.kafka.common.requests.OffsetFetchResponse;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.test.TestUtils;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class CoordinatorTest {
-
-    private String topicName = "test";
-    private String groupId = "test-group";
-    private TopicPartition tp = new TopicPartition(topicName, 0);
-    private int sessionTimeoutMs = 10;
-    private int heartbeatIntervalMs = 2;
-    private long retryBackoffMs = 100;
-    private long requestTimeoutMs = 5000;
-    private boolean autoCommitEnabled = false;
-    private long autoCommitIntervalMs = 5000;
-    private String rebalanceStrategy = "not-matter";
-    private MockTime time;
-    private MockClient client;
-    private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
-    private Node node = cluster.nodes().get(0);
-    private SubscriptionState subscriptions;
-    private Metadata metadata;
-    private Metrics metrics;
-    private Map<String, String> metricTags = new LinkedHashMap<String, String>();
-    private ConsumerNetworkClient consumerClient;
-    private MockRebalanceListener subscriptionListener;
-    private MockCommitCallback defaultOffsetCommitCallback;
-    private Coordinator coordinator;
-
-    @Before
-    public void setup() {
-        this.time = new MockTime();
-        this.client = new MockClient(time);
-        this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
-        this.metadata = new Metadata(0, Long.MAX_VALUE);
-        this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
-        this.metrics = new Metrics(time);
-        this.subscriptionListener = new MockRebalanceListener();
-        this.defaultOffsetCommitCallback = new MockCommitCallback();
-
-        client.setNode(node);
-
-        this.coordinator = new Coordinator(consumerClient,
-                groupId,
-                sessionTimeoutMs,
-                heartbeatIntervalMs,
-                rebalanceStrategy,
-                subscriptions,
-                metrics,
-                "consumer" + groupId,
-                metricTags,
-                time,
-                requestTimeoutMs,
-                retryBackoffMs,
-                defaultOffsetCommitCallback,
-                autoCommitEnabled,
-                autoCommitIntervalMs);
-    }
-
-    @After
-    public void teardown() {
-        this.metrics.close();
-    }
-
-    @Test
-    public void testNormalHeartbeat() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // normal heartbeat
-        time.sleep(sessionTimeoutMs);
-        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
-        assertEquals(1, consumerClient.pendingRequestCount());
-        assertFalse(future.isDone());
-
-        client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
-        consumerClient.poll(0);
-
-        assertTrue(future.isDone());
-        assertTrue(future.succeeded());
-    }
-
-    @Test
-    public void testCoordinatorNotAvailable() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // consumer_coordinator_not_available will mark coordinator as unknown
-        time.sleep(sessionTimeoutMs);
-        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
-        assertEquals(1, consumerClient.pendingRequestCount());
-        assertFalse(future.isDone());
-
-        client.prepareResponse(heartbeatResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()));
-        time.sleep(sessionTimeoutMs);
-        consumerClient.poll(0);
-
-        assertTrue(future.isDone());
-        assertTrue(future.failed());
-        assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), future.exception());
-        assertTrue(coordinator.coordinatorUnknown());
-    }
-
-    @Test
-    public void testNotCoordinator() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // not_coordinator will mark coordinator as unknown
-        time.sleep(sessionTimeoutMs);
-        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
-        assertEquals(1, consumerClient.pendingRequestCount());
-        assertFalse(future.isDone());
-
-        client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_CONSUMER.code()));
-        time.sleep(sessionTimeoutMs);
-        consumerClient.poll(0);
-
-        assertTrue(future.isDone());
-        assertTrue(future.failed());
-        assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.exception(), future.exception());
-        assertTrue(coordinator.coordinatorUnknown());
-    }
-
-    @Test
-    public void testIllegalGeneration() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // illegal_generation will cause re-partition
-        subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
-        subscriptions.changePartitionAssignment(Collections.singletonList(tp));
-
-        time.sleep(sessionTimeoutMs);
-        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
-        assertEquals(1, consumerClient.pendingRequestCount());
-        assertFalse(future.isDone());
-
-        client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION.code()));
-        time.sleep(sessionTimeoutMs);
-        consumerClient.poll(0);
-
-        assertTrue(future.isDone());
-        assertTrue(future.failed());
-        assertEquals(Errors.ILLEGAL_GENERATION.exception(), future.exception());
-        assertTrue(subscriptions.partitionAssignmentNeeded());
-    }
-
-    @Test
-    public void testUnknownConsumerId() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // illegal_generation will cause re-partition
-        subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
-        subscriptions.changePartitionAssignment(Collections.singletonList(tp));
-
-        time.sleep(sessionTimeoutMs);
-        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
-        assertEquals(1, consumerClient.pendingRequestCount());
-        assertFalse(future.isDone());
-
-        client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_CONSUMER_ID.code()));
-        time.sleep(sessionTimeoutMs);
-        consumerClient.poll(0);
-
-        assertTrue(future.isDone());
-        assertTrue(future.failed());
-        assertEquals(Errors.UNKNOWN_CONSUMER_ID.exception(), future.exception());
-        assertTrue(subscriptions.partitionAssignmentNeeded());
-    }
-
-    @Test
-    public void testCoordinatorDisconnect() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // coordinator disconnect will mark coordinator as unknown
-        time.sleep(sessionTimeoutMs);
-        RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
-        assertEquals(1, consumerClient.pendingRequestCount());
-        assertFalse(future.isDone());
-
-        client.prepareResponse(heartbeatResponse(Errors.NONE.code()), true); // return disconnected
-        time.sleep(sessionTimeoutMs);
-        consumerClient.poll(0);
-
-        assertTrue(future.isDone());
-        assertTrue(future.failed());
-        assertTrue(future.exception() instanceof DisconnectException);
-        assertTrue(coordinator.coordinatorUnknown());
-    }
-
-    @Test
-    public void testNormalJoinGroup() {
-        subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
-        subscriptions.needReassignment();
-
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // normal join group
-        client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
-
-        assertFalse(subscriptions.partitionAssignmentNeeded());
-        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
-        assertEquals(1, subscriptionListener.revokedCount);
-        assertEquals(Collections.emptySet(), subscriptionListener.revoked);
-        assertEquals(1, subscriptionListener.assignedCount);
-        assertEquals(Collections.singleton(tp), subscriptionListener.assigned);
-    }
-
-    @Test
-    public void testReJoinGroup() {
-        subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
-        subscriptions.needReassignment();
-
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // disconnected from original coordinator will cause re-discover and join again
-        client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()), true);
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
-        assertFalse(subscriptions.partitionAssignmentNeeded());
-        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
-        assertEquals(1, subscriptionListener.revokedCount);
-        assertEquals(Collections.emptySet(), subscriptionListener.revoked);
-        assertEquals(1, subscriptionListener.assignedCount);
-        assertEquals(Collections.singleton(tp), subscriptionListener.assigned);
-    }
-
-    @Test(expected = ApiException.class)
-    public void testUnknownPartitionAssignmentStrategy() {
-        subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
-        subscriptions.needReassignment();
-
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // coordinator doesn't like our assignment strategy
-        client.prepareResponse(joinGroupResponse(0, "consumer", Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()));
-        coordinator.ensurePartitionAssignment();
-    }
-
-    @Test(expected = ApiException.class)
-    public void testInvalidSessionTimeout() {
-        subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
-        subscriptions.needReassignment();
-
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // coordinator doesn't like our assignment strategy
-        client.prepareResponse(joinGroupResponse(0, "consumer", Collections.<TopicPartition>emptyList(), Errors.INVALID_SESSION_TIMEOUT.code()));
-        coordinator.ensurePartitionAssignment();
-    }
-
-    @Test
-    public void testCommitOffsetOnly() {
-        subscriptions.assign(Arrays.asList(tp));
-
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-
-        AtomicBoolean success = new AtomicBoolean(false);
-        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
-        consumerClient.poll(0);
-        assertTrue(success.get());
-
-        assertEquals(100L, subscriptions.committed(tp).offset());
-    }
-
-    @Test
-    public void testCommitOffsetMetadata() {
-        subscriptions.assign(Arrays.asList(tp));
-
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-
-        AtomicBoolean success = new AtomicBoolean(false);
-        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "hello")), callback(success));
-        consumerClient.poll(0);
-        assertTrue(success.get());
-
-        assertEquals(100L, subscriptions.committed(tp).offset());
-        assertEquals("hello", subscriptions.committed(tp).metadata());
-    }
-
-    @Test
-    public void testCommitOffsetAsyncWithDefaultCallback() {
-        int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
-        consumerClient.poll(0);
-        assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
-        assertNull(defaultOffsetCommitCallback.exception);
-    }
-
-    @Test
-    public void testResetGeneration() {
-        // enable auto-assignment
-        subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
-
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
-
-        // now switch to manual assignment
-        subscriptions.unsubscribe();
-        coordinator.resetGeneration();
-        subscriptions.assign(Arrays.asList(tp));
-
-        // the client should not reuse the generation/consumerId obtained while auto-subscribed
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(ClientRequest request) {
-                OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body());
-                return commitRequest.consumerId().equals(OffsetCommitRequest.DEFAULT_CONSUMER_ID) &&
-                        commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
-            }
-        }, offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-
-        AtomicBoolean success = new AtomicBoolean(false);
-        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
-        consumerClient.poll(0);
-        assertTrue(success.get());
-    }
-
-    @Test
-    public void testCommitOffsetAsyncFailedWithDefaultCallback() {
-        int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
-        consumerClient.poll(0);
-        assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
-        assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), defaultOffsetCommitCallback.exception);
-    }
-
-    @Test
-    public void testCommitOffsetAsyncCoordinatorNotAvailable() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // async commit with coordinator not available
-        MockCommitCallback cb = new MockCommitCallback();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
-        consumerClient.poll(0);
-
-        assertTrue(coordinator.coordinatorUnknown());
-        assertEquals(1, cb.invoked);
-        assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), cb.exception);
-    }
-
-    @Test
-    public void testCommitOffsetAsyncNotCoordinator() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // async commit returning NOT_COORDINATOR_FOR_CONSUMER
-        MockCommitCallback cb = new MockCommitCallback();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
-        consumerClient.poll(0);
-
-        assertTrue(coordinator.coordinatorUnknown());
-        assertEquals(1, cb.invoked);
-        assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.exception(), cb.exception);
-    }
-
-    @Test
-    public void testCommitOffsetAsyncDisconnected() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // async commit with coordinator disconnected
-        MockCommitCallback cb = new MockCommitCallback();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
-        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
-        consumerClient.poll(0);
-
-        assertTrue(coordinator.coordinatorUnknown());
-        assertEquals(1, cb.invoked);
-        assertTrue(cb.exception instanceof DisconnectException);
-    }
-
-    @Test
-    public void testCommitOffsetSyncNotCoordinator() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // sync commit returning NOT_COORDINATOR_FOR_CONSUMER (should rediscover the coordinator and then retry the commit)
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
-    }
-
-    @Test
-    public void testCommitOffsetSyncCoordinatorNotAvailable() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // sync commit returning CONSUMER_COORDINATOR_NOT_AVAILABLE (should rediscover the coordinator and then retry the commit)
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
-    }
-
-    @Test
-    public void testCommitOffsetSyncCoordinatorDisconnected() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
-    }
-
-    @Test(expected = OffsetMetadataTooLarge.class)
-    public void testCommitOffsetMetadataTooLarge() {
-        // since offset metadata is provided by the user, we have to propagate the exception so they can handle it
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.OFFSET_METADATA_TOO_LARGE.code())));
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
-    }
-
-    @Test(expected = ApiException.class)
-    public void testCommitOffsetSyncCallbackWithNonRetriableException() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // a sync commit with a non-retriable error should throw the exception to the caller
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code())), false);
-        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
-    }
-
-    @Test
-    public void testRefreshOffset() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        subscriptions.assign(Arrays.asList(tp));
-        subscriptions.needRefreshCommits();
-        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
-        coordinator.refreshCommittedOffsetsIfNeeded();
-        assertFalse(subscriptions.refreshCommitsNeeded());
-        assertEquals(100L, subscriptions.committed(tp).offset());
-    }
-
-    @Test
-    public void testRefreshOffsetLoadInProgress() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        subscriptions.assign(Arrays.asList(tp));
-        subscriptions.needRefreshCommits();
-        client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
-        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
-        coordinator.refreshCommittedOffsetsIfNeeded();
-        assertFalse(subscriptions.refreshCommitsNeeded());
-        assertEquals(100L, subscriptions.committed(tp).offset());
-    }
-
-    @Test
-    public void testRefreshOffsetNotCoordinatorForConsumer() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        subscriptions.assign(Arrays.asList(tp));
-        subscriptions.needRefreshCommits();
-        client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code(), "", 100L));
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
-        coordinator.refreshCommittedOffsetsIfNeeded();
-        assertFalse(subscriptions.refreshCommitsNeeded());
-        assertEquals(100L, subscriptions.committed(tp).offset());
-    }
-
-    @Test
-    public void testRefreshOffsetWithNoFetchableOffsets() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        subscriptions.assign(Arrays.asList(tp));
-        subscriptions.needRefreshCommits();
-        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
-        coordinator.refreshCommittedOffsetsIfNeeded();
-        assertFalse(subscriptions.refreshCommitsNeeded());
-        assertEquals(null, subscriptions.committed(tp));
-    }
-
-    private Struct consumerMetadataResponse(Node node, short error) {
-        ConsumerMetadataResponse response = new ConsumerMetadataResponse(error, node);
-        return response.toStruct();
-    }
-
-    private Struct heartbeatResponse(short error) {
-        HeartbeatResponse response = new HeartbeatResponse(error);
-        return response.toStruct();
-    }
-
-    private Struct joinGroupResponse(int generationId, String consumerId, List<TopicPartition> assignedPartitions, short error) {
-        JoinGroupResponse response = new JoinGroupResponse(error, generationId, consumerId, assignedPartitions);
-        return response.toStruct();
-    }
-
-    private Struct offsetCommitResponse(Map<TopicPartition, Short> responseData) {
-        OffsetCommitResponse response = new OffsetCommitResponse(responseData);
-        return response.toStruct();
-    }
-
-    private Struct offsetFetchResponse(TopicPartition tp, Short error, String metadata, long offset) {
-        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, error);
-        OffsetFetchResponse response = new OffsetFetchResponse(Collections.singletonMap(tp, data));
-        return response.toStruct();
-    }
-
-    private OffsetCommitCallback callback(final AtomicBoolean success) {
-        return new OffsetCommitCallback() {
-            @Override
-            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-                if (exception == null)
-                    success.set(true);
-            }
-        };
-    }
-
-    private static class MockCommitCallback implements OffsetCommitCallback {
-        public int invoked = 0;
-        public Exception exception = null;
-
-        @Override
-        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-            invoked++;
-            this.exception = exception;
-        }
-    }
-
-    private static class MockRebalanceListener implements ConsumerRebalanceListener {
-        public Collection<TopicPartition> revoked;
-        public Collection<TopicPartition> assigned;
-        public int revokedCount = 0;
-        public int assignedCount = 0;
-
-        @Override
-        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            this.assigned = partitions;
-            assignedCount++;
-        }
-
-        @Override
-        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-            this.revoked = partitions;
-            revokedCount++;
-        }
-
-    }
-}

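The deleted ConsumerCoordinatorTest above drives offset commits through an
OffsetCommitCallback and asserts on what the callback recorded. A minimal,
standalone sketch of that recording pattern (the class names here are
illustrative, not part of the commit):

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.OffsetCommitCallback;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Collections;
    import java.util.Map;

    public class CommitCallbackSketch {
        // Mirrors MockCommitCallback from the deleted test: record the number
        // of invocations and the last exception so a test can assert on them.
        static class RecordingCallback implements OffsetCommitCallback {
            int invoked = 0;
            Exception exception = null;

            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                invoked++;
                this.exception = exception;
            }
        }

        public static void main(String[] args) {
            RecordingCallback cb = new RecordingCallback();
            TopicPartition tp = new TopicPartition("test", 0);
            // A successful completion passes a null exception, as the
            // MockClient-driven tests above arrange via prepareResponse(...)
            // followed by consumerClient.poll(0).
            cb.onComplete(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
            System.out.println("invoked=" + cb.invoked + ", exception=" + cb.exception);
        }
    }
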
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4929449..8773f8c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -350,11 +350,11 @@ public class FetcherTest {
 
     @Test
     public void testGetAllTopics() throws InterruptedException {
-        // sending response before request, as getAllTopics is a blocking call
+        // sending response before request, as getAllTopicMetadata is a blocking call
         client.prepareResponse(
             new MetadataResponse(cluster, Collections.<String, Errors>emptyMap()).toStruct());
 
-        Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopics(5000L);
+        Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(5000L);
 
         assertEquals(cluster.topics().size(), allTopics.size());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
new file mode 100644
index 0000000..40ae661
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+import java.util.Map;
+
+public class MockPartitionAssignor extends AbstractPartitionAssignor {
+
+    private Map<String, List<TopicPartition>> result = null;
+
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, List<String>> subscriptions) {
+        if (result == null)
+            throw new IllegalStateException("Call to assign with no result prepared");
+        return result;
+    }
+
+    @Override
+    public String name() {
+        return "consumer-mock-assignor";
+    }
+
+    public void clear() {
+        this.result = null;
+    }
+
+    public void prepare(Map<String, List<TopicPartition>> result) {
+        this.result = result;
+    }
+
+}

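MockPartitionAssignor is a canned assignor: prepare(...) fixes the map that the
next assign(...) call returns, and assign(...) throws IllegalStateException when
nothing was prepared. A usage sketch under those assumptions (topic and member
names are made up; since the class lives under src/test, the sketch assumes the
test sources are on the classpath):

    import org.apache.kafka.clients.consumer.internals.MockPartitionAssignor;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    public class MockAssignorSketch {
        public static void main(String[] args) {
            MockPartitionAssignor assignor = new MockPartitionAssignor();

            // Fix the assignment the mock should hand back.
            Map<String, List<TopicPartition>> expected = Collections.singletonMap(
                    "consumer1", Arrays.asList(new TopicPartition("topic1", 0)));
            assignor.prepare(expected);

            // partitionsPerTopic and subscriptions are ignored by the mock,
            // which simply returns whatever was prepared.
            Map<String, List<TopicPartition>> result = assignor.assign(
                    Collections.singletonMap("topic1", 1),
                    Collections.singletonMap("consumer1", Arrays.asList("topic1")));
            System.out.println(result);  // {consumer1=[topic1-0]}
        }
    }
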
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index cabf591..fb21802 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -29,9 +29,9 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.HashSet;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
@@ -151,11 +151,11 @@ public class RequestResponseTest {
     }
 
     private AbstractRequest createConsumerMetadataRequest() {
-        return new ConsumerMetadataRequest("test-group");
+        return new GroupMetadataRequest("test-group");
     }
 
     private AbstractRequestResponse createConsumerMetadataResponse() {
-        return new ConsumerMetadataResponse((short) 1, new Node(10, "host1", 2014));
+        return new GroupMetadataResponse(Errors.NONE.code(), new Node(10, "host1", 2014));
     }
 
     private AbstractRequest createFetchRequest() {
@@ -180,11 +180,17 @@ public class RequestResponseTest {
     }
 
     private AbstractRequest createJoinGroupRequest() {
-        return new JoinGroupRequest("group1", 30000, Arrays.asList("topic1"), "consumer1", "strategy1");
+        ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
+        List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>();
+        protocols.add(new JoinGroupRequest.GroupProtocol("consumer-range", metadata));
+        return new JoinGroupRequest("group1", 30000, "consumer1", "consumer", protocols);
     }
 
     private AbstractRequestResponse createJoinGroupResponse() {
-        return new JoinGroupResponse(Errors.NONE.code(), 1, "consumer1", Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1)));
+        Map<String, ByteBuffer> members = new HashMap<>();
+        members.put("consumer1", ByteBuffer.wrap(new byte[]{}));
+        members.put("consumer2", ByteBuffer.wrap(new byte[]{}));
+        return new JoinGroupResponse(Errors.NONE.code(), 1, "range", "consumer1", "leader", members);
     }
 
     private AbstractRequest createLeaveGroupRequest() {

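The JoinGroup changes above are the core of the client-side assignment
protocol: instead of a subscription list plus a strategy name, the request now
carries a protocol type and a list of (protocol name, opaque metadata) pairs,
and the response identifies a leader and ships each member's metadata back to
it. A sketch of building the new request, mirroring the constructor used in the
updated test (the protocol name and empty metadata are placeholders):

    import org.apache.kafka.common.requests.JoinGroupRequest;

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    public class JoinGroupRequestSketch {
        public static void main(String[] args) {
            // The broker matches protocols by name only; the metadata bytes
            // are opaque to it and are interpreted by the elected group leader.
            ByteBuffer metadata = ByteBuffer.wrap(new byte[0]);
            List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>();
            protocols.add(new JoinGroupRequest.GroupProtocol("consumer-range", metadata));

            // group id, session timeout in ms, member id, protocol type, protocols
            JoinGroupRequest request =
                    new JoinGroupRequest("group1", 30000, "consumer1", "consumer", protocols);
            System.out.println(request.toStruct());
        }
    }
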
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
index 1ff5e73..eb62c9e 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
@@ -340,7 +340,7 @@ public class KafkaBasedLogTest {
                 consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        consumer.setException(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception());
+                        consumer.setException(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception());
                     }
                 });
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 6a8a8a2..fd6d420 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -31,7 +31,7 @@ object AclCommand {
   val Newline = scala.util.Properties.lineSeparator
   val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
     Topic -> Set(Read, Write, Describe),
-    ConsumerGroup -> Set(Read),
+    Group -> Set(Read),
     Cluster -> Set(Create, ClusterAction)
   )
 
@@ -146,14 +146,14 @@ object AclCommand {
     val resources = getResource(opts)
 
     val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic)
-    val consumerGroups: Set[Resource] = resources.filter(_.resourceType == ConsumerGroup)
+    val groups: Set[Resource] = resources.filter(_.resourceType == Group)
 
     //Read, Describe on topic; Read on group
 
     val acls = getAcl(opts, Set(Read, Describe))
 
     topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++
-      consumerGroups.map(_ -> getAcl(opts, Set(Read))).toMap[Resource, Set[Acl]]
+      groups.map(_ -> getAcl(opts, Set(Read))).toMap[Resource, Set[Acl]]
   }
 
   private def getCliResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
@@ -221,10 +221,10 @@ object AclCommand {
       resources += Resource.ClusterResource
 
     if (opts.options.has(opts.groupOpt))
-      opts.options.valuesOf(opts.groupOpt).asScala.foreach(consumerGroup => resources += new Resource(ConsumerGroup, consumerGroup.trim))
+      opts.options.valuesOf(opts.groupOpt).asScala.foreach(group => resources += new Resource(Group, group.trim))
 
     if (resources.isEmpty && dieIfNoResourceFound)
-      CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --consumer-group <group>")
+      CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group>")
 
     resources
   }
@@ -266,16 +266,16 @@ object AclCommand {
       .withValuesSeparatedBy(Delimiter)
 
     val clusterOpt = parser.accepts("cluster", "Add/Remove cluster acls.")
-    val groupOpt = parser.accepts("consumer-group", "Comma separated list of consumer groups to which the acls should be added or removed. " +
-      "A value of * indicates the acls should apply to all consumer-groups.")
+    val groupOpt = parser.accepts("group", "Comma separated list of groups to which the acls should be added or removed. " +
+      "A value of * indicates the acls should apply to all groups.")
       .withRequiredArg
-      .describedAs("consumer-group")
+      .describedAs("group")
       .ofType(classOf[String])
       .withValuesSeparatedBy(Delimiter)
 
     val addOpt = parser.accepts("add", "Indicates you are trying to add acls.")
     val removeOpt = parser.accepts("remove", "Indicates you are trying to remove acls.")
-    val listOpt = parser.accepts("list", "List acls for the specified resource, use --topic <topic> or --consumer-group <group> or --cluster to specify a resource.")
+    val listOpt = parser.accepts("list", "List acls for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.")
 
     val operationsOpt = parser.accepts("operations", "Comma separated list of operations, default is All. Valid operation names are: " + Newline +
       Operation.values.map("\t" + _).mkString(Newline) + Newline)
@@ -320,7 +320,7 @@ object AclCommand {
       "This will generate acls that allows WRITE,DESCRIBE on topic and CREATE on cluster. ")
 
     val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove acls for consumer role. " +
-      "This will generate acls that allows READ,DESCRIBE on topic and READ on consumer-group.")
+      "This will generate acls that allows READ,DESCRIBE on topic and READ on group.")
 
     val helpOpt = parser.accepts("help", "Print usage information.")
 
@@ -343,7 +343,7 @@ object AclCommand {
         CommandLineUtils.printUsageAndDie(parser, "With --producer you must specify a --topic")
 
       if (options.has(consumerOpt) && (!options.has(topicOpt) || !options.has(groupOpt) || (!options.has(producerOpt) && options.has(clusterOpt))))
-        CommandLineUtils.printUsageAndDie(parser, "With --consumer you must specify a --topic and a --consumer-group and no --cluster option should be specified.")
+        CommandLineUtils.printUsageAndDie(parser, "With --consumer you must specify a --topic and a --group and no --cluster option should be specified.")
     }
   }
 

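With these renames in place, group ACLs are managed with --group rather than
--consumer-group; for example, listing a group's ACLs now reads
"--list --group my-group" instead of "--list --consumer-group my-group" (the
group name here is illustrative; other required options are unchanged).
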
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index e6ca112..ed54aee 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -23,16 +23,15 @@ import kafka.common.{Topic, AdminCommandFailedException}
 import kafka.utils.CommandLineUtils
 import kafka.utils._
 import kafka.utils.ZkUtils._
-import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection._
 import scala.collection.JavaConversions._
 import kafka.log.{Defaults, LogConfig}
 import kafka.consumer.{ConsumerConfig, Whitelist}
-import kafka.server.{ConfigType, OffsetManager}
+import kafka.server.ConfigType
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.security.JaasUtils
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator
 
 
 object TopicCommand extends Logging {
@@ -130,7 +129,7 @@ object TopicCommand extends Logging {
       }
 
       if(opts.options.has(opts.partitionsOpt)) {
-        if (topic == ConsumerCoordinator.OffsetsTopicName) {
+        if (topic == GroupCoordinator.OffsetsTopicName) {
           throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
         }
         println("WARNING: If partitions are increased for a topic that has a key, the partition " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
deleted file mode 100644
index 258d5fe..0000000
--- a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-
-import kafka.common.ErrorMapping
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
-
-object ConsumerMetadataRequest {
-  val CurrentVersion = 0.shortValue
-  val DefaultClientId = ""
-
-  def readFrom(buffer: ByteBuffer) = {
-    // envelope
-    val versionId = buffer.getShort
-    val correlationId = buffer.getInt
-    val clientId = ApiUtils.readShortString(buffer)
-
-    // request
-    val group = ApiUtils.readShortString(buffer)
-    ConsumerMetadataRequest(group, versionId, correlationId, clientId)
-  }
-
-}
-
-case class ConsumerMetadataRequest(group: String,
-                                   versionId: Short = ConsumerMetadataRequest.CurrentVersion,
-                                   correlationId: Int = 0,
-                                   clientId: String = ConsumerMetadataRequest.DefaultClientId)
-  extends RequestOrResponse(Some(RequestKeys.ConsumerMetadataKey)) {
-
-  def sizeInBytes =
-    2 + /* versionId */
-    4 + /* correlationId */
-    ApiUtils.shortStringLength(clientId) +
-    ApiUtils.shortStringLength(group)
-
-  def writeTo(buffer: ByteBuffer) {
-    // envelope
-    buffer.putShort(versionId)
-    buffer.putInt(correlationId)
-    ApiUtils.writeShortString(buffer, clientId)
-
-    // consumer metadata request
-    ApiUtils.writeShortString(buffer, group)
-  }
-
-  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    // return ConsumerCoordinatorNotAvailable for all uncaught errors
-    val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
-    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
-  }
-
-  def describe(details: Boolean) = {
-    val consumerMetadataRequest = new StringBuilder
-    consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)
-    consumerMetadataRequest.append("; Version: " + versionId)
-    consumerMetadataRequest.append("; CorrelationId: " + correlationId)
-    consumerMetadataRequest.append("; ClientId: " + clientId)
-    consumerMetadataRequest.append("; Group: " + group)
-    consumerMetadataRequest.toString()
-  }
-}
\ No newline at end of file
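
The deleted Scala ConsumerMetadataRequest is superseded on the client side by
the Java GroupMetadataRequest/GroupMetadataResponse pair exercised in
RequestResponseTest above. A minimal sketch of constructing the new objects,
reusing the values from that test:

    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.GroupMetadataRequest;
    import org.apache.kafka.common.requests.GroupMetadataResponse;

    public class GroupMetadataSketch {
        public static void main(String[] args) {
            // The request carries only the group id; the response returns an
            // error code plus the node currently acting as group coordinator.
            GroupMetadataRequest request = new GroupMetadataRequest("test-group");
            GroupMetadataResponse response =
                    new GroupMetadataResponse(Errors.NONE.code(), new Node(10, "host1", 2014));
            System.out.println(request.toStruct());
            System.out.println(response.toStruct());
        }
    }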