Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/17 18:50:09 UTC

[2/4] kafka git commit: KAFKA-3888: send consumer heartbeats from a background thread (KIP-62)
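
This patch implements KIP-62: consumer heartbeats move off the user's poll loop onto a dedicated background thread, and the time a member may spend processing between polls is bounded separately (max.poll.interval.ms for consumers, surfaced as a rebalance timeout in the group protocol and as rebalance.timeout.ms for Connect workers below). A minimal sketch of the idea follows; the class and method names are illustrative, not Kafka's internals, and the network calls are elided.

    import java.util.concurrent.atomic.AtomicLong;

    public class BackgroundHeartbeatSketch {
        private final long heartbeatIntervalMs = 3000;    // heartbeat.interval.ms
        private final long maxPollIntervalMs = 300000;    // processing bound between polls
        private final AtomicLong lastPollMs = new AtomicLong(System.currentTimeMillis());
        private volatile boolean running = true;

        // called by the application thread on every poll()
        public void recordPoll() {
            lastPollMs.set(System.currentTimeMillis());
        }

        public Thread start() {
            Thread t = new Thread(() -> {
                while (running) {
                    long now = System.currentTimeMillis();
                    if (now - lastPollMs.get() > maxPollIntervalMs) {
                        leaveGroup();  // the application stopped polling: leave proactively
                        return;
                    }
                    sendHeartbeat();   // keeps the session alive independently of poll()
                    try {
                        Thread.sleep(heartbeatIntervalMs);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }, "heartbeat-sketch");
            t.setDaemon(true);
            t.start();
            return t;
        }

        private void sendHeartbeat() { /* network call elided */ }
        private void leaveGroup()    { /* network call elided */ }
        public void shutdown()       { running = false; }
    }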

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 176571c..8ec8b75 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -26,13 +26,13 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
-import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
@@ -79,11 +79,12 @@ public class ConsumerCoordinatorTest {
     private String topicName = "test";
     private String groupId = "test-group";
     private TopicPartition tp = new TopicPartition(topicName, 0);
+    private int rebalanceTimeoutMs = 60000;
     private int sessionTimeoutMs = 10000;
     private int heartbeatIntervalMs = 5000;
     private long retryBackoffMs = 100;
     private boolean autoCommitEnabled = false;
-    private long autoCommitIntervalMs = 2000;
+    private int autoCommitIntervalMs = 2000;
     private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
     private List<PartitionAssignor> assignors = Collections.<PartitionAssignor>singletonList(partitionAssignor);
     private MockTime time;
@@ -123,7 +124,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testNormalHeartbeat() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // normal heartbeat
@@ -141,7 +142,7 @@ public class ConsumerCoordinatorTest {
 
     @Test(expected = GroupAuthorizationException.class)
     public void testGroupDescribeUnauthorized() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.GROUP_AUTHORIZATION_FAILED.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.GROUP_AUTHORIZATION_FAILED.code()));
         coordinator.ensureCoordinatorReady();
     }
 
@@ -149,17 +150,17 @@ public class ConsumerCoordinatorTest {
     public void testGroupReadUnauthorized() {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.<String, List<String>>emptyMap(),
                 Errors.GROUP_AUTHORIZATION_FAILED.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
     }
 
     @Test
     public void testCoordinatorNotAvailable() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // GROUP_COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown
@@ -180,7 +181,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testNotCoordinator() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // not_coordinator will mark coordinator as unknown
@@ -201,7 +202,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testIllegalGeneration() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // illegal_generation will cause re-partition
@@ -225,7 +226,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testUnknownConsumerId() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // unknown_member_id will cause re-partition
@@ -249,7 +250,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCoordinatorDisconnect() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // coordinator disconnect will mark coordinator as unknown
@@ -279,12 +280,12 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(Arrays.asList(topicName));
         metadata.update(cluster, time.milliseconds());
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.<String, List<String>>emptyMap(),
                 Errors.INVALID_GROUP_ID.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
     }
 
     @Test
@@ -298,7 +299,7 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(Arrays.asList(topicName));
         metadata.update(cluster, time.milliseconds());
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // normal join group
@@ -315,7 +316,7 @@ public class ConsumerCoordinatorTest {
                         sync.groupAssignment().containsKey(consumerId);
             }
         }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
@@ -336,7 +337,7 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(Arrays.asList(topicName));
         metadata.update(cluster, time.milliseconds());
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
@@ -347,14 +348,14 @@ public class ConsumerCoordinatorTest {
         consumerClient.wakeup();
 
         try {
-            coordinator.ensurePartitionAssignment();
+            coordinator.poll(time.milliseconds());
         } catch (WakeupException e) {
             // ignore
         }
 
         // now complete the second half
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
@@ -371,7 +372,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // normal join group
@@ -386,7 +387,7 @@ public class ConsumerCoordinatorTest {
             }
         }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
 
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
@@ -402,12 +403,12 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         final AtomicBoolean received = new AtomicBoolean(false);
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -430,12 +431,12 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         final AtomicBoolean received = new AtomicBoolean(false);
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -449,8 +450,9 @@ public class ConsumerCoordinatorTest {
         }, new LeaveGroupResponse(Errors.NONE.code()).toStruct());
         coordinator.maybeLeaveGroup();
         assertTrue(received.get());
-        assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, coordinator.memberId);
-        assertEquals(OffsetCommitRequest.DEFAULT_GENERATION_ID, coordinator.generation);
+
+        AbstractCoordinator.Generation generation = coordinator.generation();
+        assertNull(generation);
     }
 
     @Test(expected = KafkaException.class)
@@ -460,13 +462,13 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // join initially, but let coordinator rebalance on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
     }
 
     @Test
@@ -476,7 +478,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // join initially, but let the coordinator return an unknown member id
@@ -493,7 +495,7 @@ public class ConsumerCoordinatorTest {
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
 
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
@@ -506,7 +508,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // join initially, but let coordinator rebalance on sync
@@ -517,7 +519,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
 
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
@@ -530,7 +532,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // join initially, but let coordinator rebalance on sync
@@ -547,7 +549,7 @@ public class ConsumerCoordinatorTest {
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
 
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
@@ -560,13 +562,13 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
 
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
 
@@ -595,7 +597,7 @@ public class ConsumerCoordinatorTest {
         // we only have metadata for one topic initially
         metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // prepare initial rebalance
@@ -625,7 +627,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE.code()));
 
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
@@ -656,13 +658,13 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // join the group once
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertEquals(1, rebalanceListener.revokedCount);
         assertEquals(1, rebalanceListener.assignedCount);
@@ -671,7 +673,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.needReassignment();
         client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertEquals(2, rebalanceListener.revokedCount);
         assertEquals(Collections.singleton(tp), rebalanceListener.revoked);
@@ -684,15 +686,15 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // a disconnect from the original coordinator triggers rediscovery and a rejoin
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()), true);
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
         assertEquals(1, rebalanceListener.revokedCount);
@@ -705,25 +707,26 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // coordinator rejects the session timeout
         client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
     }
 
     @Test
     public void testCommitOffsetOnly() {
         subscriptions.assignFromUser(Arrays.asList(tp));
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
 
         AtomicBoolean success = new AtomicBoolean(false);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
+        coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
@@ -739,18 +742,18 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         subscriptions.seek(tp, 100);
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
         time.sleep(autoCommitIntervalMs);
-        consumerClient.poll(0);
+        coordinator.poll(time.milliseconds());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
     }
@@ -765,7 +768,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // haven't joined, so should not cause a commit
@@ -774,13 +777,13 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         subscriptions.seek(tp, 100);
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
         time.sleep(autoCommitIntervalMs);
-        consumerClient.poll(0);
+        coordinator.poll(time.milliseconds());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
     }
@@ -793,12 +796,12 @@ public class ConsumerCoordinatorTest {
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 100);
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
         time.sleep(autoCommitIntervalMs);
-        consumerClient.poll(0);
+        coordinator.poll(time.milliseconds());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
     }
@@ -819,13 +822,13 @@ public class ConsumerCoordinatorTest {
         assertNull(subscriptions.committed(tp));
 
         // now find the coordinator
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // sleep only for the retry backoff
         time.sleep(retryBackoffMs);
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        consumerClient.poll(0);
+        coordinator.poll(time.milliseconds());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
     }
@@ -834,13 +837,14 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetMetadata() {
         subscriptions.assignFromUser(Arrays.asList(tp));
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
 
         AtomicBoolean success = new AtomicBoolean(false);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "hello")), callback(success));
+        coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
@@ -850,10 +854,11 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetAsyncWithDefaultCallback() {
         int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
+        coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
         assertNull(defaultOffsetCommitCallback.exception);
     }
@@ -863,12 +868,12 @@ public class ConsumerCoordinatorTest {
         // enable auto-assignment
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         // now switch to manual assignment
         client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()).toStruct());
@@ -888,29 +893,32 @@ public class ConsumerCoordinatorTest {
 
         AtomicBoolean success = new AtomicBoolean(false);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
+        coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
     }
 
     @Test
     public void testCommitOffsetAsyncFailedWithDefaultCallback() {
         int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
+        coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
         assertTrue(defaultOffsetCommitCallback.exception instanceof RetriableCommitFailedException);
     }
 
     @Test
     public void testCommitOffsetAsyncCoordinatorNotAvailable() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // async commit with coordinator not available
         MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
+        coordinator.invokeCompletedOffsetCommitCallbacks();
 
         assertTrue(coordinator.coordinatorUnknown());
         assertEquals(1, cb.invoked);
@@ -919,13 +927,14 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetAsyncNotCoordinator() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // async commit with not coordinator
         MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
+        coordinator.invokeCompletedOffsetCommitCallbacks();
 
         assertTrue(coordinator.coordinatorUnknown());
         assertEquals(1, cb.invoked);
@@ -934,13 +943,14 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetAsyncDisconnected() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // async commit with coordinator disconnected
         MockCommitCallback cb = new MockCommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
+        coordinator.invokeCompletedOffsetCommitCallbacks();
 
         assertTrue(coordinator.coordinatorUnknown());
         assertEquals(1, cb.invoked);
@@ -949,36 +959,36 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetSyncNotCoordinator() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // sync commit with a not-coordinator error (should rediscover the coordinator and then retry the commit)
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
         coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
     }
 
     @Test
     public void testCommitOffsetSyncCoordinatorNotAvailable() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // sync commit with a coordinator-not-available error (should rediscover the coordinator and then retry the commit)
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
         coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
     }
 
     @Test
     public void testCommitOffsetSyncCoordinatorDisconnected() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
         coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
     }
@@ -986,7 +996,7 @@ public class ConsumerCoordinatorTest {
     @Test(expected = OffsetMetadataTooLarge.class)
     public void testCommitOffsetMetadataTooLarge() {
         // since offset metadata is provided by the user, we have to propagate the exception so they can handle it
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.OFFSET_METADATA_TOO_LARGE.code())));
@@ -996,7 +1006,7 @@ public class ConsumerCoordinatorTest {
     @Test(expected = CommitFailedException.class)
     public void testCommitOffsetIllegalGeneration() {
         // we cannot retry if a rebalance occurs before the commit completed
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.ILLEGAL_GENERATION.code())));
@@ -1006,7 +1016,7 @@ public class ConsumerCoordinatorTest {
     @Test(expected = CommitFailedException.class)
     public void testCommitOffsetUnknownMemberId() {
         // we cannot retry if a rebalance occurs before the commit completed
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN_MEMBER_ID.code())));
@@ -1016,7 +1026,7 @@ public class ConsumerCoordinatorTest {
     @Test(expected = CommitFailedException.class)
     public void testCommitOffsetRebalanceInProgress() {
         // we cannot retry if a rebalance occurs before the commit completed
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.REBALANCE_IN_PROGRESS.code())));
@@ -1025,7 +1035,7 @@ public class ConsumerCoordinatorTest {
 
     @Test(expected = KafkaException.class)
     public void testCommitOffsetSyncCallbackWithNonRetriableException() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // sync commit with invalid partitions should throw if we have no callback
@@ -1035,7 +1045,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testRefreshOffset() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(Arrays.asList(tp));
@@ -1048,7 +1058,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testRefreshOffsetLoadInProgress() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(Arrays.asList(tp));
@@ -1062,13 +1072,13 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testRefreshOffsetNotCoordinatorForConsumer() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code(), "", 100L));
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
         assertFalse(subscriptions.refreshCommitsNeeded());
@@ -1077,7 +1087,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testRefreshOffsetWithNoFetchableOffsets() {
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(Arrays.asList(tp));
@@ -1122,12 +1132,12 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(topics);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
         assertTrue("Topic not found in metadata", metadata.containsTopic(topicName));
@@ -1150,6 +1160,7 @@ public class ConsumerCoordinatorTest {
         return new ConsumerCoordinator(
                 consumerClient,
                 groupId,
+                rebalanceTimeoutMs,
                 sessionTimeoutMs,
                 heartbeatIntervalMs,
                 assignors,
@@ -1166,7 +1177,7 @@ public class ConsumerCoordinatorTest {
                 excludeInternalTopics);
     }
 
-    private Struct consumerMetadataResponse(Node node, short error) {
+    private Struct groupCoordinatorResponse(Node node, short error) {
         GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
         return response.toStruct();
     }
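
Two patterns recur throughout the test changes above. First, ensurePartitionAssignment() is gone: the driver is now ConsumerCoordinator.poll(now), which folds coordinator discovery, group join, heartbeat bookkeeping, and auto-commit into one call (hence the renamed groupCoordinatorResponse() helper and the new rebalanceTimeoutMs constructor argument). Second, because commit responses may now complete on the background thread, async commit callbacks are queued and only run when the consumer's own thread calls invokeCompletedOffsetCommitCallbacks(). A condensed sketch of the new flow, using only names that appear in the diff and assuming the test fixture above:

    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
    coordinator.ensureCoordinatorReady();

    client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
    client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
    coordinator.poll(time.milliseconds());  // replaces ensurePartitionAssignment()

    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
    AtomicBoolean success = new AtomicBoolean(false);
    coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
    coordinator.invokeCompletedOffsetCommitCallbacks();  // callbacks fire on the caller's thread
    assertTrue(success.get());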

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index f0f2a97..8dcbde2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -15,9 +15,9 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -76,22 +76,6 @@ public class ConsumerNetworkClientTest {
     }
 
     @Test
-    public void schedule() {
-        TestDelayedTask task = new TestDelayedTask();
-        consumerClient.schedule(task, time.milliseconds());
-        consumerClient.poll(0);
-        assertEquals(1, task.executions);
-
-        consumerClient.schedule(task, time.milliseconds() + 100);
-        consumerClient.poll(0);
-        assertEquals(1, task.executions);
-
-        time.sleep(100);
-        consumerClient.poll(0);
-        assertEquals(2, task.executions);
-    }
-
-    @Test
     public void wakeup() {
         RequestFuture<ClientResponse> future = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
         consumerClient.wakeup();
@@ -175,12 +159,4 @@ public class ConsumerNetworkClientTest {
         return response.toStruct();
     }
 
-    private static class TestDelayedTask implements DelayedTask {
-        int executions = 0;
-        @Override
-        public void run(long now) {
-            executions++;
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
deleted file mode 100644
index db87b66..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-
-public class DelayedTaskQueueTest {
-    private DelayedTaskQueue scheduler = new DelayedTaskQueue();
-    private ArrayList<DelayedTask> executed = new ArrayList<DelayedTask>();
-
-    @Test
-    public void testScheduling() {
-        // Empty scheduler
-        assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(0));
-        scheduler.poll(0);
-        assertEquals(Collections.emptyList(), executed);
-
-        TestTask task1 = new TestTask();
-        TestTask task2 = new TestTask();
-        TestTask task3 = new TestTask();
-        scheduler.add(task1, 20);
-        assertEquals(20, scheduler.nextTimeout(0));
-        scheduler.add(task2, 10);
-        assertEquals(10, scheduler.nextTimeout(0));
-        scheduler.add(task3, 30);
-        assertEquals(10, scheduler.nextTimeout(0));
-
-        scheduler.poll(5);
-        assertEquals(Collections.emptyList(), executed);
-        assertEquals(5, scheduler.nextTimeout(5));
-
-        scheduler.poll(10);
-        assertEquals(Arrays.asList(task2), executed);
-        assertEquals(10, scheduler.nextTimeout(10));
-
-        scheduler.poll(20);
-        assertEquals(Arrays.asList(task2, task1), executed);
-        assertEquals(20, scheduler.nextTimeout(10));
-
-        scheduler.poll(30);
-        assertEquals(Arrays.asList(task2, task1, task3), executed);
-        assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(30));
-    }
-
-    @Test
-    public void testRemove() {
-        TestTask task1 = new TestTask();
-        TestTask task2 = new TestTask();
-        TestTask task3 = new TestTask();
-        scheduler.add(task1, 20);
-        scheduler.add(task2, 10);
-        scheduler.add(task3, 30);
-        scheduler.add(task1, 40);
-        assertEquals(10, scheduler.nextTimeout(0));
-
-        scheduler.remove(task2);
-        assertEquals(20, scheduler.nextTimeout(0));
-
-        scheduler.remove(task1);
-        assertEquals(30, scheduler.nextTimeout(0));
-
-        scheduler.remove(task3);
-        assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(0));
-    }
-
-    private class TestTask implements DelayedTask {
-        @Override
-        public void run(long now) {
-            executed.add(this);
-        }
-    }
-
-}
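
The DelayedTask/DelayedTaskQueue machinery existed chiefly to schedule heartbeats and auto-commits inside ConsumerNetworkClient.poll(). With heartbeats on their own thread and auto-commit folded into ConsumerCoordinator.poll() (see the time.sleep(autoCommitIntervalMs) tests above), the queue and both of its tests are deleted outright. Below is a hedged sketch of the kind of deadline check that replaces a scheduled task; the field and method names are illustrative, not the actual ConsumerCoordinator members:

    private long nextAutoCommitDeadline;

    private void maybeAutoCommitOffsetsAsync(long now) {
        if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
            commitOffsetsAsync(subscriptions.allConsumed(), null);
            nextAutoCommitDeadline = now + autoCommitIntervalMs;
        }
    }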

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index ba04cb5..5186618 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -438,6 +438,7 @@ public class FetcherTest {
         fetcherNoAutoReset.sendFetches();
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
+
         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
         try {
             fetcherNoAutoReset.fetchedRecords();

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
index 75e68cc..0177c79 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
@@ -28,8 +28,10 @@ public class HeartbeatTest {
 
     private long timeout = 300L;
     private long interval = 100L;
+    private long maxPollInterval = 900L;
+    private long retryBackoff = 10L;
     private MockTime time = new MockTime();
-    private Heartbeat heartbeat = new Heartbeat(timeout, interval, -1L);
+    private Heartbeat heartbeat = new Heartbeat(timeout, interval, maxPollInterval, retryBackoff);
 
     @Test
     public void testShouldHeartbeat() {
@@ -64,7 +66,7 @@ public class HeartbeatTest {
     public void testResetSession() {
         heartbeat.sentHeartbeat(time.milliseconds());
         time.sleep(305);
-        heartbeat.resetSessionTimeout(time.milliseconds());
+        heartbeat.resetTimeouts(time.milliseconds());
         assertFalse(heartbeat.sessionTimeoutExpired(time.milliseconds()));
     }
 }
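
The Heartbeat constructor gains the max poll interval and retry backoff, so one object now tracks both the session deadline (fed by the background thread's heartbeats) and the poll deadline (fed by the application's calls to poll()). The rename from resetSessionTimeout() to resetTimeouts() reflects that both deadlines are reset together. Usage stays as in the updated test, for example:

    Heartbeat heartbeat = new Heartbeat(300L, 100L, 900L, 10L); // session, interval, maxPollInterval, retryBackoff
    heartbeat.sentHeartbeat(time.milliseconds());
    time.sleep(305);
    heartbeat.resetTimeouts(time.milliseconds());
    assertFalse(heartbeat.sessionTimeoutExpired(time.milliseconds()));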

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index be7f974..766c745 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -55,8 +55,9 @@ public class RequestResponseTest {
                 createHeartBeatRequest(),
                 createHeartBeatRequest().getErrorResponse(0, new UnknownServerException()),
                 createHeartBeatResponse(),
-                createJoinGroupRequest(),
-                createJoinGroupRequest().getErrorResponse(0, new UnknownServerException()),
+                createJoinGroupRequest(1),
+                createJoinGroupRequest(0).getErrorResponse(0, new UnknownServerException()),
+                createJoinGroupRequest(1).getErrorResponse(1, new UnknownServerException()),
                 createJoinGroupResponse(),
                 createLeaveGroupRequest(),
                 createLeaveGroupRequest().getErrorResponse(0, new UnknownServerException()),
@@ -118,6 +119,7 @@ public class RequestResponseTest {
         checkSerialization(createOffsetCommitRequest(0).getErrorResponse(0, new UnknownServerException()), 0);
         checkSerialization(createOffsetCommitRequest(1), 1);
         checkSerialization(createOffsetCommitRequest(1).getErrorResponse(1, new UnknownServerException()), 1);
+        checkSerialization(createJoinGroupRequest(0), 0);
         checkSerialization(createUpdateMetadataRequest(0, null), 0);
         checkSerialization(createUpdateMetadataRequest(0, null).getErrorResponse(0, new UnknownServerException()), 0);
         checkSerialization(createUpdateMetadataRequest(1, null), 1);
@@ -236,11 +238,15 @@ public class RequestResponseTest {
         return new HeartbeatResponse(Errors.NONE.code());
     }
 
-    private AbstractRequest createJoinGroupRequest() {
+    private AbstractRequest createJoinGroupRequest(int version) {
         ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
         List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();
         protocols.add(new JoinGroupRequest.ProtocolMetadata("consumer-range", metadata));
-        return new JoinGroupRequest("group1", 30000, "consumer1", "consumer", protocols);
+        if (version == 0) {
+            return new JoinGroupRequest("group1", 30000, "consumer1", "consumer", protocols);
+        } else {
+            return new JoinGroupRequest("group1", 10000, 60000, "consumer1", "consumer", protocols);
+        }
     }
 
     private AbstractRequestResponse createJoinGroupResponse() {
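
The version parameter matters because JoinGroup v1 adds a rebalance timeout that is separate from the session timeout (10000 and 60000 respectively in the v1 constructor above); under v0 the session timeout served both purposes, so a consumer that processed slowly had to inflate its session timeout and with it the failure-detection time. The v0 path must keep serializing for compatibility with older brokers, which is what the added checkSerialization(createJoinGroupRequest(0), 0) pins down.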

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index f5aa8ae..6e9d7b4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -43,13 +43,31 @@ public class DistributedConfig extends WorkerConfig {
      * <code>session.timeout.ms</code>
      */
     public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
-    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's group management facilities.";
+    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect worker failures. " +
+            "The worker sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are " +
+            "received by the broker before the expiration of this session timeout, then the broker will remove the " +
+            "worker from the group and initiate a rebalance. Note that the value must be in the allowable range as " +
+            "configured in the broker configuration by <code>group.min.session.timeout.ms</code> " +
+            "and <code>group.max.session.timeout.ms</code>.";
 
     /**
      * <code>heartbeat.interval.ms</code>
      */
     public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
-    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the worker's session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
+    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the group " +
+            "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the " +
+            "worker's session stays active and to facilitate rebalancing when new members join or leave the group. " +
+            "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher " +
+            "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
+
+    /**
+     * <code>rebalance.timeout.ms</code>
+     */
+    public static final String REBALANCE_TIMEOUT_MS_CONFIG = "rebalance.timeout.ms";
+    private static final String REBALANCE_TIMEOUT_MS_DOC = "The maximum allowed time for each worker to join the group " +
+            "once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to " +
+            "flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed " +
+            "from the group, which will cause offset commit failures.";
 
     /**
      * <code>worker.sync.timeout.ms</code>
@@ -90,9 +108,14 @@ public class DistributedConfig extends WorkerConfig {
                 .define(GROUP_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, GROUP_ID_DOC)
                 .define(SESSION_TIMEOUT_MS_CONFIG,
                         ConfigDef.Type.INT,
-                        30000,
+                        10000,
                         ConfigDef.Importance.HIGH,
                         SESSION_TIMEOUT_MS_DOC)
+                .define(REBALANCE_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        60000,
+                        ConfigDef.Importance.HIGH,
+                        REBALANCE_TIMEOUT_MS_DOC)
                 .define(HEARTBEAT_INTERVAL_MS_CONFIG,
                         ConfigDef.Type.INT,
                         3000,
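
For Connect this patch also changes defaults: session.timeout.ms drops from 30000 to 10000 (failure detection is now decoupled from rebalance duration), and the new rebalance.timeout.ms defaults to 60000. A minimal sketch of a worker configuration using the keys defined above; the group.id value is illustrative, and required settings such as converters and bootstrap servers (plus the java.util imports) are elided:

    Map<String, String> props = new HashMap<>();
    props.put(DistributedConfig.GROUP_ID_CONFIG, "connect-cluster");
    props.put(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");      // default lowered from 30000
    props.put(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG, "60000");    // new in this patch
    props.put(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");    // must stay well below the session timeout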

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 9c74960..9114555 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.utils.CircularIterator;
 import org.apache.kafka.common.utils.Time;
@@ -63,6 +64,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
      */
     public WorkerCoordinator(ConsumerNetworkClient client,
                              String groupId,
+                             int rebalanceTimeoutMs,
                              int sessionTimeoutMs,
                              int heartbeatIntervalMs,
                              Metrics metrics,
@@ -74,6 +76,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
                              WorkerRebalanceListener listener) {
         super(client,
                 groupId,
+                rebalanceTimeoutMs,
                 sessionTimeoutMs,
                 heartbeatIntervalMs,
                 metrics,
@@ -97,6 +100,32 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
         return "connect";
     }
 
+    public void poll(long timeout) {
+        // poll for IO until the timeout expires
+        long now = time.milliseconds();
+        long deadline = now + timeout;
+
+        while (now <= deadline) {
+            if (coordinatorUnknown()) {
+                ensureCoordinatorReady();
+                now = time.milliseconds();
+            }
+
+            if (needRejoin()) {
+                ensureActiveGroup();
+                now = time.milliseconds();
+            }
+
+            pollHeartbeat(now);
+
+            // Note that because the network client is shared with the background heartbeat thread,
+            // we do not want to block in poll longer than the time to the next heartbeat.
+            long remaining = Math.max(0, deadline - now);
+            client.poll(Math.min(remaining, timeToNextHeartbeat(now)));
+            now = time.milliseconds();
+        }
+    }
+
     @Override
     public List<ProtocolMetadata> metadata() {
         configSnapshot = configStorage.snapshot();
@@ -238,12 +267,15 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
     @Override
-    public boolean needRejoin() {
+    protected boolean needRejoin() {
         return super.needRejoin() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested;
     }
 
     public String memberId() {
-        return this.memberId;
+        Generation generation = generation();
+        if (generation != null)
+            return generation.memberId;
+        return JoinGroupRequest.UNKNOWN_MEMBER_ID;
     }
 
     @Override
@@ -252,7 +284,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
     private boolean isLeader() {
-        return assignmentSnapshot != null && memberId.equals(assignmentSnapshot.leader());
+        return assignmentSnapshot != null && memberId().equals(assignmentSnapshot.leader());
     }
 
     public String ownerUrl(String connector) {

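The key invariant in the new WorkerCoordinator.poll() above is that blocking network IO never outlasts the next scheduled heartbeat, because the network client is shared with the background heartbeat thread. A self-contained sketch of the bounding rule, with hypothetical numbers standing in for the real clock and heartbeat schedule:

    object PollBoundingSketch extends App {
      // hypothetical values: 700 ms left in the caller's timeout, next heartbeat due in 250 ms
      val now = 10000L
      val deadline = 10700L
      def timeToNextHeartbeat(nowMs: Long): Long = 250L // stand-in for the heartbeat schedule

      val remaining = math.max(0L, deadline - now)
      val pollTimeout = math.min(remaining, timeToNextHeartbeat(now))
      assert(pollTimeout == 250L) // wake for the heartbeat first, then loop until the deadline
    }
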
http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index c21b9bf..a5213db 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -104,6 +104,7 @@ public class WorkerGroupMember {
                     config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG));
             this.coordinator = new WorkerCoordinator(this.client,
                     config.getString(DistributedConfig.GROUP_ID_CONFIG),
+                    config.getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG),
                     config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG),
                     config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                     metrics,
@@ -131,23 +132,13 @@ public class WorkerGroupMember {
     }
 
     public void ensureActive() {
-        coordinator.ensureCoordinatorReady();
-        coordinator.ensureActiveGroup();
+        coordinator.poll(0);
     }
 
     public void poll(long timeout) {
         if (timeout < 0)
             throw new IllegalArgumentException("Timeout must not be negative");
-
-        // poll for new data until the timeout expires
-        long remaining = timeout;
-        while (remaining >= 0) {
-            long start = time.milliseconds();
-            coordinator.ensureCoordinatorReady();
-            coordinator.ensureActiveGroup();
-            client.poll(remaining);
-            remaining -= time.milliseconds() - start;
-        }
+        coordinator.poll(timeout);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 4c2ac40..3bfa83f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -67,6 +67,7 @@ public class WorkerCoordinatorTest {
 
     private String groupId = "test-group";
     private int sessionTimeoutMs = 10;
+    private int rebalanceTimeoutMs = 60;
     private int heartbeatIntervalMs = 2;
     private long retryBackoffMs = 100;
     private MockTime time;
@@ -98,6 +99,7 @@ public class WorkerCoordinatorTest {
 
         this.coordinator = new WorkerCoordinator(consumerClient,
                 groupId,
+                rebalanceTimeoutMs,
                 sessionTimeoutMs,
                 heartbeatIntervalMs,
                 metrics,

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 666d0e7..d955225 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -51,7 +51,10 @@ object ApiVersion {
     "0.10.0-IV0" -> KAFKA_0_10_0_IV0,
     // 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL handshake).
     "0.10.0-IV1" -> KAFKA_0_10_0_IV1,
-    "0.10.0" -> KAFKA_0_10_0_IV1
+    "0.10.0" -> KAFKA_0_10_0_IV1,
+
+    // introduced for JoinGroup protocol change in KIP-62
+    "0.10.1-IV0" -> KAFKA_0_10_1_IV0
   )
 
   private val versionPattern = "\\.".r
@@ -111,3 +114,9 @@ case object KAFKA_0_10_0_IV1 extends ApiVersion {
   val messageFormatVersion: Byte = Message.MagicValue_V1
   val id: Int = 5
 }
+
+case object KAFKA_0_10_1_IV0 extends ApiVersion {
+  val version: String = "0.10.1-IV0"
+  val messageFormatVersion: Byte = Message.MagicValue_V1
+  val id: Int = 6
+}

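The new 0.10.1-IV0 entry exists so that a broker can avoid persisting the new group metadata schema until the whole cluster understands it; the GroupMetadataManager change below gates on exactly this version. A simplified, self-contained sketch of the gating rule (real ApiVersions compare by their id field; the ids 5 and 6 mirror the case objects above):

    object SchemaGatingSketch extends App {
      val KAFKA_0_10_0_IV1_ID = 5
      val KAFKA_0_10_1_IV0_ID = 6

      // choose the group metadata value schema version from the configured
      // inter-broker protocol version, as the store path below does
      def groupMetadataValueVersion(interBrokerProtocolId: Int): Short =
        if (interBrokerProtocolId < KAFKA_0_10_1_IV0_ID) 0.toShort else 1.toShort

      assert(groupMetadataValueVersion(KAFKA_0_10_0_IV1_ID) == 0) // old cluster: keep writing V0
      assert(groupMetadataValueVersion(KAFKA_0_10_1_IV0_ID) == 1) // upgraded cluster: write V1
    }
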
http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 0d02a4c..726426a 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -93,6 +93,7 @@ class GroupCoordinator(val brokerId: Int,
                       memberId: String,
                       clientId: String,
                       clientHost: String,
+                      rebalanceTimeoutMs: Int,
                       sessionTimeoutMs: Int,
                       protocolType: String,
                       protocols: List[(String, Array[Byte])],
@@ -118,11 +119,11 @@ class GroupCoordinator(val brokerId: Int,
             responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
           } else {
             val group = groupManager.addGroup(new GroupMetadata(groupId))
-            doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
+            doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
           }
 
         case Some(group) =>
-          doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
+          doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
       }
     }
   }
@@ -131,6 +132,7 @@ class GroupCoordinator(val brokerId: Int,
                           memberId: String,
                           clientId: String,
                           clientHost: String,
+                          rebalanceTimeoutMs: Int,
                           sessionTimeoutMs: Int,
                           protocolType: String,
                           protocols: List[(String, Array[Byte])],
@@ -154,7 +156,7 @@ class GroupCoordinator(val brokerId: Int,
 
           case PreparingRebalance =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
+              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               updateMemberAndRebalance(group, member, protocols, responseCallback)
@@ -162,7 +164,7 @@ class GroupCoordinator(val brokerId: Int,
 
           case AwaitingSync =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
+              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               if (member.matches(protocols)) {
@@ -189,7 +191,7 @@ class GroupCoordinator(val brokerId: Int,
           case Empty | Stable =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
               // if the member id is unknown, register the member to the group
-              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
+              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               if (memberId == group.leaderId || !member.matches(protocols)) {
@@ -256,7 +258,6 @@ class GroupCoordinator(val brokerId: Int,
 
           case AwaitingSync =>
             group.get(memberId).awaitingSyncCallback = responseCallback
-            completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
 
             // if this is the leader, then we can attempt to persist state and transition to stable
             if (memberId == group.leaderId) {
@@ -299,7 +300,7 @@ class GroupCoordinator(val brokerId: Int,
     delayedGroupStore.foreach(groupManager.store)
   }
 
-  def handleLeaveGroup(groupId: String, consumerId: String, responseCallback: Short => Unit) {
+  def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Short => Unit) {
     if (!isActive.get) {
       responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
     } else if (!isCoordinatorForGroup(groupId)) {
@@ -317,10 +318,10 @@ class GroupCoordinator(val brokerId: Int,
 
         case Some(group) =>
           group synchronized {
-            if (group.is(Dead) || !group.has(consumerId)) {
+            if (group.is(Dead) || !group.has(memberId)) {
               responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
             } else {
-              val member = group.get(consumerId)
+              val member = group.get(memberId)
               removeHeartbeatForLeavingMember(group, member)
               onMemberFailure(group, member)
               responseCallback(Errors.NONE.code)
@@ -343,27 +344,49 @@ class GroupCoordinator(val brokerId: Int,
       responseCallback(Errors.NONE.code)
     } else {
       groupManager.getGroup(groupId) match {
-        case None => responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+        case None =>
+          responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+
         case Some(group) =>
           group synchronized {
-            if (group.is(Empty)) {
-              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
-            } else if (group.is(Dead)) {
-              // if the group is marked as dead, it means some other thread has just removed the group
-              // from the coordinator metadata; this is likely that the group has migrated to some other
-              // coordinator OR the group is in a transient unstable phase. Let the member retry
-              // joining without the specified member id,
-              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
-            } else if (!group.is(Stable)) {
-              responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
-            } else if (!group.has(memberId)) {
-              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
-            } else if (generationId != group.generationId) {
-              responseCallback(Errors.ILLEGAL_GENERATION.code)
-            } else {
-              val member = group.get(memberId)
-              completeAndScheduleNextHeartbeatExpiration(group, member)
-              responseCallback(Errors.NONE.code)
+            group.currentState match {
+              case Dead =>
+                // if the group is marked as dead, it means some other thread has just removed the group
+                // from the coordinator metadata; it is likely that the group has migrated to another
+                // coordinator or is in a transient unstable phase. Let the member retry
+                // joining without the specified member id.
+                responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+
+              case Empty =>
+                responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+
+              case AwaitingSync =>
+                if (!group.has(memberId))
+                  responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+                else
+                  responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
+
+              case PreparingRebalance =>
+                if (!group.has(memberId)) {
+                  responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+                } else if (generationId != group.generationId) {
+                  responseCallback(Errors.ILLEGAL_GENERATION.code)
+                } else {
+                  val member = group.get(memberId)
+                  completeAndScheduleNextHeartbeatExpiration(group, member)
+                  responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
+                }
+
+              case Stable =>
+                if (!group.has(memberId)) {
+                  responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+                } else if (generationId != group.generationId) {
+                  responseCallback(Errors.ILLEGAL_GENERATION.code)
+                } else {
+                  val member = group.get(memberId)
+                  completeAndScheduleNextHeartbeatExpiration(group, member)
+                  responseCallback(Errors.NONE.code)
+                }
             }
           }
       }
@@ -585,7 +608,8 @@ class GroupCoordinator(val brokerId: Int,
     heartbeatPurgatory.checkAndComplete(memberKey)
   }
 
-  private def addMemberAndRebalance(sessionTimeoutMs: Int,
+  private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
+                                    sessionTimeoutMs: Int,
                                     clientId: String,
                                     clientHost: String,
                                     protocolType: String,
@@ -594,7 +618,8 @@ class GroupCoordinator(val brokerId: Int,
                                     callback: JoinCallback) = {
     // use the client-id with a random id suffix as the member-id
     val memberId = clientId + "-" + group.generateMemberIdSuffix
-    val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols)
+    val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
+      sessionTimeoutMs, protocolType, protocols)
     member.awaitingJoinCallback = callback
     group.add(member.memberId, member)
     maybePrepareRebalance(group)
@@ -625,7 +650,7 @@ class GroupCoordinator(val brokerId: Int,
     group.transitionTo(PreparingRebalance)
     info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
 
-    val rebalanceTimeout = group.rebalanceTimeout
+    val rebalanceTimeout = group.rebalanceTimeoutMs
     val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
     val groupKey = GroupKey(group.groupId)
     joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
@@ -770,7 +795,8 @@ object GroupCoordinator {
     val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
       groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
 
-    val groupMetadataManager = new GroupMetadataManager(config.brokerId, offsetConfig, replicaManager, zkUtils, time)
+    val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
+      offsetConfig, replicaManager, zkUtils, time)
     new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time)
   }
 

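The rewritten heartbeat handler above replaces the chained if/else with an explicit match over the group state. Condensed, and assuming the member is known and the generation matches where those checks apply, the mapping is as follows (a sketch only; the strings stand in for the Errors codes used above):

    object HeartbeatMappingSketch extends App {
      sealed trait GroupState
      case object Dead extends GroupState
      case object Empty extends GroupState
      case object AwaitingSync extends GroupState
      case object PreparingRebalance extends GroupState
      case object Stable extends GroupState

      def heartbeatError(state: GroupState): String = state match {
        case Dead | Empty       => "UNKNOWN_MEMBER_ID"     // rejoin without a member id
        case AwaitingSync       => "REBALANCE_IN_PROGRESS" // member must rejoin
        case PreparingRebalance => "REBALANCE_IN_PROGRESS" // but the heartbeat is still recorded
        case Stable             => "NONE"                  // heartbeat recorded, session extended
      }

      assert(heartbeatError(PreparingRebalance) == "REBALANCE_IN_PROGRESS")
    }
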
http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
index b455964..c86c7f8 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
@@ -190,8 +190,8 @@ private[coordinator] class GroupMetadata(val groupId: String, initialState: Grou
 
   def allMemberMetadata = members.values.toList
 
-  def rebalanceTimeout = members.values.foldLeft(0) {(timeout, member) =>
-    timeout.max(member.sessionTimeoutMs)
+  def rebalanceTimeoutMs = members.values.foldLeft(0) { (timeout, member) =>
+    timeout.max(member.rebalanceTimeoutMs)
   }
 
   // TODO: decide if ids should be predictable or random

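With the change above, the group-level rebalance timeout becomes the maximum of the members' rebalance timeouts rather than of their session timeouts, so a slow member's flush budget no longer has to fit inside its session timeout. A standalone illustration with hypothetical member values:

    object GroupRebalanceTimeoutSketch extends App {
      // hypothetical per-member rebalance timeouts (ms); the group waits for the slowest member
      val memberRebalanceTimeoutsMs = Seq(30000, 60000, 45000)
      val groupRebalanceTimeoutMs = memberRebalanceTimeoutsMs.foldLeft(0)(_ max _)
      assert(groupRebalanceTimeoutMs == 60000)
    }
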
http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index ef8b295..cf8ae91 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -47,10 +47,12 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
+import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
 import kafka.utils.CoreUtils.inLock
 
 
 class GroupMetadataManager(val brokerId: Int,
+                           val interBrokerProtocolVersion: ApiVersion,
                            val config: OffsetConfig,
                            replicaManager: ReplicaManager,
                            zkUtils: ZkUtils,
@@ -175,9 +177,11 @@ class GroupMetadataManager(val brokerId: Int,
                         groupAssignment: Map[String, Array[Byte]],
                         responseCallback: Short => Unit): DelayedStore = {
     val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
+    val groupMetadataValueVersion = if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0) 0.toShort else GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
+
     val message = new Message(
       key = GroupMetadataManager.groupMetadataKey(group.groupId),
-      bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment),
+      bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion),
       timestamp = timestamp,
       magicValue = magicValue)
 
@@ -704,30 +708,51 @@ object GroupMetadataManager {
   private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
   private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
 
-  private val MEMBER_METADATA_V0 = new Schema(new Field("member_id", STRING),
-    new Field("client_id", STRING),
-    new Field("client_host", STRING),
-    new Field("session_timeout", INT32),
-    new Field("subscription", BYTES),
-    new Field("assignment", BYTES))
-  private val MEMBER_METADATA_MEMBER_ID_V0 = MEMBER_METADATA_V0.get("member_id")
-  private val MEMBER_METADATA_CLIENT_ID_V0 = MEMBER_METADATA_V0.get("client_id")
-  private val MEMBER_METADATA_CLIENT_HOST_V0 = MEMBER_METADATA_V0.get("client_host")
-  private val MEMBER_METADATA_SESSION_TIMEOUT_V0 = MEMBER_METADATA_V0.get("session_timeout")
-  private val MEMBER_METADATA_SUBSCRIPTION_V0 = MEMBER_METADATA_V0.get("subscription")
-  private val MEMBER_METADATA_ASSIGNMENT_V0 = MEMBER_METADATA_V0.get("assignment")
-
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(new Field("protocol_type", STRING),
-    new Field("generation", INT32),
-    new Field("protocol", NULLABLE_STRING),
-    new Field("leader", NULLABLE_STRING),
-    new Field("members", new ArrayOf(MEMBER_METADATA_V0)))
-  private val GROUP_METADATA_PROTOCOL_TYPE_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol_type")
-  private val GROUP_METADATA_GENERATION_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("generation")
-  private val GROUP_METADATA_PROTOCOL_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol")
-  private val GROUP_METADATA_LEADER_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("leader")
-  private val GROUP_METADATA_MEMBERS_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("members")
+  private val MEMBER_ID_KEY = "member_id"
+  private val CLIENT_ID_KEY = "client_id"
+  private val CLIENT_HOST_KEY = "client_host"
+  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
+  private val SESSION_TIMEOUT_KEY = "session_timeout"
+  private val SUBSCRIPTION_KEY = "subscription"
+  private val ASSIGNMENT_KEY = "assignment"
+
+  private val MEMBER_METADATA_V0 = new Schema(
+    new Field(MEMBER_ID_KEY, STRING),
+    new Field(CLIENT_ID_KEY, STRING),
+    new Field(CLIENT_HOST_KEY, STRING),
+    new Field(SESSION_TIMEOUT_KEY, INT32),
+    new Field(SUBSCRIPTION_KEY, BYTES),
+    new Field(ASSIGNMENT_KEY, BYTES))
+
+  private val MEMBER_METADATA_V1 = new Schema(
+    new Field(MEMBER_ID_KEY, STRING),
+    new Field(CLIENT_ID_KEY, STRING),
+    new Field(CLIENT_HOST_KEY, STRING),
+    new Field(REBALANCE_TIMEOUT_KEY, INT32),
+    new Field(SESSION_TIMEOUT_KEY, INT32),
+    new Field(SUBSCRIPTION_KEY, BYTES),
+    new Field(ASSIGNMENT_KEY, BYTES))
+
+  private val PROTOCOL_TYPE_KEY = "protocol_type"
+  private val GENERATION_KEY = "generation"
+  private val PROTOCOL_KEY = "protocol"
+  private val LEADER_KEY = "leader"
+  private val MEMBERS_KEY = "members"
+
+  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
+    new Field(PROTOCOL_TYPE_KEY, STRING),
+    new Field(GENERATION_KEY, INT32),
+    new Field(PROTOCOL_KEY, NULLABLE_STRING),
+    new Field(LEADER_KEY, NULLABLE_STRING),
+    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
+
+  private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
+    new Field(PROTOCOL_TYPE_KEY, STRING),
+    new Field(GENERATION_KEY, INT32),
+    new Field(PROTOCOL_KEY, NULLABLE_STRING),
+    new Field(LEADER_KEY, NULLABLE_STRING),
+    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
+
 
   // map of versions to key schemas as data types
   private val MESSAGE_TYPE_SCHEMAS = Map(
@@ -742,8 +767,10 @@ object GroupMetadataManager {
   private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort
 
   // map of version of group metadata value schemas
-  private val GROUP_VALUE_SCHEMAS = Map(0 -> GROUP_METADATA_VALUE_SCHEMA_V0)
-  private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 0.toShort
+  private val GROUP_VALUE_SCHEMAS = Map(
+    0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
+    1 -> GROUP_METADATA_VALUE_SCHEMA_V1)
+  private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort
 
   private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
   private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
@@ -830,40 +857,47 @@ object GroupMetadataManager {
-   * Generates the payload for group metadata message from given offset and metadata
+   * Generates the payload for a group metadata message from the given group metadata,
    * assuming the generation id, selected protocol, leader and member assignment are all available
    *
-   * @param groupMetadata
+   * @param groupMetadata current group metadata
+   * @param assignment the assignment for the rebalancing generation
+   * @param version the version of the value message to use
-   * @return payload for offset commit message
+   * @return payload for a group metadata message
    */
-  def groupMetadataValue(groupMetadata: GroupMetadata, assignment: Map[String, Array[Byte]]): Array[Byte] = {
-    // generate commit value with schema version 1
-    val value = new Struct(CURRENT_GROUP_VALUE_SCHEMA)
-    value.set(GROUP_METADATA_PROTOCOL_TYPE_V0, groupMetadata.protocolType.getOrElse(""))
-    value.set(GROUP_METADATA_GENERATION_V0, groupMetadata.generationId)
-    value.set(GROUP_METADATA_PROTOCOL_V0, groupMetadata.protocol)
-    value.set(GROUP_METADATA_LEADER_V0, groupMetadata.leaderId)
+  def groupMetadataValue(groupMetadata: GroupMetadata,
+                         assignment: Map[String, Array[Byte]],
+                         version: Short = 0): Array[Byte] = {
+    val value = if (version == 0) new Struct(GROUP_METADATA_VALUE_SCHEMA_V0) else new Struct(CURRENT_GROUP_VALUE_SCHEMA)
+
+    value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
+    value.set(GENERATION_KEY, groupMetadata.generationId)
+    value.set(PROTOCOL_KEY, groupMetadata.protocol)
+    value.set(LEADER_KEY, groupMetadata.leaderId)
 
     val memberArray = groupMetadata.allMemberMetadata.map {
       case memberMetadata =>
-        val memberStruct = value.instance(GROUP_METADATA_MEMBERS_V0)
-        memberStruct.set(MEMBER_METADATA_MEMBER_ID_V0, memberMetadata.memberId)
-        memberStruct.set(MEMBER_METADATA_CLIENT_ID_V0, memberMetadata.clientId)
-        memberStruct.set(MEMBER_METADATA_CLIENT_HOST_V0, memberMetadata.clientHost)
-        memberStruct.set(MEMBER_METADATA_SESSION_TIMEOUT_V0, memberMetadata.sessionTimeoutMs)
+        val memberStruct = value.instance(MEMBERS_KEY)
+        memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
+        memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
+        memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
+        memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
+
+        if (version > 0)
+          memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs)
 
         val metadata = memberMetadata.metadata(groupMetadata.protocol)
-        memberStruct.set(MEMBER_METADATA_SUBSCRIPTION_V0, ByteBuffer.wrap(metadata))
+        memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
 
         val memberAssignment = assignment(memberMetadata.memberId)
         assert(memberAssignment != null)
 
-        memberStruct.set(MEMBER_METADATA_ASSIGNMENT_V0, ByteBuffer.wrap(memberAssignment))
+        memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment))
 
         memberStruct
     }
 
-    value.set(GROUP_METADATA_MEMBERS_V0, memberArray.toArray)
+    value.set(MEMBERS_KEY, memberArray.toArray)
 
     val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
-    byteBuffer.putShort(CURRENT_GROUP_VALUE_SCHEMA_VERSION)
+    byteBuffer.putShort(version)
     value.writeTo(byteBuffer)
     byteBuffer.array()
   }
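
As the rewritten serializer above shows, whatever schema version the caller selects becomes the leading two bytes of the stored value, and the rebalance_timeout field is written only for versions above 0. A minimal sketch of that framing (the payload bytes are placeholders):

    import java.nio.ByteBuffer

    object ValueFramingSketch extends App {
      // frame a payload with the schema version as the leading INT16,
      // mirroring groupMetadataValue's ByteBuffer layout above
      def frame(version: Short, payload: Array[Byte]): Array[Byte] = {
        val buffer = ByteBuffer.allocate(2 + payload.length)
        buffer.putShort(version)
        buffer.put(payload)
        buffer.array()
      }

      val framed = frame(1.toShort, Array[Byte](42))
      assert(ByteBuffer.wrap(framed).getShort == 1) // readers dispatch on this version
    }
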
@@ -944,31 +978,33 @@ object GroupMetadataManager {
       val valueSchema = schemaForGroup(version)
       val value = valueSchema.read(buffer)
 
-      if (version == 0) {
-        val protocolType = value.get(GROUP_METADATA_PROTOCOL_TYPE_V0).asInstanceOf[String]
+      if (version == 0 || version == 1) {
+        val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
 
-        val memberMetadataArray = value.getArray(GROUP_METADATA_MEMBERS_V0)
+        val memberMetadataArray = value.getArray(MEMBERS_KEY)
         val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
 
         val group = new GroupMetadata(groupId, initialState)
 
-        group.generationId = value.get(GROUP_METADATA_GENERATION_V0).asInstanceOf[Int]
-        group.leaderId = value.get(GROUP_METADATA_LEADER_V0).asInstanceOf[String]
-        group.protocol = value.get(GROUP_METADATA_PROTOCOL_V0).asInstanceOf[String]
+        group.generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
+        group.leaderId = value.get(LEADER_KEY).asInstanceOf[String]
+        group.protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
 
         memberMetadataArray.foreach {
           case memberMetadataObj =>
             val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
-            val memberId = memberMetadata.get(MEMBER_METADATA_MEMBER_ID_V0).asInstanceOf[String]
-            val clientId = memberMetadata.get(MEMBER_METADATA_CLIENT_ID_V0).asInstanceOf[String]
-            val clientHost = memberMetadata.get(MEMBER_METADATA_CLIENT_HOST_V0).asInstanceOf[String]
-            val sessionTimeout = memberMetadata.get(MEMBER_METADATA_SESSION_TIMEOUT_V0).asInstanceOf[Int]
-            val subscription = Utils.toArray(memberMetadata.get(MEMBER_METADATA_SUBSCRIPTION_V0).asInstanceOf[ByteBuffer])
+            val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
+            val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
+            val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
+            val sessionTimeout = memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int]
+            val rebalanceTimeout = if (version == 0) sessionTimeout else memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int]
+
+            val subscription = Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
 
-            val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeout,
+            val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
               protocolType, List((group.protocol, subscription)))
 
-            member.assignment = Utils.toArray(memberMetadata.get(MEMBER_METADATA_ASSIGNMENT_V0).asInstanceOf[ByteBuffer])
+            member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
 
             group.add(memberId, member)
         }