Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/21 21:08:52 UTC

[8/8] kafka git commit: KAFKA-2464: client-side assignment for new consumer

KAFKA-2464: client-side assignment for new consumer

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Jiangjie Qin, Onur Karaman, Ewen Cheslack-Postava, Guozhang Wang

Closes #165 from hachikuji/KAFKA-2464


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86eb74d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86eb74d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86eb74d9

Branch: refs/heads/trunk
Commit: 86eb74d9236c586af5889fe79f4b9e066c9c2af3
Parents: 6e747d4
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Oct 21 12:13:42 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 21 12:13:42 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  |   9 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |  47 +-
 .../kafka/clients/consumer/RangeAssignor.java   |  97 ++
 .../clients/consumer/RoundRobinAssignor.java    | 114 +++
 .../consumer/internals/AbstractCoordinator.java | 638 +++++++++++++
 .../internals/AbstractPartitionAssignor.java    |  90 ++
 .../consumer/internals/ConsumerCoordinator.java | 595 ++++++++++++
 .../internals/ConsumerNetworkClient.java        |   9 +
 .../consumer/internals/ConsumerProtocol.java    | 162 ++++
 .../clients/consumer/internals/Coordinator.java | 848 -----------------
 .../clients/consumer/internals/Fetcher.java     |  31 +-
 .../consumer/internals/PartitionAssignor.java   | 117 +++
 .../consumer/internals/RequestFuture.java       |  16 +-
 .../consumer/internals/SubscriptionState.java   |  44 +-
 .../java/org/apache/kafka/common/Cluster.java   |  10 +
 ...onsumerCoordinatorNotAvailableException.java |  40 -
 .../GroupCoordinatorNotAvailableException.java  |  40 +
 .../NotCoordinatorForConsumerException.java     |  40 -
 .../errors/NotCoordinatorForGroupException.java |  40 +
 .../errors/UnknownConsumerIdException.java      |  33 -
 .../common/errors/UnknownMemberIdException.java |  33 +
 .../apache/kafka/common/protocol/ApiKeys.java   |   5 +-
 .../apache/kafka/common/protocol/Errors.java    |  22 +-
 .../apache/kafka/common/protocol/Protocol.java  |  97 +-
 .../kafka/common/requests/AbstractRequest.java  |   6 +-
 .../requests/ConsumerMetadataRequest.java       |  65 --
 .../requests/ConsumerMetadataResponse.java      |  70 --
 .../common/requests/GroupMetadataRequest.java   |  65 ++
 .../common/requests/GroupMetadataResponse.java  |  70 ++
 .../kafka/common/requests/HeartbeatRequest.java |  16 +-
 .../common/requests/HeartbeatResponse.java      |   6 +-
 .../kafka/common/requests/JoinGroupRequest.java |  96 +-
 .../common/requests/JoinGroupResponse.java      | 110 ++-
 .../common/requests/OffsetCommitRequest.java    |  34 +-
 .../common/requests/OffsetCommitResponse.java   |   6 +-
 .../common/requests/OffsetFetchResponse.java    |   4 +-
 .../kafka/common/requests/SyncGroupRequest.java | 118 +++
 .../common/requests/SyncGroupResponse.java      |  71 ++
 .../org/apache/kafka/common/utils/Utils.java    |  28 +
 .../org/apache/kafka/clients/MetadataTest.java  |   1 -
 .../clients/consumer/RangeAssignorTest.java     | 217 +++++
 .../consumer/RoundRobinAssignorTest.java        | 209 +++++
 .../internals/ConsumerCoordinatorTest.java      | 749 +++++++++++++++
 .../internals/ConsumerNetworkClientTest.java    |   2 +-
 .../internals/ConsumerProtocolTest.java         | 118 +++
 .../consumer/internals/CoordinatorTest.java     | 635 -------------
 .../clients/consumer/internals/FetcherTest.java |   4 +-
 .../internals/MockPartitionAssignor.java        |  49 +
 .../common/requests/RequestResponseTest.java    |  16 +-
 .../kafka/copycat/util/KafkaBasedLogTest.java   |   2 +-
 .../src/main/scala/kafka/admin/AclCommand.scala |  22 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |   7 +-
 .../kafka/api/ConsumerMetadataRequest.scala     |  80 --
 .../kafka/api/ConsumerMetadataResponse.scala    |  58 --
 .../scala/kafka/api/GroupMetadataRequest.scala  |  80 ++
 .../scala/kafka/api/GroupMetadataResponse.scala |  58 ++
 .../scala/kafka/api/OffsetCommitRequest.scala   |  16 +-
 core/src/main/scala/kafka/api/RequestKeys.scala |   5 +-
 .../main/scala/kafka/client/ClientUtils.scala   |   4 +-
 .../kafka/common/OffsetMetadataAndError.scala   |   5 +-
 core/src/main/scala/kafka/common/Topic.scala    |   4 +-
 .../scala/kafka/consumer/SimpleConsumer.scala   |   4 +-
 .../kafka/coordinator/ConsumerCoordinator.scala | 535 -----------
 .../coordinator/ConsumerGroupMetadata.scala     | 133 ---
 .../kafka/coordinator/ConsumerMetadata.scala    |  50 -
 .../kafka/coordinator/CoordinatorMetadata.scala | 160 +---
 .../kafka/coordinator/DelayedHeartbeat.scala    |  12 +-
 .../scala/kafka/coordinator/DelayedJoin.scala   |  40 +
 .../kafka/coordinator/DelayedRebalance.scala    |  40 -
 .../kafka/coordinator/GroupCoordinator.scala    | 632 +++++++++++++
 .../scala/kafka/coordinator/GroupMetadata.scala | 209 +++++
 .../kafka/coordinator/MemberMetadata.scala      |  99 ++
 .../kafka/coordinator/PartitionAssignor.scala   | 125 ---
 .../javaapi/ConsumerMetadataResponse.scala      |  47 -
 .../kafka/javaapi/GroupMetadataResponse.scala   |  47 +
 .../kafka/security/auth/ResourceType.scala      |   6 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 144 +--
 .../main/scala/kafka/server/KafkaConfig.scala   |  16 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   6 +-
 .../main/scala/kafka/server/OffsetManager.scala |  16 +-
 .../kafka/api/BaseConsumerTest.scala            |  25 +-
 .../kafka/api/ConsumerBounceTest.scala          |  10 +-
 .../kafka/api/IntegrationTestHarness.scala      |  12 +-
 .../integration/kafka/api/QuotasTest.scala      |   1 -
 .../integration/kafka/api/SslConsumerTest.scala |  22 -
 .../scala/other/kafka/TestOffsetManager.scala   |   4 +-
 .../scala/unit/kafka/admin/AclCommandTest.scala |   8 +-
 .../unit/kafka/admin/TopicCommandTest.scala     |   8 +-
 .../api/RequestResponseSerializationTest.scala  |  10 +-
 .../unit/kafka/consumer/TopicFilterTest.scala   |  10 +-
 .../ConsumerCoordinatorResponseTest.scala       | 447 ---------
 .../coordinator/ConsumerGroupMetadataTest.scala | 172 ----
 .../coordinator/CoordinatorMetadataTest.scala   | 160 +---
 .../GroupCoordinatorResponseTest.scala          | 907 +++++++++++++++++++
 .../kafka/coordinator/GroupMetadataTest.scala   | 249 +++++
 .../kafka/coordinator/MemberMetadataTest.scala  |  90 ++
 .../coordinator/PartitionAssignorTest.scala     | 305 -------
 .../security/auth/SimpleAclAuthorizerTest.scala |   4 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |   4 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |   4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   6 +-
 101 files changed, 6634 insertions(+), 4428 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 1894822..5cc0419 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -17,9 +17,9 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.config.SSLConfigs;
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.serialization.Deserializer;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -78,7 +78,7 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>partition.assignment.strategy</code>
      */
     public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
-    private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The friendly name of the partition assignment strategy that the server will use to distribute partition ownership amongst consumer instances when group management is used";
+    private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used";
 
     /**
      * <code>auto.offset.reset</code>
@@ -182,9 +182,8 @@ public class ConsumerConfig extends AbstractConfig {
                                         Importance.HIGH,
                                         HEARTBEAT_INTERVAL_MS_DOC)
                                 .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
-                                        Type.STRING,
-                                        "range",
-                                        in("range", "roundrobin"),
+                                        Type.LIST,
+                                        RangeAssignor.class.getName(),
                                         Importance.MEDIUM,
                                         PARTITION_ASSIGNMENT_STRATEGY_DOC)
                                 .define(METADATA_MAX_AGE_CONFIG,

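The hunk above replaces the fixed "range"/"roundrobin" strategy enum with a LIST-typed config of
assignor class names that the client instantiates itself. A minimal configuration sketch under that
change (the broker address, group id, and deserializer choices are placeholders, not part of this
patch):

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    // one or more PartitionAssignor implementations, listed in preference order
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            RangeAssignor.class.getName() + "," + RoundRobinAssignor.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
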
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2f7f153..cd166f0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -16,9 +16,10 @@ import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.clients.consumer.internals.Coordinator;
+import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
@@ -43,7 +44,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -403,7 +403,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private static final String JMX_PREFIX = "kafka.consumer";
 
     private String clientId;
-    private final Coordinator coordinator;
+    private final ConsumerCoordinator coordinator;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
     private final Fetcher<K, V> fetcher;
@@ -416,7 +416,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final long retryBackoffMs;
     private long requestTimeoutMs;
     private boolean closed = false;
-    private Metadata.Listener metadataListener;
 
     // currentThread holds the threadId of the current thread accessing KafkaConsumer
     // and is used to prevent multi-threaded access
@@ -531,11 +530,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
             this.subscriptions = new SubscriptionState(offsetResetStrategy);
-            this.coordinator = new Coordinator(this.client,
+            List<PartitionAssignor> assignors = config.getConfiguredInstances(
+                    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+                    PartitionAssignor.class);
+            this.coordinator = new ConsumerCoordinator(this.client,
                     config.getString(ConsumerConfig.GROUP_ID_CONFIG),
                     config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                     config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
-                    config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
+                    assignors,
+                    this.metadata,
                     this.subscriptions,
                     metrics,
                     metricGrpPrefix,
@@ -543,7 +546,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     this.time,
                     requestTimeoutMs,
                     retryBackoffMs,
-                    new Coordinator.DefaultOffsetCommitCallback(),
+                    new ConsumerCoordinator.DefaultOffsetCommitCallback(),
                     config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
                     config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
             if (keyDeserializer == null) {
@@ -652,7 +655,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         try {
             log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
             this.subscriptions.subscribe(topics, listener);
-            metadata.setTopics(topics);
+            metadata.setTopics(subscriptions.groupSubscription());
         } finally {
             release();
         }
@@ -699,22 +702,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         acquire();
         try {
             log.debug("Subscribed to pattern: {}", pattern);
-            metadataListener = new Metadata.Listener() {
-                @Override
-                public void onMetadataUpdate(Cluster cluster) {
-                    final List<String> topicsToSubscribe = new ArrayList<>();
-
-                    for (String topic : cluster.topics())
-                        if (subscriptions.getSubscribedPattern().matcher(topic).matches())
-                            topicsToSubscribe.add(topic);
-
-                    subscriptions.changeSubscription(topicsToSubscribe);
-                    metadata.setTopics(topicsToSubscribe);
-                }
-            };
             this.subscriptions.subscribe(pattern, listener);
             this.metadata.needMetadataForAllTopics(true);
-            this.metadata.addListener(metadataListener);
         } finally {
             release();
         }
@@ -729,7 +718,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.subscriptions.unsubscribe();
             this.coordinator.resetGeneration();
             this.metadata.needMetadataForAllTopics(false);
-            this.metadata.removeListener(metadataListener);
         } finally {
             release();
         }
@@ -1079,12 +1067,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         try {
             Cluster cluster = this.metadata.fetch();
             List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
-            if (parts == null) {
-                metadata.add(topic);
-                client.awaitMetadataUpdate();
-                parts = metadata.fetch().partitionsForTopic(topic);
-            }
-            return parts;
+            if (parts != null)
+                return parts;
+
+            Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topic), requestTimeoutMs);
+            return topicMetadata.get(topic);
         } finally {
             release();
         }
@@ -1101,7 +1088,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public Map<String, List<PartitionInfo>> listTopics() {
         acquire();
         try {
-            return fetcher.getAllTopics(requestTimeoutMs);
+            return fetcher.getAllTopicMetadata(requestTimeoutMs);
         } finally {
             release();
         }

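Continuing the configuration sketch above: with the partitionsFor() change in this hunk, an unknown
topic now triggers a topic metadata request bounded by request.timeout.ms rather than an open-ended
blocking metadata refresh (the topic name below is a placeholder):

    // served from the local metadata cache when possible; otherwise a topic
    // metadata request is issued and bounded by request.timeout.ms
    List<PartitionInfo> partitions = consumer.partitionsFor("example-topic");

    // listTopics() now goes through the same fetcher-based metadata path
    Map<String, List<PartitionInfo>> allTopics = consumer.listTopics();
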
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
new file mode 100644
index 0000000..f23151c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
+ * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
+ * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
+ * divide, then the first few consumers will have one extra partition.
+ *
+ * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
+ * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
+ *
+ * The assignment will be:
+ * C0: [t0p0, t0p1, t1p0, t1p1]
+ * C1: [t0p2, t1p2]
+ */
+public class RangeAssignor extends AbstractPartitionAssignor {
+
+    @Override
+    public String name() {
+        return "range";
+    }
+
+    private List<TopicPartition> partitions(String topic,
+                                            int numPartitions) {
+        List<TopicPartition> partitions = new ArrayList<>();
+        for (int i = 0; i < numPartitions; i++)
+            partitions.add(new TopicPartition(topic, i));
+        return partitions;
+    }
+
+    private Map<String, List<String>> consumersPerTopic(Map<String, List<String>> consumerMetadata) {
+        Map<String, List<String>> res = new HashMap<>();
+        for (Map.Entry<String, List<String>> subscriptionEntry : consumerMetadata.entrySet()) {
+            String consumerId = subscriptionEntry.getKey();
+            for (String topic : subscriptionEntry.getValue())
+                put(res, topic, consumerId);
+        }
+        return res;
+    }
+
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, List<String>> subscriptions) {
+        Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, List<TopicPartition>> assignment = new HashMap<>();
+        for (String memberId : subscriptions.keySet())
+            assignment.put(memberId, new ArrayList<TopicPartition>());
+
+        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
+            String topic = topicEntry.getKey();
+            List<String> consumersForTopic = topicEntry.getValue();
+
+            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
+            if (numPartitionsForTopic == null)
+                continue;
+
+            Collections.sort(consumersForTopic);
+
+            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
+            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+
+            List<TopicPartition> partitions = partitions(topic, numPartitionsForTopic);
+            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
+                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
+                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
+                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
+            }
+        }
+        return assignment;
+    }
+
+}

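The javadoc example above can be checked directly against the new assign() entry point inherited
from AbstractPartitionAssignor; a small sketch (the wrapper class and the println are illustrative
only):

    import org.apache.kafka.clients.consumer.RangeAssignor;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RangeAssignorExample {
        public static void main(String[] args) {
            // two topics with 3 partitions each, as in the javadoc
            Map<String, Integer> partitionsPerTopic = new HashMap<>();
            partitionsPerTopic.put("t0", 3);
            partitionsPerTopic.put("t1", 3);

            // two consumers, both subscribed to both topics
            Map<String, List<String>> subscriptions = new HashMap<>();
            subscriptions.put("C0", Arrays.asList("t0", "t1"));
            subscriptions.put("C1", Arrays.asList("t0", "t1"));

            Map<String, List<TopicPartition>> assignment =
                    new RangeAssignor().assign(partitionsPerTopic, subscriptions);

            // per the javadoc: C0 -> [t0-0, t0-1, t1-0, t1-1], C1 -> [t0-2, t1-2]
            System.out.println(assignment);
        }
    }
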
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
new file mode 100644
index 0000000..c5ea2bb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * The roundrobin assignor lays out all the available partitions and all the available consumers. It
+ * then proceeds to do a roundrobin assignment from partition to consumer. If the subscriptions of all consumer
+ * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
+ * will differ by at most one across all consumers.)
+ *
+ * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
+ * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
+ *
+ * The assignment will be:
+ * C0: [t0p0, t0p2, t1p1]
+ * C1: [t0p1, t1p0, t1p2]
+ */
+public class RoundRobinAssignor extends AbstractPartitionAssignor {
+
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, List<String>> subscriptions) {
+        Map<String, List<TopicPartition>> assignment = new HashMap<>();
+        for (String memberId : subscriptions.keySet())
+            assignment.put(memberId, new ArrayList<TopicPartition>());
+
+        CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
+        for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
+            final String topic = partition.topic();
+            while (!subscriptions.get(assigner.peek()).contains(topic))
+                assigner.next();
+            assignment.get(assigner.next()).add(partition);
+        }
+        return assignment;
+    }
+
+
+    public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, List<String>> subscriptions) {
+        SortedSet<String> topics = new TreeSet<>();
+        for (List<String> subscription : subscriptions.values())
+            topics.addAll(subscription);
+
+        List<TopicPartition> allPartitions = new ArrayList<>();
+        for (String topic : topics) {
+            Integer partitions = partitionsPerTopic.get(topic);
+            for (int partition = 0; partition < partitions; partition++) {
+                allPartitions.add(new TopicPartition(topic, partition));
+            }
+        }
+        return allPartitions;
+    }
+
+    @Override
+    public String name() {
+        return "roundrobin";
+    }
+
+    private static class CircularIterator<T> implements Iterator<T> {
+        int i = 0;
+        private List<T> list;
+
+        public CircularIterator(List<T> list) {
+            if (list.isEmpty()) {
+                throw new IllegalArgumentException("CircularIterator can only be used on non-empty lists");
+            }
+            this.list = list;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return true;
+        }
+
+        @Override
+        public T next() {
+            T next = list.get(i);
+            i = (i + 1) % list.size();
+            return next;
+        }
+
+        public T peek() {
+            return list.get(i);
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+}

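The same harness shows the roundrobin layout; with identical subscriptions, each consumer's
partition count differs by at most one (again, the wrapper class is illustrative only):

    import org.apache.kafka.clients.consumer.RoundRobinAssignor;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RoundRobinAssignorExample {
        public static void main(String[] args) {
            // two topics with 3 partitions each, as in the javadoc
            Map<String, Integer> partitionsPerTopic = new HashMap<>();
            partitionsPerTopic.put("t0", 3);
            partitionsPerTopic.put("t1", 3);

            // two consumers, both subscribed to both topics
            Map<String, List<String>> subscriptions = new HashMap<>();
            subscriptions.put("C0", Arrays.asList("t0", "t1"));
            subscriptions.put("C1", Arrays.asList("t0", "t1"));

            Map<String, List<TopicPartition>> assignment =
                    new RoundRobinAssignor().assign(partitionsPerTopic, subscriptions);

            // per the javadoc: C0 -> [t0-0, t0-2, t1-1], C1 -> [t0-1, t1-0, t1-2]
            System.out.println(assignment);
        }
    }
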
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
new file mode 100644
index 0000000..1ffd2bb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -0,0 +1,638 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.requests.GroupMetadataRequest;
+import org.apache.kafka.common.requests.GroupMetadataResponse;
+import org.apache.kafka.common.requests.HeartbeatRequest;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.SyncGroupRequest;
+import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * AbstractCoordinator implements group management for a single group member by interacting with
+ * a designated Kafka broker (the coordinator). Group semantics are provided by extending this class.
+ * See {@link ConsumerCoordinator} for example usage.
+ *
+ * From a high level, Kafka's group management protocol consists of the following sequence of actions:
+ *
+ * <ol>
+ *     <li>Group Registration: Group members register with the coordinator providing their own metadata
+ *         (such as the set of topics they are interested in).</li>
+ *     <li>Group/Leader Selection: The coordinator selects the members of the group and chooses one member
+ *         as the leader.</li>
+ *     <li>State Assignment: The leader collects the metadata from all the members of the group and
+ *         assigns state.</li>
+ *     <li>Group Stabilization: Each member receives the state assigned by the leader and begins
+ *         processing.</li>
+ * </ol>
+ *
+ * To leverage this protocol, an implementation must define the format of metadata provided by each
+ * member for group registration in {@link #metadata()} and the format of the state assignment provided
+ * by the leader in {@link #doSync(String, String, Map)}, which becomes available to members in
+ * {@link #onJoin(int, String, String, ByteBuffer)}.
+ *
+ */
+public abstract class AbstractCoordinator {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractCoordinator.class);
+
+    private final Heartbeat heartbeat;
+    private final HeartbeatTask heartbeatTask;
+    private final int sessionTimeoutMs;
+    private final GroupCoordinatorMetrics sensors;
+    protected final String groupId;
+    protected final ConsumerNetworkClient client;
+    protected final Time time;
+    protected final long retryBackoffMs;
+    protected final long requestTimeoutMs;
+
+    private boolean rejoinNeeded = true;
+    protected Node coordinator;
+    protected String memberId;
+    protected String protocol;
+    protected int generation;
+
+    /**
+     * Initialize the coordination manager.
+     */
+    public AbstractCoordinator(ConsumerNetworkClient client,
+                               String groupId,
+                               int sessionTimeoutMs,
+                               int heartbeatIntervalMs,
+                               Metrics metrics,
+                               String metricGrpPrefix,
+                               Map<String, String> metricTags,
+                               Time time,
+                               long requestTimeoutMs,
+                               long retryBackoffMs) {
+        this.client = client;
+        this.time = time;
+        this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
+        this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+        this.groupId = groupId;
+        this.coordinator = null;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
+        this.heartbeatTask = new HeartbeatTask();
+        this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.retryBackoffMs = retryBackoffMs;
+    }
+
+    /**
+     * Unique identifier for the class of protocols implemented by this coordinator (e.g. "consumer" or "copycat").
+     * @return Non-null protocol type name
+     */
+    protected abstract String protocolType();
+
+    /**
+     * Get the current list of protocols and their associated metadata supported
+     * by the local member. The order of the protocols in the map indicates the preference
+     * of the protocol (the first entry is the most preferred). The coordinator takes this
+     * preference into account when selecting the generation protocol (generally more preferred
+     * protocols will be selected as long as all members support them and there is no disagreement
+     * on the preference).
+     * @return Non-empty map of supported protocols and metadata
+     */
+    protected abstract LinkedHashMap<String, ByteBuffer> metadata();
+
+    /**
+     * Invoked when a group member has successfully joined a group.
+     * @param generation The generation that was joined
+     * @param memberId The identifier for the local member in the group
+     * @param protocol The protocol selected by the coordinator
+     * @param memberAssignment The assignment propagated from the group leader
+     */
+    protected abstract void onJoin(int generation,
+                                   String memberId,
+                                   String protocol,
+                                   ByteBuffer memberAssignment);
+
+    /**
+     * Perform synchronization for the group. This is used by the leader to push state to all the members
+     * of the group (e.g. to push partition assignments in the case of the new consumer)
+     * @param leaderId The id of the leader (which is this member)
+     * @param allMemberMetadata Metadata from all members of the group
+     * @return A map from each member to their state assignment
+     */
+    protected abstract Map<String, ByteBuffer> doSync(String leaderId,
+                                                      String protocol,
+                                                      Map<String, ByteBuffer> allMemberMetadata);
+
+    /**
+     * Invoked when the group is left (whether because of shutdown, metadata change, stale generation, etc.)
+     * @param generation The generation that was left
+     * @param memberId The identifier of the local member in the group
+     */
+    protected abstract void onLeave(int generation, String memberId);
+
+
+    /**
+     * Block until the coordinator for this group is known.
+     */
+    public void ensureCoordinatorKnown() {
+        while (coordinatorUnknown()) {
+            RequestFuture<Void> future = sendGroupMetadataRequest();
+            client.poll(future, requestTimeoutMs);
+
+            if (future.failed())
+                client.awaitMetadataUpdate();
+        }
+    }
+
+    /**
+     * Check whether the group should be rejoined (e.g. if metadata changes)
+     * @return true if it should, false otherwise
+     */
+    protected boolean needRejoin() {
+        return rejoinNeeded;
+    }
+
+    /**
+     * Reset the generation/memberId tracked by this member
+     */
+    public void resetGeneration() {
+        this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
+        this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+        rejoinNeeded = true;
+    }
+
+    /**
+     * Ensure that the group is active (i.e. joined and synced)
+     */
+    public void ensureActiveGroup() {
+        if (!needRejoin())
+            return;
+
+        // onLeave only invoked if we have a valid current generation
+        onLeave(generation, memberId);
+
+        while (needRejoin()) {
+            ensureCoordinatorKnown();
+
+            // ensure that there are no pending requests to the coordinator. This is important
+            // in particular to avoid resending a pending JoinGroup request.
+            if (client.pendingRequestCount(this.coordinator) > 0) {
+                client.awaitPendingRequests(this.coordinator);
+                continue;
+            }
+
+            RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
+            client.poll(future);
+
+            if (future.succeeded()) {
+                onJoin(generation, memberId, protocol, future.value());
+                heartbeatTask.reset();
+            } else {
+                if (future.exception() instanceof UnknownMemberIdException)
+                    continue;
+                else if (!future.isRetriable())
+                    throw future.exception();
+                Utils.sleep(retryBackoffMs);
+            }
+        }
+    }
+
+    private class HeartbeatTask implements DelayedTask {
+
+        public void reset() {
+            // start or restart the heartbeat task to be executed at the next chance
+            long now = time.milliseconds();
+            heartbeat.resetSessionTimeout(now);
+            client.unschedule(this);
+            client.schedule(this, now);
+        }
+
+        @Override
+        public void run(final long now) {
+            if (generation < 0 || needRejoin() || coordinatorUnknown()) {
+                // no need to send the heartbeat if we're not using auto-assignment or if we are
+                // awaiting a rebalance
+                return;
+            }
+
+            if (heartbeat.sessionTimeoutExpired(now)) {
+                // we haven't received a successful heartbeat in one session interval
+                // so mark the coordinator dead
+                coordinatorDead();
+                return;
+            }
+
+            if (!heartbeat.shouldHeartbeat(now)) {
+                // we don't need to heartbeat now, so reschedule for when we do
+                client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
+            } else {
+                heartbeat.sentHeartbeat(now);
+                RequestFuture<Void> future = sendHeartbeatRequest();
+                future.addListener(new RequestFutureListener<Void>() {
+                    @Override
+                    public void onSuccess(Void value) {
+                        long now = time.milliseconds();
+                        heartbeat.receiveHeartbeat(now);
+                        long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
+                        client.schedule(HeartbeatTask.this, nextHeartbeatTime);
+                    }
+
+                    @Override
+                    public void onFailure(RuntimeException e) {
+                        client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
+                    }
+                });
+            }
+        }
+    }
+
+    /**
+     * Send a request to get a new partition assignment. This is a non-blocking call which sends
+     * a JoinGroup request to the coordinator (if it is available). The returned future must
+     * be polled to see if the request completed successfully.
+     * @return A request future whose completion indicates the result of the JoinGroup request.
+     */
+    private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
+        if (coordinatorUnknown())
+            return RequestFuture.coordinatorNotAvailable();
+
+        // send a join group request to the coordinator
+        log.debug("(Re-)joining group {}", groupId);
+
+        List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>();
+        for (Map.Entry<String, ByteBuffer> metadataEntry : metadata().entrySet())
+            protocols.add(new JoinGroupRequest.GroupProtocol(metadataEntry.getKey(), metadataEntry.getValue()));
+
+        JoinGroupRequest request = new JoinGroupRequest(
+                groupId,
+                this.sessionTimeoutMs,
+                this.memberId,
+                protocolType(),
+                protocols);
+
+        // create the request for the coordinator
+        log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.coordinator.id());
+        return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
+                .compose(new JoinGroupResponseHandler());
+    }
+
+
+    private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
+
+        @Override
+        public JoinGroupResponse parse(ClientResponse response) {
+            return new JoinGroupResponse(response.responseBody());
+        }
+
+        @Override
+        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
+            // process the response
+            short errorCode = joinResponse.errorCode();
+            if (errorCode == Errors.NONE.code()) {
+                log.debug("Joined group: {}", joinResponse.toStruct());
+                AbstractCoordinator.this.memberId = joinResponse.memberId();
+                AbstractCoordinator.this.generation = joinResponse.generationId();
+                AbstractCoordinator.this.rejoinNeeded = false;
+                AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
+                sensors.joinLatency.record(response.requestLatencyMs());
+                performSync(joinResponse).chain(future);
+            } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
+                // reset the member id and retry immediately
+                AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+                log.info("Attempt to join group {} failed due to unknown member id, resetting and retrying.",
+                        groupId);
+                future.raise(Errors.UNKNOWN_MEMBER_ID);
+            } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
+                    || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+                // re-discover the coordinator and retry with backoff
+                coordinatorDead();
+                log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
+                        groupId);
+                future.raise(Errors.forCode(errorCode));
+            } else if (errorCode == Errors.INCONSISTENT_GROUP_PROTOCOL.code()
+                    || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) {
+                // log the error and re-throw the exception
+                Errors error = Errors.forCode(errorCode);
+                log.error("Attempt to join group {} failed due to: {}",
+                        groupId, error.exception().getMessage());
+                future.raise(error);
+            } else {
+                // unexpected error, throw the exception
+                future.raise(new KafkaException("Unexpected error in join group response: "
+                        + Errors.forCode(joinResponse.errorCode()).exception().getMessage()));
+            }
+        }
+    }
+
+    private RequestFuture<ByteBuffer> performSync(JoinGroupResponse joinResponse) {
+        if (joinResponse.isLeader()) {
+            try {
+                // perform the leader synchronization and send back the assignment for the group
+                Map<String, ByteBuffer> groupAssignment = doSync(joinResponse.leaderId(), joinResponse.groupProtocol(),
+                        joinResponse.members());
+
+                SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
+                log.debug("Issuing leader SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
+                return sendSyncGroupRequest(request);
+            } catch (RuntimeException e) {
+                return RequestFuture.failure(e);
+            }
+        } else {
+            // send follower's sync group with an empty assignment
+            SyncGroupRequest request = new SyncGroupRequest(groupId, generation,
+                    memberId, Collections.<String, ByteBuffer>emptyMap());
+            log.debug("Issuing follower SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
+            return sendSyncGroupRequest(request);
+        }
+    }
+
+    private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest request) {
+        if (coordinatorUnknown())
+            return RequestFuture.coordinatorNotAvailable();
+        return client.send(coordinator, ApiKeys.SYNC_GROUP, request)
+                .compose(new SyncGroupRequestHandler());
+    }
+
+    private class SyncGroupRequestHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
+
+        @Override
+        public SyncGroupResponse parse(ClientResponse response) {
+            return new SyncGroupResponse(response.responseBody());
+        }
+
+        @Override
+        public void handle(SyncGroupResponse syncResponse,
+                           RequestFuture<ByteBuffer> future) {
+            short errorCode = syncResponse.errorCode();
+            if (errorCode == Errors.NONE.code()) {
+                try {
+                    future.complete(syncResponse.memberAssignment());
+                    sensors.syncLatency.record(response.requestLatencyMs());
+                } catch (SchemaException e) {
+                    future.raise(e);
+                }
+            } else {
+                AbstractCoordinator.this.rejoinNeeded = true;
+                future.raise(Errors.forCode(errorCode));
+            }
+        }
+    }
+
+    /**
+     * Discover the current coordinator for the group. Sends a GroupMetadata request to
+     * one of the brokers. The returned future should be polled to get the result of the request.
+     * @return A request future which indicates the completion of the metadata request
+     */
+    private RequestFuture<Void> sendGroupMetadataRequest() {
+        // initiate the group metadata request
+        // find a node to ask about the coordinator
+        Node node = this.client.leastLoadedNode();
+        if (node == null) {
+            // TODO: If there are no brokers left, perhaps we should use the bootstrap set
+            // from configuration?
+            return RequestFuture.noBrokersAvailable();
+        } else {
+            // create a group metadata request
+            log.debug("Issuing group metadata request to broker {}", node.id());
+            GroupMetadataRequest metadataRequest = new GroupMetadataRequest(this.groupId);
+            return client.send(node, ApiKeys.GROUP_METADATA, metadataRequest)
+                    .compose(new RequestFutureAdapter<ClientResponse, Void>() {
+                        @Override
+                        public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
+                            handleGroupMetadataResponse(response, future);
+                        }
+                    });
+        }
+    }
+
+    private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
+        log.debug("Group metadata response {}", resp);
+
+        // parse the response to get the coordinator info if it is not disconnected,
+        // otherwise we need to request metadata update
+        if (resp.wasDisconnected()) {
+            future.raise(new DisconnectException());
+        } else if (!coordinatorUnknown()) {
+            // We already found the coordinator, so ignore the request
+            future.complete(null);
+        } else {
+            GroupMetadataResponse groupMetadataResponse = new GroupMetadataResponse(resp.responseBody());
+            // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
+            // for the coordinator in the underlying network client layer
+            // TODO: this needs to be better handled in KAFKA-1935
+            if (groupMetadataResponse.errorCode() == Errors.NONE.code()) {
+                this.coordinator = new Node(Integer.MAX_VALUE - groupMetadataResponse.node().id(),
+                        groupMetadataResponse.node().host(),
+                        groupMetadataResponse.node().port());
+
+                // start sending heartbeats only if we have a valid generation
+                if (generation > 0)
+                    heartbeatTask.reset();
+                future.complete(null);
+            } else {
+                future.raise(Errors.forCode(groupMetadataResponse.errorCode()));
+            }
+        }
+    }
+
+    /**
+     * Check if we know who the coordinator is.
+     * @return true if the coordinator is unknown
+     */
+    public boolean coordinatorUnknown() {
+        return this.coordinator == null;
+    }
+
+
+    /**
+     * Mark the current coordinator as dead.
+     */
+    protected void coordinatorDead() {
+        if (this.coordinator != null) {
+            log.info("Marking the coordinator {} dead.", this.coordinator.id());
+            this.coordinator = null;
+        }
+    }
+
+    /**
+     * Send a heartbeat request now (visible only for testing).
+     */
+    public RequestFuture<Void> sendHeartbeatRequest() {
+        HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.memberId);
+        return client.send(coordinator, ApiKeys.HEARTBEAT, req)
+                .compose(new HeartbeatCompletionHandler());
+    }
+
+    private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
+        @Override
+        public HeartbeatResponse parse(ClientResponse response) {
+            return new HeartbeatResponse(response.responseBody());
+        }
+
+        @Override
+        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
+            sensors.heartbeatLatency.record(response.requestLatencyMs());
+            short error = heartbeatResponse.errorCode();
+            if (error == Errors.NONE.code()) {
+                log.debug("Received successful heartbeat response.");
+                future.complete(null);
+            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
+                    || error == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+                log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
+                coordinatorDead();
+                future.raise(Errors.forCode(error));
+            } else if (error == Errors.REBALANCE_IN_PROGRESS.code()) {
+                log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group.");
+                AbstractCoordinator.this.rejoinNeeded = true;
+                future.raise(Errors.REBALANCE_IN_PROGRESS);
+            } else if (error == Errors.ILLEGAL_GENERATION.code()) {
+                log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
+                AbstractCoordinator.this.rejoinNeeded = true;
+                future.raise(Errors.ILLEGAL_GENERATION);
+            } else if (error == Errors.UNKNOWN_MEMBER_ID.code()) {
+                log.info("Attempt to heart beat failed since member id is not valid, reset it and try to re-join group.");
+                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+                AbstractCoordinator.this.rejoinNeeded = true;
+                future.raise(Errors.UNKNOWN_MEMBER_ID);
+            } else {
+                future.raise(new KafkaException("Unexpected error in heartbeat response: "
+                        + Errors.forCode(error).exception().getMessage()));
+            }
+        }
+    }
+
+    protected abstract class CoordinatorResponseHandler<R, T>
+            extends RequestFutureAdapter<ClientResponse, T> {
+        protected ClientResponse response;
+
+        public abstract R parse(ClientResponse response);
+
+        public abstract void handle(R response, RequestFuture<T> future);
+
+        @Override
+        public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
+            this.response = clientResponse;
+
+            if (clientResponse.wasDisconnected()) {
+                int correlation = response.request().request().header().correlationId();
+                log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
+                        response.request(),
+                        correlation,
+                        response.request().request().destination());
+
+                // mark the coordinator as dead
+                coordinatorDead();
+                future.raise(new DisconnectException());
+                return;
+            }
+
+            R response = parse(clientResponse);
+            handle(response, future);
+        }
+
+    }
+
+    private class GroupCoordinatorMetrics {
+        public final Metrics metrics;
+        public final String metricGrpName;
+
+        public final Sensor heartbeatLatency;
+        public final Sensor joinLatency;
+        public final Sensor syncLatency;
+
+        public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+            this.metrics = metrics;
+            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
+
+            this.heartbeatLatency = metrics.sensor("heartbeat-latency");
+            this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max",
+                this.metricGrpName,
+                "The max time taken to receive a response to a heartbeat request",
+                tags), new Max());
+            this.heartbeatLatency.add(new MetricName("heartbeat-rate",
+                this.metricGrpName,
+                "The average number of heartbeats per second",
+                tags), new Rate(new Count()));
+
+            this.joinLatency = metrics.sensor("join-latency");
+            this.joinLatency.add(new MetricName("join-time-avg",
+                    this.metricGrpName,
+                    "The average time taken for a group rejoin",
+                    tags), new Avg());
+            this.joinLatency.add(new MetricName("join-time-max",
+                    this.metricGrpName,
+                    "The max time taken for a group rejoin",
+                    tags), new Avg());
+            this.joinLatency.add(new MetricName("join-rate",
+                    this.metricGrpName,
+                    "The number of group joins per second",
+                    tags), new Rate(new Count()));
+
+            this.syncLatency = metrics.sensor("sync-latency");
+            this.syncLatency.add(new MetricName("sync-time-avg",
+                    this.metricGrpName,
+                    "The average time taken for a group sync",
+                    tags), new Avg());
+            this.syncLatency.add(new MetricName("sync-time-max",
+                    this.metricGrpName,
+                    "The max time taken for a group sync",
+                    tags), new Avg());
+            this.syncLatency.add(new MetricName("sync-rate",
+                    this.metricGrpName,
+                    "The number of group syncs per second",
+                    tags), new Rate(new Count()));
+
+            Measurable lastHeartbeat =
+                new Measurable() {
+                    public double measure(MetricConfig config, long now) {
+                        return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
+                    }
+                };
+            metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
+                this.metricGrpName,
+                "The number of seconds since the last coordinator heartbeat",
+                tags),
+                lastHeartbeat);
+        }
+    }
+
+}
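
The inner GroupCoordinatorMetrics class above follows the standard pattern of the client
metrics library: create a named Sensor, attach one or more stats to it under descriptive
MetricNames, and record observations against the sensor. A minimal standalone sketch of
that pattern (Metrics, Sensor, MetricName, Avg, Max, Rate and Count are the real classes
imported above; the metric and group names here are made up for illustration):

    Metrics metrics = new Metrics();
    Map<String, String> tags = Collections.singletonMap("client-id", "demo");

    // one sensor can back several stats; each appears as its own metric
    Sensor latency = metrics.sensor("demo-latency");
    latency.add(new MetricName("demo-latency-avg", "demo-metrics",
            "The average request latency", tags), new Avg());
    latency.add(new MetricName("demo-latency-max", "demo-metrics",
            "The max request latency", tags), new Max());
    latency.add(new MetricName("demo-rate", "demo-metrics",
            "The number of requests per second", tags), new Rate(new Count()));

    latency.record(42.0); // updates every stat attached to the sensor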

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
new file mode 100644
index 0000000..12fa913
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Abstract assignor implementation which does some common grunt work (in particular collecting
+ * partition counts which are always needed in assignors).
+ */
+public abstract class AbstractPartitionAssignor implements PartitionAssignor {
+    private static final Logger log = LoggerFactory.getLogger(AbstractPartitionAssignor.class);
+
+    /**
+     * Perform the group assignment given the partition counts and member subscriptions
+     * @param partitionsPerTopic The number of partitions for each subscribed topic (topics for
+     *                           which no metadata is available are omitted)
+     * @param subscriptions Map from each memberId to its topic subscription
+     * @return Map from each memberId to its list of assigned partitions
+     */
+    public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                             Map<String, List<String>> subscriptions);
+
+    @Override
+    public Subscription subscription(Set<String> topics) {
+        return new Subscription(new ArrayList<>(topics));
+    }
+
+    @Override
+    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
+        Set<String> allSubscribedTopics = new HashSet<>();
+        Map<String, List<String>> topicSubscriptions = new HashMap<>();
+        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
+            List<String> topics = subscriptionEntry.getValue().topics();
+            allSubscribedTopics.addAll(topics);
+            topicSubscriptions.put(subscriptionEntry.getKey(), topics);
+        }
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (String topic : allSubscribedTopics) {
+            Integer numPartitions = metadata.partitionCountForTopic(topic);
+            if (numPartitions != null)
+                partitionsPerTopic.put(topic, numPartitions);
+            else
+                log.debug("Skipping assignment for topic {} since no metadata is available", topic);
+        }
+
+        Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, topicSubscriptions);
+
+        // this class maintains no user data, so just wrap the results
+        Map<String, Assignment> assignments = new HashMap<>();
+        for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
+            assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
+        return assignments;
+    }
+
+    @Override
+    public void onAssignment(Assignment assignment) {
+        // this assignor maintains no internal state, so nothing to do
+    }
+
+    protected static <K, V> void put(Map<K, List<V>> map, K key, V value) {
+        List<V> list = map.get(key);
+        if (list == null) {
+            list = new ArrayList<>();
+            map.put(key, list);
+        }
+        list.add(value);
+    }
+
+}
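
A concrete assignor built on AbstractPartitionAssignor only has to provide a protocol name
and implement the simplified assign() overload above; the base class handles subscription
wrapping and partition counting. A hypothetical minimal subclass for illustration (the real
implementations in this patch are RangeAssignor and RoundRobinAssignor; the toy policy below
hands every partition of a topic to the first member that subscribes to it, and the imports
are the same ones used by AbstractPartitionAssignor):

    public class FirstSubscriberAssignor extends AbstractPartitionAssignor {

        @Override
        public String name() {
            return "first-subscriber";
        }

        @Override
        public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                        Map<String, List<String>> subscriptions) {
            // every member gets an entry, even if it ends up with no partitions
            Map<String, List<TopicPartition>> assignment = new HashMap<>();
            for (String memberId : subscriptions.keySet())
                assignment.put(memberId, new ArrayList<TopicPartition>());

            Set<String> claimed = new HashSet<>();
            for (Map.Entry<String, List<String>> entry : subscriptions.entrySet()) {
                for (String topic : entry.getValue()) {
                    Integer numPartitions = partitionsPerTopic.get(topic);
                    if (numPartitions == null || !claimed.add(topic))
                        continue; // no metadata for this topic, or already claimed
                    for (int i = 0; i < numPartitions; i++)
                        put(assignment, entry.getKey(), new TopicPartition(topic, i));
                }
            }
            return assignment;
        }
    }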

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
new file mode 100644
index 0000000..fc7e819
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -0,0 +1,595 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class manages the coordination process with the consumer coordinator.
+ */
+public final class ConsumerCoordinator extends AbstractCoordinator implements Closeable {
+
+    private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class);
+
+    private final Map<String, PartitionAssignor> protocolMap;
+    private final org.apache.kafka.clients.Metadata metadata;
+    private final MetadataSnapshot metadataSnapshot;
+    private final ConsumerCoordinatorMetrics sensors;
+    private final SubscriptionState subscriptions;
+    private final OffsetCommitCallback defaultOffsetCommitCallback;
+    private final boolean autoCommitEnabled;
+
+    /**
+     * Initialize the coordination manager.
+     */
+    public ConsumerCoordinator(ConsumerNetworkClient client,
+                               String groupId,
+                               int sessionTimeoutMs,
+                               int heartbeatIntervalMs,
+                               List<PartitionAssignor> assignors,
+                               Metadata metadata,
+                               SubscriptionState subscriptions,
+                               Metrics metrics,
+                               String metricGrpPrefix,
+                               Map<String, String> metricTags,
+                               Time time,
+                               long requestTimeoutMs,
+                               long retryBackoffMs,
+                               OffsetCommitCallback defaultOffsetCommitCallback,
+                               boolean autoCommitEnabled,
+                               long autoCommitIntervalMs) {
+        super(client,
+                groupId,
+                sessionTimeoutMs,
+                heartbeatIntervalMs,
+                metrics,
+                metricGrpPrefix,
+                metricTags,
+                time,
+                requestTimeoutMs,
+                retryBackoffMs);
+        this.metadata = metadata;
+
+        this.metadata.requestUpdate();
+        this.metadataSnapshot = new MetadataSnapshot();
+        this.subscriptions = subscriptions;
+        this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
+        this.autoCommitEnabled = autoCommitEnabled;
+
+        this.protocolMap = new HashMap<>();
+        for (PartitionAssignor assignor : assignors)
+            this.protocolMap.put(assignor.name(), assignor);
+
+        addMetadataListener();
+
+        if (autoCommitEnabled)
+            scheduleAutoCommitTask(autoCommitIntervalMs);
+
+        this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+    }
+
+    @Override
+    public String protocolType() {
+        return "consumer";
+    }
+
+    @Override
+    public LinkedHashMap<String, ByteBuffer> metadata() {
+        LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
+        for (PartitionAssignor assignor : protocolMap.values()) {
+            Subscription subscription = assignor.subscription(subscriptions.subscription());
+            metadata.put(assignor.name(), ConsumerProtocol.serializeSubscription(subscription));
+        }
+        return metadata;
+    }
+
+    private void addMetadataListener() {
+        this.metadata.addListener(new Metadata.Listener() {
+            @Override
+            public void onMetadataUpdate(Cluster cluster) {
+                if (subscriptions.hasPatternSubscription()) {
+                    final List<String> topicsToSubscribe = new ArrayList<>();
+
+                    for (String topic : cluster.topics())
+                        if (subscriptions.getSubscribedPattern().matcher(topic).matches())
+                            topicsToSubscribe.add(topic);
+
+                    subscriptions.changeSubscription(topicsToSubscribe);
+                    metadata.setTopics(subscriptions.groupSubscription());
+                }
+
+                // check if there are any changes to the metadata which should trigger a rebalance
+                if (metadataSnapshot.update(subscriptions, cluster) && subscriptions.partitionsAutoAssigned())
+                    subscriptions.needReassignment();
+            }
+        });
+    }
+
+    @Override
+    protected void onJoin(int generation,
+                          String memberId,
+                          String assignmentStrategy,
+                          ByteBuffer assignmentBuffer) {
+        PartitionAssignor assignor = protocolMap.get(assignmentStrategy);
+        if (assignor == null)
+            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
+
+        Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
+
+        // set the flag to refresh last committed offsets
+        subscriptions.needRefreshCommits();
+
+        // update partition assignment
+        subscriptions.changePartitionAssignment(assignment.partitions());
+
+        // give the assignor a chance to update internal state based on the received assignment
+        assignor.onAssignment(assignment);
+
+        // execute the user's callback after rebalance
+        ConsumerRebalanceListener listener = subscriptions.listener();
+        log.debug("Setting newly assigned partitions {}", subscriptions.assignedPartitions());
+        try {
+            Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
+            listener.onPartitionsAssigned(assigned);
+        } catch (Exception e) {
+            log.error("User provided listener " + listener.getClass().getName()
+                    + " failed on partition assignment: ", e);
+        }
+    }
+
+    @Override
+    protected Map<String, ByteBuffer> doSync(String leaderId,
+                                             String assignmentStrategy,
+                                             Map<String, ByteBuffer> allSubscriptions) {
+        PartitionAssignor assignor = protocolMap.get(assignmentStrategy);
+        if (assignor == null)
+            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
+
+        Set<String> allSubscribedTopics = new HashSet<>();
+        Map<String, Subscription> subscriptions = new HashMap<>();
+        for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
+            Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
+            subscriptions.put(subscriptionEntry.getKey(), subscription);
+            allSubscribedTopics.addAll(subscription.topics());
+        }
+
+        // the leader will begin watching for changes to any of the topics the group is interested in,
+        // which ensures that all metadata changes will eventually be seen
+        this.subscriptions.groupSubscribe(allSubscribedTopics);
+        metadata.setTopics(this.subscriptions.groupSubscription());
+        client.ensureFreshMetadata();
+
+        log.debug("Performing {} assignment for subscriptions {}", assignor.name(), subscriptions);
+
+        Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
+
+        log.debug("Finished assignment: {}", assignment);
+
+        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
+        for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
+            ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
+            groupAssignment.put(assignmentEntry.getKey(), buffer);
+        }
+
+        return groupAssignment;
+    }
+
+    @Override
+    protected void onLeave(int generation, String memberId) {
+        // commit offsets prior to rebalance if auto-commit enabled
+        maybeAutoCommitOffsetsSync();
+
+        // execute the user's callback before rebalance
+        ConsumerRebalanceListener listener = subscriptions.listener();
+        log.debug("Revoking previously assigned partitions {}", subscriptions.assignedPartitions());
+        try {
+            Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
+            listener.onPartitionsRevoked(revoked);
+        } catch (Exception e) {
+            log.error("User provided listener " + listener.getClass().getName()
+                    + " failed on partition revocation: ", e);
+        }
+
+        subscriptions.needReassignment();
+    }
+
+    @Override
+    public boolean needRejoin() {
+        return subscriptions.partitionsAutoAssigned() &&
+                (super.needRejoin() || subscriptions.partitionAssignmentNeeded());
+    }
+
+    /**
+     * Refresh the committed offsets for the partitions currently assigned to this consumer,
+     * if a refresh is needed.
+     */
+    public void refreshCommittedOffsetsIfNeeded() {
+        if (subscriptions.refreshCommitsNeeded()) {
+            Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
+            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                // verify assignment is still active
+                if (subscriptions.isAssigned(tp))
+                    this.subscriptions.committed(tp, entry.getValue());
+            }
+            this.subscriptions.commitsRefreshed();
+        }
+    }
+
+    /**
+     * Fetch the current committed offsets from the coordinator for a set of partitions.
+     * @param partitions The partitions to fetch offsets for
+     * @return A map from partition to the committed offset and metadata
+     */
+    public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
+        while (true) {
+            ensureCoordinatorKnown();
+
+            // contact coordinator to fetch committed offsets
+            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
+            client.poll(future);
+
+            if (future.succeeded())
+                return future.value();
+
+            if (!future.isRetriable())
+                throw future.exception();
+
+            Utils.sleep(retryBackoffMs);
+        }
+    }
+
+    /**
+     * Ensure that we have a valid partition assignment from the coordinator.
+     */
+    public void ensurePartitionAssignment() {
+        if (subscriptions.partitionsAutoAssigned())
+            ensureActiveGroup();
+    }
+
+    @Override
+    public void close() {
+        // commit offsets prior to closing if auto-commit enabled
+        while (true) {
+            try {
+                maybeAutoCommitOffsetsSync();
+                return;
+            } catch (ConsumerWakeupException e) {
+                // ignore wakeups while closing to ensure we have a chance to commit
+                continue;
+            }
+        }
+    }
+
+    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+        this.subscriptions.needRefreshCommits();
+        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
+        final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
+        future.addListener(new RequestFutureListener<Void>() {
+            @Override
+            public void onSuccess(Void value) {
+                cb.onComplete(offsets, null);
+            }
+
+            @Override
+            public void onFailure(RuntimeException e) {
+                cb.onComplete(offsets, e);
+            }
+        });
+    }
+
+    public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        if (offsets.isEmpty())
+            return;
+
+        while (true) {
+            ensureCoordinatorKnown();
+
+            RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
+            client.poll(future);
+
+            if (future.succeeded()) {
+                return;
+            }
+
+            if (!future.isRetriable()) {
+                throw future.exception();
+            }
+
+            Utils.sleep(retryBackoffMs);
+        }
+    }
+
+    private void scheduleAutoCommitTask(final long interval) {
+        DelayedTask task = new DelayedTask() {
+            public void run(long now) {
+                commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
+                    @Override
+                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+                        if (exception != null)
+                            log.error("Auto offset commit failed.", exception);
+                    }
+                });
+                client.schedule(this, now + interval);
+            }
+        };
+        client.schedule(task, time.milliseconds() + interval);
+    }
+
+    private void maybeAutoCommitOffsetsSync() {
+        if (autoCommitEnabled) {
+            try {
+                commitOffsetsSync(subscriptions.allConsumed());
+            } catch (ConsumerWakeupException e) {
+                // rethrow wakeups since they are triggered by the user
+                throw e;
+            } catch (Exception e) {
+                // consistent with async auto-commit failures, we do not propagate the exception
+                log.error("Auto offset commit failed.", e);
+            }
+        }
+    }
+
+    /**
+     * Commit offsets for the specified list of topics and partitions. This is a non-blocking call
+     * which returns a request future that can be polled in the case of a synchronous commit or ignored in the
+     * asynchronous case.
+     *
+     * @param offsets The list of offsets per partition that should be committed.
+     * @return A request future which completes successfully if the commit is accepted, or fails with the error otherwise
+     */
+    private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+        if (coordinatorUnknown())
+            return RequestFuture.coordinatorNotAvailable();
+
+        if (offsets.isEmpty())
+            return RequestFuture.voidSuccess();
+
+        // create the offset commit request
+        Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+            OffsetAndMetadata offsetAndMetadata = entry.getValue();
+            offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
+                    offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
+        }
+
+        OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
+                this.generation,
+                this.memberId,
+                OffsetCommitRequest.DEFAULT_RETENTION_TIME,
+                offsetData);
+
+        return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
+                .compose(new OffsetCommitResponseHandler(offsets));
+    }
+
+    public static class DefaultOffsetCommitCallback implements OffsetCommitCallback {
+        @Override
+        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+            if (exception != null)
+                log.error("Offset commit failed.", exception);
+        }
+    }
+
+    private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
+
+        private final Map<TopicPartition, OffsetAndMetadata> offsets;
+
+        public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
+            this.offsets = offsets;
+        }
+
+        @Override
+        public OffsetCommitResponse parse(ClientResponse response) {
+            return new OffsetCommitResponse(response.responseBody());
+        }
+
+        @Override
+        public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
+            sensors.commitLatency.record(response.requestLatencyMs());
+            for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
+                TopicPartition tp = entry.getKey();
+                OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
+                long offset = offsetAndMetadata.offset();
+
+                short errorCode = entry.getValue();
+                if (errorCode == Errors.NONE.code()) {
+                    log.debug("Committed offset {} for partition {}", offset, tp);
+                    if (subscriptions.isAssigned(tp))
+                        // update the local cache only if the partition is still assigned
+                        subscriptions.committed(tp, offsetAndMetadata);
+                } else {
+                    if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
+                            || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+                        coordinatorDead();
+                    } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()
+                            || errorCode == Errors.ILLEGAL_GENERATION.code()) {
+                        // need to re-join group
+                        subscriptions.needReassignment();
+                    }
+
+                    log.error("Error committing partition {} at offset {}: {}",
+                            tp,
+                            offset,
+                            Errors.forCode(errorCode).exception().getMessage());
+
+                    future.raise(Errors.forCode(errorCode));
+                    return;
+                }
+            }
+
+            future.complete(null);
+        }
+    }
+
+    /**
+     * Fetch the committed offsets for a set of partitions. This is a non-blocking call. The
+     * returned future can be polled to get the actual offsets returned from the broker.
+     *
+     * @param partitions The set of partitions to get offsets for.
+     * @return A request future containing the committed offsets.
+     */
+    private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
+        if (coordinatorUnknown())
+            return RequestFuture.coordinatorNotAvailable();
+
+        log.debug("Fetching committed offsets for partitions: {}", Utils.join(partitions, ", "));
+        // construct the request
+        OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
+
+        // send the request with a callback
+        return client.send(coordinator, ApiKeys.OFFSET_FETCH, request)
+                .compose(new OffsetFetchResponseHandler());
+    }
+
+    private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {
+
+        @Override
+        public OffsetFetchResponse parse(ClientResponse response) {
+            return new OffsetFetchResponse(response.responseBody());
+        }
+
+        @Override
+        public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+                TopicPartition tp = entry.getKey();
+                OffsetFetchResponse.PartitionData data = entry.getValue();
+                if (data.hasError()) {
+                    log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode)
+                            .exception()
+                            .getMessage());
+                    if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
+                        // just retry
+                        future.raise(Errors.OFFSET_LOAD_IN_PROGRESS);
+                    } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+                        // re-discover the coordinator and retry
+                        coordinatorDead();
+                        future.raise(Errors.NOT_COORDINATOR_FOR_GROUP);
+                    } else if (data.errorCode == Errors.UNKNOWN_MEMBER_ID.code()
+                            || data.errorCode == Errors.ILLEGAL_GENERATION.code()) {
+                        // need to re-join group
+                        subscriptions.needReassignment();
+                        future.raise(Errors.forCode(data.errorCode));
+                    } else {
+                        future.raise(new KafkaException("Unexpected error in fetch offset response: "
+                                + Errors.forCode(data.errorCode).exception().getMessage()));
+                    }
+                    return;
+                } else if (data.offset >= 0) {
+                    // record the position with the offset (-1 indicates no committed offset to fetch)
+                    offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
+                } else {
+                    log.debug("No committed offset for partition {}", tp);
+                }
+            }
+
+            future.complete(offsets);
+        }
+    }
+
+    private class ConsumerCoordinatorMetrics {
+        public final Metrics metrics;
+        public final String metricGrpName;
+
+        public final Sensor commitLatency;
+
+        public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+            this.metrics = metrics;
+            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
+
+            this.commitLatency = metrics.sensor("commit-latency");
+            this.commitLatency.add(new MetricName("commit-latency-avg",
+                this.metricGrpName,
+                "The average time taken for a commit request",
+                tags), new Avg());
+            this.commitLatency.add(new MetricName("commit-latency-max",
+                this.metricGrpName,
+                "The max time taken for a commit request",
+                tags), new Max());
+            this.commitLatency.add(new MetricName("commit-rate",
+                this.metricGrpName,
+                "The number of commit calls per second",
+                tags), new Rate(new Count()));
+
+            Measurable numParts =
+                new Measurable() {
+                    public double measure(MetricConfig config, long now) {
+                        return subscriptions.assignedPartitions().size();
+                    }
+                };
+            metrics.addMetric(new MetricName("assigned-partitions",
+                this.metricGrpName,
+                "The number of partitions currently assigned to this consumer",
+                tags),
+                numParts);
+        }
+    }
+
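+    /**
+     * A snapshot of the partition counts for the group's subscribed topics. The metadata
+     * listener compares successive snapshots to detect changes (e.g. partition expansion
+     * or new topics matching a pattern subscription) which should trigger a reassignment.
+     */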
+    private static class MetadataSnapshot {
+        private Map<String, Integer> partitionsPerTopic = new HashMap<>();
+
+        public boolean update(SubscriptionState subscription, Cluster cluster) {
+            Map<String, Integer> partitionsPerTopic = new HashMap<>();
+            for (String topic : subscription.groupSubscription())
+                partitionsPerTopic.put(topic, cluster.partitionCountForTopic(topic));
+
+            if (!partitionsPerTopic.equals(this.partitionsPerTopic)) {
+                this.partitionsPerTopic = partitionsPerTopic;
+                return true;
+            }
+
+            return false;
+        }
+    }
+
+}
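
The commit path above is future-based: sendOffsetCommitRequest() returns a RequestFuture
which commitOffsetsSync() polls to completion (backing off and retrying retriable errors),
while commitOffsetsAsync() merely attaches a listener. A hypothetical caller-side sketch of
the two public entry points (the coordinator instance, topic name and offset are assumed,
not taken from the patch):

    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(42L));

    // asynchronous: returns immediately; the outcome is delivered to the callback
    coordinator.commitOffsetsAsync(offsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception) {
            if (exception != null)
                System.err.println("Async commit failed: " + exception);
        }
    });

    // synchronous: blocks, retrying retriable errors, until success or a fatal error
    coordinator.commitOffsetsSync(offsets);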

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4153eb3..fbfe54a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -127,6 +127,15 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     /**
+     * Ensure our metadata is fresh (if an update is expected, this will block
+     * until it has completed).
+     */
+    public void ensureFreshMetadata() {
+        if (this.metadata.timeToNextUpdate(time.milliseconds()) == 0)
+            awaitMetadataUpdate();
+    }
+
+    /**
      * Wakeup an active poll. This will cause the polling thread to throw an exception either
      * on the current poll if one is active, or the next poll.
      */
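
The new ensureFreshMetadata() pairs with Metadata.setTopics(), which requests a metadata
update when new topics are added; the call sequence in ConsumerCoordinator.doSync() above
is the intended usage. A condensed sketch (variable names assumed from that context):

    metadata.setTopics(subscriptions.groupSubscription()); // flags a metadata update as needed
    client.ensureFreshMetadata();                          // blocks until that update completes
    Cluster cluster = metadata.fetch();                    // now reflects the full group subscription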