Posted to commits@kafka.apache.org by ne...@apache.org on 2014/05/21 01:52:19 UTC

[1/2] KAFKA-1328 New consumer APIs; reviewed by Jun Rao and Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk bf7fb6321 -> c24740c7b


http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
new file mode 100644
index 0000000..0548fb4
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+/**
+ * TODO: Clean this up after the consumer implementation is complete. Until then, it is useful to keep some sample test code using the new APIs
+ *
+ */
+public class ConsumerExampleTest {
+    /**
+     * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load 
+     * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are automatically committed periodically, 
+     * as controlled by the auto.commit.interval.ms config 
+     */
+//    @Test
+//    public void testConsumerGroupManagementWithAutoOffsetCommits() {
+//        Properties props = new Properties();
+//        props.put("metadata.broker.list", "localhost:9092");
+//        props.put("group.id", "test");
+//        props.put("session.timeout.ms", "1000");
+//        props.put("auto.commit.enable", "true");
+//        props.put("auto.commit.interval.ms", "10000");
+//        KafkaConsumer consumer = new KafkaConsumer(props);
+//        // subscribe to some topics
+//        consumer.subscribe("foo", "bar");
+//        boolean isRunning = true;
+//        while(isRunning) {
+//            Map<String, ConsumerRecords> records = consumer.poll(100);
+//            process(records);
+//        }
+//        consumer.close();
+//    }
+
+    /**
+     * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load 
+     * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are manually committed using 
+     * the commit(sync) API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed
+     * messages fails.
+     */
+//     @Test
+//     public void testConsumerGroupManagementWithManualOffsetCommit() {
+//         Properties props = new Properties();
+//         props.put("metadata.broker.list", "localhost:9092");
+//         props.put("group.id", "test");
+//         props.put("session.timeout.ms", "1000");
+//         props.put("auto.commit.enable", "false");
+//         KafkaConsumer consumer = new KafkaConsumer(props);
+//         // subscribe to some topics
+//         consumer.subscribe("foo", "bar");
+//         int commitInterval = 100;
+//         int numRecords = 0;
+//         boolean isRunning = true;
+//         Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+//         while(isRunning) {
+//             Map<String, ConsumerRecords> records = consumer.poll(100);
+//             try {
+//                 Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+//                 consumedOffsets.putAll(lastConsumedOffsets);
+//                 numRecords += records.size();
+//                 // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
+//                 if(numRecords % commitInterval == 0) 
+//                     consumer.commit(true);
+//             } catch(Exception e) {
+//                 // rewind consumer's offsets for failed partitions
+//                 List<TopicPartition> failedPartitions = getFailedPartitions();
+//                 Map<TopicPartition, Long> offsetsToRewindTo = new HashMap<TopicPartition, Long>();
+//                 for(TopicPartition failedPartition : failedPartitions) {
+//                     // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
+//                     // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
+//                     offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
+//                 }
+//                 // seek to new offsets only for partitions that failed the last process()
+//                 consumer.seek(offsetsToRewindTo);
+//             }
+//         }         
+//         consumer.close();
+//     }
+
+     private List<TopicPartition> getFailedPartitions() { return null; }
+
+    /**
+     * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with custom offset storage. 
+     * In this example, the assumption made is that the user chooses to store the consumer offsets outside Kafka. This requires the user to 
+     * plug in logic for retrieving the offsets from a custom store and provide the offsets to the consumer in the ConsumerRebalanceCallback
+     * callback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance <i>and</i>
+     * before the consumption restarts post rebalance. This is the right place to supply offsets from a custom store to the consumer.
+     */
+//    @Test
+//    public void testConsumerRebalanceWithCustomOffsetStore() {
+//        Properties props = new Properties();
+//        props.put("metadata.broker.list", "localhost:9092");
+//        props.put("group.id", "test");
+//        props.put("session.timeout.ms", "1000");
+//        props.put("auto.commit.enable", "true");
+//        props.put("auto.commit.interval.ms", "10000");
+//        KafkaConsumer consumer = new KafkaConsumer(props,
+//                                                   new ConsumerRebalanceCallback() {
+//                                                        public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
+//                                                           Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
+//                                                           consumer.seek(lastCommittedOffsets);
+//                                                        }
+//                                                        public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
+//                                                            Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions); // implemented by the user
+//                                                           commitOffsetsToCustomStore(offsets);                                            // implemented by the user
+//                                                        }
+//                                                        private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore(Collection<TopicPartition> partitions) {
+//                                                            return null;
+//                                                        }
+//                                                        private Map<TopicPartition, Long> getLastConsumedOffsets(Collection<TopicPartition> partitions) { return null; }
+//                                                        private void commitOffsetsToCustomStore(Map<TopicPartition, Long> offsets) {}
+//                                                    });
+//        // subscribe to topics
+//        consumer.subscribe("foo", "bar");
+//        int commitInterval = 100;
+//        int numRecords = 0;
+//        boolean isRunning = true;
+//        while(isRunning) {
+//            Map<String, ConsumerRecords> records = consumer.poll(100);
+//            Map<TopicPartition, Long> consumedOffsets = process(records);
+//            numRecords += records.size();
+//            // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
+//            if(numRecords % commitInterval == 0) 
+//                commitOffsetsToCustomStore(consumedOffsets);
+//        }
+//        consumer.close();
+//    }
+
+    /**
+     * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with Kafka based offset storage. 
+     * In this example, the assumption made is that the user chooses to use Kafka based offset management. 
+     */
+//    @Test
+//    public void testConsumerRewindWithGroupManagementAndKafkaOffsetStorage() {
+//        Properties props = new Properties();
+//        props.put("metadata.broker.list", "localhost:9092");
+//        props.put("group.id", "test");
+//        props.put("session.timeout.ms", "1000");
+//        props.put("auto.commit.enable", "false");        
+//        KafkaConsumer consumer = new KafkaConsumer(props,
+//                                                   new ConsumerRebalanceCallback() {
+//                                                        boolean rewindOffsets = true;
+//                                                        public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
+//                                                            if(rewindOffsets) {
+//                                                            Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(null);
+//                                                            Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100);
+//                                                            consumer.seek(newOffsets);
+//                                                            }
+//                                                        }
+//                                                        public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
+//                                                            consumer.commit(true);
+//                                                        }
+//                                                        // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages 
+//                                                        private Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> currentOffsets,
+//                                                                                                        long numberOfMessagesToRewindBackTo) {
+//                                                            Map<TopicPartition, Long> newOffsets = new HashMap<TopicPartition, Long>();
+//                                                            for(Map.Entry<TopicPartition, Long> offset : currentOffsets.entrySet()) {
+//                                                                newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo);
+//                                                            }
+//                                                            return newOffsets;
+//                                                        }
+//                                                    });
+//        // subscribe to topics
+//        consumer.subscribe("foo", "bar");
+//        int commitInterval = 100;
+//        int numRecords = 0;
+//        boolean isRunning = true;
+//        while(isRunning) {
+//            Map<String, ConsumerRecords> records = consumer.poll(100);
+//            Map<TopicPartition, Long> consumedOffsets = process(records);
+//            numRecords += records.size();
+//            // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
+//            if(numRecords % commitInterval == 0) 
+//                consumer.commit(true);
+//        }
+//        consumer.close();
+//    }
+
+    /**
+     * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest
+     * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes 
+     * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
+     * This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka 
+     * based offset management. However, session.timeout.ms is not required since the Kafka consumer only performs failure detection when group 
+     * management is used.
+     */
+//    @Test
+//    public void testConsumerWithKafkaBasedOffsetManagement() {
+//        Properties props = new Properties();
+//        props.put("metadata.broker.list", "localhost:9092");
+//        props.put("group.id", "test");
+//        props.put("auto.commit.enable", "true");
+//        props.put("auto.commit.interval.ms", "10000");
+//        KafkaConsumer consumer = new KafkaConsumer(props);
+//        // subscribe to some partitions of topic foo
+//        TopicPartition partition0 = new TopicPartition("foo", 0);
+//        TopicPartition partition1 = new TopicPartition("foo", 1);
+//        TopicPartition[] partitions = new TopicPartition[2];
+//        partitions[0] = partition0;
+//        partitions[1] = partition1;
+//        consumer.subscribe(partitions);
+//        // find the last committed offsets for partitions 0,1 of topic foo
+//        Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(null);
+//        // seek to the last committed offsets to avoid duplicates
+//        consumer.seek(lastCommittedOffsets);        
+//        // find the offsets of the latest available messages to know where to stop consumption
+//        Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, null);
+//        boolean isRunning = true;
+//        while(isRunning) {
+//            Map<String, ConsumerRecords> records = consumer.poll(100);
+//            Map<TopicPartition, Long> consumedOffsets = process(records);
+//            for(TopicPartition partition : partitions) {
+//                if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
+//                    isRunning = false;
+//                else
+//                    isRunning = true;
+//            }
+//        }
+//        consumer.close();
+//    }
+
+    /**
+     * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest
+     * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes 
+     * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
+     * This example assumes that the user chooses to use custom offset storage.
+     */
+    @Test
+    public void testConsumerWithCustomOffsetManagement() {
+//        Properties props = new Properties();
+//        props.put("metadata.broker.list", "localhost:9092");
+//        KafkaConsumer consumer = new KafkaConsumer(props);
+//        // subscribe to some partitions of topic foo
+//        TopicPartition partition0 = new TopicPartition("foo", 0);
+//        TopicPartition partition1 = new TopicPartition("foo", 1);
+//        TopicPartition[] partitions = new TopicPartition[2];
+//        partitions[0] = partition0;
+//        partitions[1] = partition1;
+//        consumer.subscribe(partitions);
+//        Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
+//        // seek to the last committed offsets to avoid duplicates
+//        consumer.seek(lastCommittedOffsets);        
+//        // find the offsets of the latest available messages to know where to stop consumption
+//        Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, null);
+//        boolean isRunning = true;
+//        while(isRunning) {
+//            Map<String, ConsumerRecords> records = consumer.poll(100);
+//            Map<TopicPartition, Long> consumedOffsets = process(records);
+//            // commit offsets for partitions 0,1 for topic foo to custom store
+//            commitOffsetsToCustomStore(consumedOffsets);
+//            for(TopicPartition partition : partitions) {
+//                if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
+//                    isRunning = false;
+//                else
+//                    isRunning = true;
+//            }            
+//        }         
+//        consumer.close();
+    }
+
+    private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore() { return null; }
+    private void commitOffsetsToCustomStore(Map<TopicPartition, Long> consumedOffsets) {}
+    private Map<TopicPartition, Long> process(Map<String, ConsumerRecords> records) {
+        Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
+        for(Entry<String, ConsumerRecords> recordMetadata : records.entrySet()) {
+            List<ConsumerRecord> recordsPerTopic = recordMetadata.getValue().records();
+            for(int i = 0;i < recordsPerTopic.size();i++) {
+                ConsumerRecord record = recordsPerTopic.get(i);
+                // process record
+                try {
+                    processedOffsets.put(record.topicAndPartition(), record.offset());
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }                
+            }
+        }
+        return processedOffsets; 
+    }
+}


[2/2] git commit: KAFKA-1328 New consumer APIs; reviewed by Jun Rao and Guozhang Wang

Posted by ne...@apache.org.
KAFKA-1328 New consumer APIs; reviewed by Jun Rao and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c24740c7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c24740c7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c24740c7

Branch: refs/heads/trunk
Commit: c24740c7b0f6a6e7c66659da786a346650b76766
Parents: bf7fb63
Author: Neha Narkhede <ne...@gmail.com>
Authored: Tue May 20 16:49:31 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue May 20 16:49:31 2014 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/Consumer.java | 125 ++++
 .../kafka/clients/consumer/ConsumerConfig.java  | 187 ++++++
 .../consumer/ConsumerRebalanceCallback.java     |  50 ++
 .../kafka/clients/consumer/ConsumerRecord.java  | 127 ++++
 .../kafka/clients/consumer/ConsumerRecords.java |  61 ++
 .../kafka/clients/consumer/KafkaConsumer.java   | 575 +++++++++++++++++++
 .../kafka/clients/consumer/MockConsumer.java    | 192 +++++++
 .../kafka/clients/consumer/OffsetMetadata.java  |  59 ++
 .../kafka/clients/producer/KafkaProducer.java   |  26 +-
 .../apache/kafka/common/utils/ClientUtils.java  |  44 ++
 .../clients/consumer/ConsumerExampleTest.java   | 298 ++++++++++
 11 files changed, 1720 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
new file mode 100644
index 0000000..227f564
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * @see KafkaConsumer
+ * @see MockConsumer
+ */
+public interface Consumer extends Closeable {
+
+    /**
+     * Incrementally subscribe to the given list of topics. This API is mutually exclusive with 
+     * {@link #subscribe(TopicPartition...) subscribe(partitions)} 
+     * @param topics A variable list of topics that the consumer subscribes to
+     */ 
+    public void subscribe(String...topics);
+
+    /**
+     * Incrementally subscribe to the given list of partitions. This API is mutually exclusive with 
+     * {@link #subscribe(String...) subscribe(topics)}
+     * @param partitions Partitions to subscribe to
+     */ 
+    public void subscribe(TopicPartition... partitions);
+
+    /**
+     * Unsubscribe from the specific topics. Messages for this topic will not be returned from the next {@link #poll(long) poll()}
+     * onwards. This should be used in conjunction with {@link #subscribe(String...) subscribe(topics)}. It is an error to
+     * unsubscribe from a topic that was never subscribed to using {@link #subscribe(String...) subscribe(topics)} 
+     * @param topics Topics to unsubscribe from
+     */
+    public void unsubscribe(String... topics);
+
+    /**
+     * Unsubscribe from the specific topic partitions. Messages for these partitions will not be returned from the next 
+     * {@link #poll(long) poll()} onwards. This should be used in conjunction with 
+     * {@link #subscribe(TopicPartition...) subscribe(topic, partitions)}. It is an error to
+     * unsubscribe from a partition that was never subscribed to using {@link #subscribe(TopicPartition...) subscribe(partitions)}
+     * @param partitions Partitions to unsubscribe from
+     */
+    public void unsubscribe(TopicPartition... partitions);
+    
+    /**
+     * Fetches data for the subscribed list of topics and partitions
+     * @param timeout  The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely. Must not be negative
+     * @return Map of topic to records for the subscribed topics and partitions as soon as data is available for a topic partition. Availability
+     *         of data is controlled by {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} and {@link ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG}.
+     *         If no data is available for timeout ms, returns an empty map
+     */
+    public Map<String, ConsumerRecords> poll(long timeout);
+
+    /**
+     * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
+     * @param sync If true, the commit should block until the consumer receives an acknowledgment 
+     * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
+     * if the sync flag is set to false 
+     */
+    public OffsetMetadata commit(boolean sync);
+
+    /**
+     * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+     * @param offsets The map of offsets to commit for the given topic partitions
+     * @param sync If true, commit will block until the consumer receives an acknowledgment 
+     * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
+     * if the sync flag is set to false. 
+     */
+    public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean sync);
+    
+    /**
+     * Overrides the fetch positions that the consumer will use on the next fetch request. If the consumer subscribes to a list of topics
+     * using {@link #subscribe(String...) subscribe(topics)}, an exception will be thrown if the specified topic partition is not owned by
+     * the consumer.  
+     * @param offsets The map of fetch positions per topic and partition
+     */
+    public void seek(Map<TopicPartition, Long> offsets);
+
+    /**
+     * Returns the fetch position of the <i>next message</i> for the specified topic partition to be used on the next {@link #poll(long) poll()}
+     * @param partitions Partitions for which the fetch position will be returned
+     * @return The position from which data will be fetched for the specified partition on the next {@link #poll(long) poll()}
+     */
+    public Map<TopicPartition, Long> position(Collection<TopicPartition> partitions);
+    
+    /**
+     * Fetches the last committed offsets for the input list of partitions 
+     * @param partitions The list of partitions to return the last committed offset for
+     * @return  The list of offsets for the specified list of partitions
+     */
+    public Map<TopicPartition, Long> committed(Collection<TopicPartition> partitions);
+    
+    /**
+     * Fetches offsets before a certain timestamp
+     * @param timestamp The unix timestamp. Value -1 indicates earliest available timestamp. Value -2 indicates latest available timestamp. 
+     * @param partitions The list of partitions for which the offsets are returned
+     * @return The offsets for messages that were written to the server before the specified timestamp.
+     */
+    public Map<TopicPartition, Long> offsetsBeforeTime(long timestamp, Collection<TopicPartition> partitions);
+    
+    /**
+     * Return a map of metrics maintained by the consumer
+     */
+    public Map<String, ? extends Metric> metrics();
+
+    /**
+     * Close this consumer
+     */
+    public void close();
+
+}
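
A minimal usage sketch may help tie the interface methods above together. This is an illustrative sketch rather than part of the patch; it assumes the KafkaConsumer(Properties) constructor used in the examples elsewhere in this commit, and the topic name, partition number and rewind distance are made up:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class ConsumerInterfaceSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("group.id", "test");
            Consumer consumer = new KafkaConsumer(props);
            // subscribe to a single partition, bypassing group management
            TopicPartition partition = new TopicPartition("foo", 0);
            consumer.subscribe(partition);
            // find the next fetch position and rewind it by 10 messages
            Map<TopicPartition, Long> position = consumer.position(Collections.singletonList(partition));
            Map<TopicPartition, Long> rewound = new HashMap<TopicPartition, Long>();
            rewound.put(partition, position.get(partition) - 10);
            consumer.seek(rewound);
            // fetch once, commit the returned offsets synchronously, then release resources
            Map<String, ConsumerRecords> records = consumer.poll(100);
            System.out.println("polled records for " + records.size() + " topics");
            consumer.commit(true);
            consumer.close();
        }
    }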

http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
new file mode 100644
index 0000000..46efc0c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+/**
+ * The consumer configuration keys
+ */
+public class ConsumerConfig extends AbstractConfig {
+    private static final ConfigDef config;
+
+    /**
+     * The identifier of the group this consumer belongs to. This is required if the consumer uses the
+     * group management functionality via {@link Consumer#subscribe(String...) subscribe(topics)}. This is also required
+     * if the consumer uses the default Kafka based offset management strategy. 
+     */
+    public static final String GROUP_ID_CONFIG = "group.id";
+    
+    /**
+     * The timeout after which, if the {@link Consumer#poll(long) poll(timeout)} is not invoked, the consumer is
+     * marked dead and a rebalance operation is triggered for the group identified by {@link #GROUP_ID_CONFIG}. Relevant 
+     * if the consumer uses the group management functionality by invoking {@link Consumer#subscribe(String...) subscribe(topics)} 
+     */
+    public static final String SESSION_TIMEOUT_MS = "session.timeout.ms";
+
+    /**
+     * The number of times a consumer sends a heartbeat to the co-ordinator broker within a {@link #SESSION_TIMEOUT_MS} time window.
+     * This frequency affects the latency of a rebalance operation since the co-ordinator broker notifies a consumer of a rebalance 
+     * in the heartbeat response. Relevant if the consumer uses the group management functionality by invoking 
+     * {@link Consumer#subscribe(String...) subscribe(topics)} 
+     */
+    public static final String HEARTBEAT_FREQUENCY = "heartbeat.frequency";
+
+    /**
+     * A list of URLs to use for establishing the initial connection to the cluster. This list should be in the form
+     * <code>host1:port1,host2:port2,...</code>. These URLs are just used for the initial connection to discover the
+     * full cluster membership (which may change dynamically) so this list need not contain the full set of servers (you
+     * may want more than one, though, in case a server is down).
+     */
+    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+
+    /**
+     * If true, periodically commit to Kafka the offsets of messages already returned by the consumer. If the process 
+     * fails, this committed offset is the position from which consumption will resume.
+     */
+    public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
+    
+    /**
+     * The friendly name of the partition assignment strategy that the server will use to distribute partition ownership
+     * amongst consumer instances when group management is used
+     */
+    public static final String PARTITION_ASSIGNMENT_STRATEGY = "partition.assignment.strategy";
+    
+    /**
+     * The frequency in milliseconds that the consumer offsets are committed to Kafka. Relevant if {@link #ENABLE_AUTO_COMMIT_CONFIG}
+     * is turned on.
+     */
+    public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
+    
+    /**
+     * What to do when there is no initial offset in Kafka or if an offset is out of range:
+     * <ul>
+     * <li> smallest:      automatically reset the offset to the smallest offset
+     * <li> largest:       automatically reset the offset to the largest offset
+     * <li> disable:       throw exception to the consumer if no previous offset is found for the consumer's group
+     * <li> anything else: throw exception to the consumer. 
+     * </ul> 
+     */
+    public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
+    
+    /**
+     * The minimum amount of data the server should return for a fetch request. If insufficient data is available the 
+     * request will wait for that much data to accumulate before answering the request.
+     */
+    public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
+    
+    /**
+     * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient 
+     * data to immediately satisfy {@link #FETCH_MIN_BYTES_CONFIG}. This should be less than or equal to the timeout used in 
+     * {@link KafkaConsumer#poll(long) poll(timeout)}
+     */
+    public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
+    
+    /**
+     * The maximum amount of time to block waiting to fetch metadata about a topic the first time a record is received 
+     * from that topic. The consumer will throw a TimeoutException if it could not successfully fetch metadata within
+     * this timeout.
+     */
+    public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
+
+    /**
+     * The total memory used by the consumer to buffer records received from the server. This config is meant to control
+     * the consumer's memory usage, so it is the size of the global fetch buffer that will be shared across all partitions. 
+     */
+    public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes";
+
+    /**
+     * The minimum amount of memory that should be used to fetch at least one message for a partition. This puts a lower
+     * bound on the consumer's memory utilization when there is at least one message for a partition available on the server.
+     * This size must be at least as large as the maximum message size the server allows or else it is possible for the producer 
+     * to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large 
+     * message on a certain partition. 
+     */
+    public static final String FETCH_BUFFER_CONFIG = "fetch.buffer.bytes";
+    
+    /**
+     * The id string to pass to the server when making requests. The purpose of this is to be able to track the source
+     * of requests beyond just ip/port by allowing a logical application name to be included.
+     */
+    public static final String CLIENT_ID_CONFIG = "client.id";
+
+    /**
+     * The size of the TCP receive buffer to use when fetching data
+     */
+    public static final String SOCKET_RECEIVE_BUFFER_CONFIG = "socket.receive.buffer.bytes";
+
+    /**
+     * The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a
+     * host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.
+     */
+    public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
+
+    /** <code>metrics.sample.window.ms</code> */
+    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
+    private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. "
+                                                               + "When a window expires we erase and overwrite the oldest window.";
+
+    /** <code>metrics.num.samples</code> */
+    public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
+    private static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics.";
+
+    /** <code>metric.reporters</code> */
+    public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
+    private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
+
+    static {
+        /* TODO: add config docs */
+        config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, "blah blah")
+                                .define(GROUP_ID_CONFIG, Type.STRING, Importance.HIGH, "blah blah")
+                                .define(SESSION_TIMEOUT_MS, Type.LONG, 1000, Importance.HIGH, "blah blah")
+                                .define(HEARTBEAT_FREQUENCY, Type.INT, 3, Importance.MEDIUM, "blah blah")
+                                .define(PARTITION_ASSIGNMENT_STRATEGY, Type.STRING, Importance.MEDIUM, "blah blah")
+                                .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), Importance.MEDIUM, "blah blah")
+                                .define(ENABLE_AUTO_COMMIT_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, "blah blah")
+                                .define(AUTO_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, 5000, atLeast(0), Importance.LOW, "blah blah")
+                                .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.LOW, "blah blah")
+                                .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.LOW, "blah blah")
+                                .define(FETCH_BUFFER_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), Importance.HIGH, "blah blah")
+                                .define(SOCKET_RECEIVE_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.LOW, "blah blah")
+                                .define(FETCH_MIN_BYTES_CONFIG, Type.LONG, 1024, atLeast(0), Importance.HIGH, "blah blah")
+                                .define(FETCH_MAX_WAIT_MS_CONFIG, Type.LONG, 500, atLeast(0), Importance.LOW, "blah blah")
+                                .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, "blah blah")
+                                .define(AUTO_OFFSET_RESET_CONFIG, Type.STRING, "largest", Importance.MEDIUM, "blah blah")
+                                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+                                        Type.LONG,
+                                        30000,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        METRICS_SAMPLE_WINDOW_MS_DOC)
+                                .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
+                                .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC);
+                                
+    }
+
+    ConsumerConfig(Map<? extends Object, ? extends Object> props) {
+        super(config, props);
+    }
+
+}
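
Since the ConfigDef docs above are still placeholders, here is a small sketch of how these keys are meant to be supplied, assuming the KafkaConsumer(Properties) constructor used in the examples in this commit; the values simply echo the defaults and sample configurations above:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerConfigSketch {
        public static KafkaConsumer buildConsumer() {
            Properties props = new Properties();
            // use the public key constants defined above rather than raw strings
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS, "1000");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000");
            props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
            return new KafkaConsumer(props);
        }
    }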

http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
new file mode 100644
index 0000000..05eb6ce
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import java.util.Collection;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * A callback interface that the user can implement to manage customized offsets on the start and end of 
+ * every rebalance operation. This callback will execute in the user thread as part of the 
+ * {@link Consumer#poll(long) poll(long)} API on every rebalance attempt.
+ * The default implementation of the callback will {@link Consumer#seek(java.util.Map) seek(offsets)} to the last committed offsets in the
+ * {@link #onPartitionsAssigned(Consumer, java.util.Collection) onPartitionsAssigned()} callback, and will commit offsets synchronously 
+ * for the specified list of partitions to Kafka in the {@link #onPartitionsRevoked(Consumer, java.util.Collection) onPartitionsRevoked()} 
+ * callback.
+ */
+public interface ConsumerRebalanceCallback {
+
+    /**
+     * A callback method the user can implement to provide handling of customized offsets on completion of a successful 
+     * rebalance operation. This method will be called after a rebalance operation completes and before the consumer 
+     * starts fetching data.
+     * <p> 
+     * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}  
+     * @param partitions The list of partitions that are assigned to the consumer after rebalance
+     */
+    public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions);
+    
+    /**
+     * A callback method the user can implement to provide handling of offset commits to a customized store on the 
+     * start of a rebalance operation. This method will be called before a rebalance operation starts and after the 
+     * consumer stops fetching data. It is recommended that offsets should be committed in this callback to 
+     * either Kafka or a custom offset store to prevent duplicate data 
+     * <p> 
+     * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}  
+     * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
+     */
+    public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions);
+}
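
The class javadoc above describes the default behavior: seek to the last committed offsets on assignment and commit synchronously on revocation. A sketch of an implementation with exactly that behavior, written only against the Consumer methods in this patch, might look like this:

    import java.util.Collection;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback;
    import org.apache.kafka.common.TopicPartition;

    public class CommittedOffsetsRebalanceCallback implements ConsumerRebalanceCallback {
        // On assignment, resume every newly owned partition from its last committed offset.
        public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
            Map<TopicPartition, Long> lastCommitted = consumer.committed(partitions);
            consumer.seek(lastCommitted);
        }
        // On revocation, commit synchronously so the next owner does not reprocess these messages.
        public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
            consumer.commit(true);
        }
    }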

http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
new file mode 100644
index 0000000..436d8a4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * A key/value pair received from Kafka. This consists of the topic name and partition number from which the 
+ * record was received, and an offset that points to the record in a Kafka partition. 
+ */
+public final class ConsumerRecord {
+    private final TopicPartition partition; 
+    private final byte[] key;
+    private final byte[] value;
+    private final long offset;
+    private volatile Exception error;
+    
+    /**
+     * Creates a record to be received from a specified topic and partition
+     * 
+     * @param topic     The topic this record is received from
+     * @param partitionId The partition of the topic this record is received from
+     * @param key       The key of the record, if one exists
+     * @param value     The record contents
+     * @param offset    The offset of this record in the corresponding Kafka partition
+     */
+    public ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset) {
+        this(topic, partitionId, key, value, offset, null);
+    }
+
+    /**
+     * Create a record with no key
+     * 
+     * @param topic The topic this record is received from
+     * @param partitionId The partition of the topic this record is received from
+     * @param value The record contents
+     * @param offset The offset of this record in the corresponding Kafka partition
+     */
+    public ConsumerRecord(String topic, int partitionId, byte[] value, long offset) {
+        this(topic, partitionId, null, value, offset);
+    }
+
+    /**
+     * Creates a record with an error code
+     * @param topic     The topic this record is received from
+     * @param partitionId The partition of the topic this record is received from
+     * @param error     The exception corresponding to the error code returned by the server for this topic partition
+     */
+    public ConsumerRecord(String topic, int partitionId, Exception error) {
+        this(topic, partitionId, null, null, -1L, error);
+    }
+    
+    private ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset, Exception error) {
+        if (topic == null)
+            throw new IllegalArgumentException("Topic cannot be null");
+        this.partition = new TopicPartition(topic, partitionId);
+        this.key = key;
+        this.value = value;
+        this.offset = offset;  
+        this.error = error;
+    }
+    
+    /**
+     * The topic this record is received from
+     */
+    public String topic() {
+        return partition.topic();
+    }
+
+    /**
+     * The partition from which this record is received 
+     */
+    public int partition() {
+        return partition.partition();
+    }
+    
+    /**
+     * The TopicPartition object containing the topic and partition
+     */
+    public TopicPartition topicAndPartition() {
+        return partition;
+    }
+    
+    /**
+     * The key (or null if no key is specified)
+     * @throws Exception The exception thrown while fetching this record.
+     */
+    public byte[] key() throws Exception {
+        if (this.error != null)
+            throw this.error;
+        return key;
+    }
+
+    /**
+     * The value
+     * @throws Exception The exception thrown while fetching this record.
+     */
+    public byte[] value() throws Exception {
+        if (this.error != null)
+            throw this.error;
+        return value;
+    }
+
+    /**
+     * The position of this record in the corresponding Kafka partition.
+     * @throws Exception The exception thrown while fetching this record.
+     */
+    public long offset() throws Exception {
+        if (this.error != null)
+            throw this.error;
+        return offset;
+    }
+
+    public Exception error() {
+        return this.error;
+    }
+}
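
Because key(), value() and offset() rethrow any per-record fetch error, callers either check error() first or handle the checked exception. A small illustrative helper (hypothetical, not part of the patch):

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class ConsumerRecordSketch {
        public static void printRecord(ConsumerRecord record) {
            if (record.error() != null) {
                // the accessors would rethrow this error, so report it and stop here
                System.err.println("Fetch failed for " + record.topicAndPartition() + ": " + record.error());
                return;
            }
            try {
                System.out.println(record.topic() + "-" + record.partition()
                        + " @ " + record.offset() + " -> " + new String(record.value()));
            } catch (Exception e) {
                // offset() and value() declare a checked Exception, so it still has to be handled
                e.printStackTrace();
            }
        }
    }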

http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
new file mode 100644
index 0000000..2ecfc8a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * A container that holds the list of {@link ConsumerRecord}s per partition for a particular topic. There is one instance for every topic returned by a 
+ * {@link Consumer#poll(long)} operation. 
+ */
+public class ConsumerRecords {
+
+    private final String topic;
+    private final Map<Integer, List<ConsumerRecord>> recordsPerPartition;
+    
+    public ConsumerRecords(String topic, Map<Integer, List<ConsumerRecord>> records) {
+        this.topic = topic;
+        this.recordsPerPartition = records;
+    }
+    
+    /**
+     * @param partitions The input list of partitions for a particular topic. If no partitions are 
+     * specified, returns records for all partitions
+     * @return The list of {@link ConsumerRecord}s associated with the given partitions.
+     */
+    public List<ConsumerRecord> records(int... partitions) {
+        List<ConsumerRecord> recordsToReturn = new ArrayList<ConsumerRecord>(); 
+        if(partitions.length == 0) {
+            // return records for all partitions
+            for(Entry<Integer, List<ConsumerRecord>> record : recordsPerPartition.entrySet()) {
+                recordsToReturn.addAll(record.getValue());
+            }
+        } else {
+           for(int partition : partitions) {
+               List<ConsumerRecord> recordsForThisPartition = recordsPerPartition.get(partition);
+               // guard against partitions with no data to avoid a NullPointerException
+               if(recordsForThisPartition != null)
+                   recordsToReturn.addAll(recordsForThisPartition);
+           }
+        }
+        return recordsToReturn;
+    }
+
+    /**
+     * @return The topic of all records associated with this instance
+     */
+    public String topic() {
+        return this.topic;
+    }
+}
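
Putting ConsumerRecords together with the Map<String, ConsumerRecords> returned by poll(), a rough iteration sketch (the partition id 0 is purely illustrative) could be:

    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    public class ConsumerRecordsSketch {
        public static void dump(Map<String, ConsumerRecords> recordsByTopic) {
            for (Map.Entry<String, ConsumerRecords> entry : recordsByTopic.entrySet()) {
                ConsumerRecords topicRecords = entry.getValue();
                // records() with no arguments returns records for every partition of this topic
                List<ConsumerRecord> all = topicRecords.records();
                System.out.println(topicRecords.topic() + " returned " + all.size() + " records");
                // records(0) narrows the view to partition 0 only
                List<ConsumerRecord> partitionZero = topicRecords.records(0);
                System.out.println("partition 0 contributed " + partitionZero.size() + " records");
            }
        }
    }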

http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
new file mode 100644
index 0000000..18bcc90
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.utils.ClientUtils;
+import org.apache.kafka.common.utils.SystemTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Kafka client that consumes records from a Kafka cluster.
+ * <P>
+ * The consumer is <i>thread safe</i> and should generally be shared among all threads for best performance.
+ * <p>
+ * The consumer is single threaded and multiplexes I/O over TCP connections to each of the brokers it
+ * needs to communicate with. Failure to close the consumer after use will leak these resources.
+ * <h3>Usage Examples</h3>
+ * The consumer APIs offer flexibility to cover a variety of consumption use cases. Following are some examples to demonstrate the correct use of 
+ * the available APIs. Each of the examples assumes the presence of a user implemented process() method that processes a given batch of messages
+ * and returns the offset of the latest processed message per partition. Note that process() is not part of the consumer API and is only used as
+ * a convenience method to demonstrate the different use cases of the consumer APIs. Here is a sample implementation of such a process() method.
+ * <pre>
+ * {@code
+ * private Map<TopicPartition, Long> process(Map<String, ConsumerRecords> records) {
+ *     Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
+ *     for(Entry<String, ConsumerRecords> recordMetadata : records.entrySet()) {
+ *          List<ConsumerRecord> recordsPerTopic = recordMetadata.getValue().records();
+ *          for(int i = 0;i < recordsPerTopic.size();i++) {
+ *               ConsumerRecord record = recordsPerTopic.get(i);
+ *               // process record
+ *               processedOffsets.put(record.topicAndPartition(), record.offset());
+ *          }
+ *     }
+ *     return processedOffsets; 
+ * }
+ * }
+ * </pre>
+ * <p>
+ * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load 
+ * balancing and failover. This example assumes that the offsets are stored in Kafka and are automatically committed periodically, 
+ * as controlled by the auto.commit.interval.ms config
+ * <pre>
+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("enable.auto.commit", "true");
+ * props.put("auto.commit.interval.ms", "10000");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * consumer.subscribe("foo", "bar");
+ * boolean isRunning = true;
+ * while(isRunning) {
+ *   Map<String, ConsumerRecords> records = consumer.poll(100);
+ *   process(records);
+ * }
+ * consumer.close();
+ * }
+ * </pre>
+ * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load 
+ * balancing and failover. This example assumes that the offsets are stored in Kafka and are manually committed using 
+ * the commit() API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed
+ * messages fails. Note that this method of rewinding offsets using {@link #seek(Map) seek(offsets)} is only useful for rewinding the offsets
+ * of the current consumer instance. As such, this will not trigger a rebalance or affect the fetch offsets for the other consumer instances.
+ * <pre>
+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("enable.auto.commit", "false");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * consumer.subscribe("foo", "bar");
+ * int commitInterval = 100;
+ * int numRecords = 0;
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ *     Map<String, ConsumerRecords> records = consumer.poll(100);
+ *     try {
+ *         Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *         consumedOffsets.putAll(lastConsumedOffsets);
+ *         numRecords += records.size();
+ *         // synchronously commit offsets for all partitions of topics foo, bar owned by this consumer instance
+ *         if(numRecords % commitInterval == 0) 
+ *           consumer.commit(true);
+ *     } catch(Exception e) {
+ *         try {
+ *             // rewind consumer's offsets for failed partitions
+ *             // assume failedPartitions() returns the list of partitions for which the processing of the last batch of messages failed
+ *             List<TopicPartition> failedPartitions = failedPartitions();   
+ *             Map<TopicPartition, Long> offsetsToRewindTo = new HashMap<TopicPartition, Long>();
+ *             for(TopicPartition failedPartition : failedPartitions) {
+ *                 // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
+ *                 // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
+ *                 offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
+ *             }
+ *             // seek to new offsets only for partitions that failed the last process()
+ *             consumer.seek(offsetsToRewindTo);
+ *         } catch(Exception e) {  break; } // rewind failed
+ *     }
+ * }         
+ * consumer.close();
+ * }
+ * </pre>
+ * <p>
+ * This example demonstrates how to rewind the offsets of the entire consumer group. It is assumed that the user has chosen to use Kafka's 
+ * group management functionality for automatic consumer load balancing and failover. This example also assumes that the offsets are stored in 
+ * Kafka. If group management is used, the right place to systematically rewind offsets for <i>every</i> consumer instance is inside the 
+ * ConsumerRebalanceCallback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance 
+ * <i>and</i> before the consumption restarts post rebalance. This is the right place to supply the newly rewound offsets to the consumer. If 
+ * you foresee ever needing to reset the consumer's offsets while using group management, it is recommended that you always configure the 
+ * consumer with a ConsumerRebalanceCallback guarded by a flag that controls whether the offset rewind logic runs.
+ * This method of rewinding offsets is useful if you notice an issue with your message processing after successful consumption and offset commit,
+ * and you would like to rewind the offsets for the entire consumer group as part of rolling out a fix to your processing logic. In this case,
+ * you would configure each of your consumer instances with the offset rewind flag turned on and bounce each consumer instance 
+ * in a rolling restart fashion. Each restart triggers a rebalance, and eventually every consumer instance will have rewound the offsets for 
+ * the partitions it owns, effectively rewinding the offsets for the entire consumer group.
+ * <pre>
+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("enable.auto.commit", "false");
+ * KafkaConsumer consumer = new KafkaConsumer(props,
+ *                                            new ConsumerRebalanceCallback() {
+ *                                                boolean rewindOffsets = true;  // should be retrieved from external application config
+ *                                                public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
+ *                                                    Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(Arrays.asList(partitions));
+ *                                                    Map<TopicPartition, Long> newOffsets = latestCommittedOffsets;
+ *                                                    if(rewindOffsets)
+ *                                                        newOffsets = rewindOffsets(latestCommittedOffsets, 100);
+ *                                                    consumer.seek(newOffsets);
+ *                                                }
+ *                                                public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
+ *                                                    consumer.commit(true);
+ *                                                }
+ *                                                // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages 
+ *                                                private Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> currentOffsets,
+ *                                                                                                long numberOfMessagesToRewindBackTo) {
+ *                                                    Map<TopicPartition, Long> newOffsets = new HashMap<TopicPartition, Long>();
+ *                                                    for(Map.Entry<TopicPartition, Long> offset : currentOffsets.entrySet()) 
+ *                                                        newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo);
+ *                                                    return newOffsets;
+ *                                                }
+ *                                            });
+ * consumer.subscribe("foo", "bar");
+ * int commitInterval = 100;
+ * int numRecords = 0;
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ *     Map<String, ConsumerRecords> records = consumer.poll(100);
+ *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *     consumedOffsets.putAll(lastConsumedOffsets);
+ *     numRecords += records.size();
+ *     // synchronously commit offsets for all partitions of topics foo, bar owned by this consumer instance
+ *     if(numRecords % commitInterval == 0) 
+ *         consumer.commit(consumedOffsets, true);
+ * }
+ * consumer.close();
+ * }
+ * </pre>
+ * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with custom offset storage. 
+ * This example assumes that the user chooses to store the consumer offsets outside Kafka. This requires the user to 
+ * plug in logic for retrieving the offsets from a custom store and to provide those offsets to the consumer through the 
+ * ConsumerRebalanceCallback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance <i>and</i>
+ * before the consumption restarts post rebalance. This is the right place to supply offsets from a custom store to the consumer.
+ * <p>
+ * Similarly, the user would also be required to plug in logic for storing the consumer's offsets to a custom store. The onPartitionsRevoked 
+ * callback is invoked right after the consumer has stopped fetching data and before the partition ownership changes. This is the right place 
+ * to commit the offsets for the current set of partitions owned by the consumer.  
+ * <pre>
+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
+ * KafkaConsumer consumer = new KafkaConsumer(props,
+ *                                            new ConsumerRebalanceCallback() {
+ *                                                public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
+ *                                                    Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
+ *                                                    consumer.seek(lastCommittedOffsets);
+ *                                                }
+ *                                                public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
+ *                                                    Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions);
+ *                                                    commitOffsetsToCustomStore(offsets); 
+ *                                                }
+ *                                                // following APIs should be implemented by the user for custom offset management
+ *                                                private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore(TopicPartition... partitions) {
+ *                                                    return null;
+ *                                                }
+ *                                                private Map<TopicPartition, Long> getLastConsumedOffsets(TopicPartition... partitions) { return null; }
+ *                                                private void commitOffsetsToCustomStore(Map<TopicPartition, Long> offsets) {}
+ *                                            });
+ * consumer.subscribe("foo", "bar");
+ * int commitInterval = 100;
+ * int numRecords = 0;
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ *     Map<String, ConsumerRecords> records = consumer.poll(100);
+ *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *     consumedOffsets.putAll(lastConsumedOffsets);
+ *     numRecords += records.size();
+ *     // commit offsets for all partitions of topics foo, bar owned by this consumer instance to the custom store
+ *     if(numRecords % commitInterval == 0) 
+ *         commitOffsetsToCustomStore(consumedOffsets);
+ * }
+ * consumer.close();
+ * }
+ * </pre>
+ * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest
+ * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes 
+ * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
+ * This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka 
+ * based offset management. However, session.timeout.ms is not required since the Kafka consumer only does automatic failover when group 
+ * management is used.
+ * <pre>
+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("enable.auto.commit", "true");
+ * props.put("auto.commit.interval.ms", "10000");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * // subscribe to some partitions of topic foo
+ * TopicPartition partition0 = new TopicPartition("foo", 0);
+ * TopicPartition partition1 = new TopicPartition("foo", 1);
+ * TopicPartition[] partitions = new TopicPartition[2];
+ * partitions[0] = partition0;
+ * partitions[1] = partition1;
+ * consumer.subscribe(partitions);
+ * // find the last committed offsets for partitions 0,1 of topic foo
+ * Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(Arrays.asList(partition0, partition1));
+ * // seek to the last committed offsets to avoid duplicates
+ * consumer.seek(lastCommittedOffsets);        
+ * // find the offsets of the latest available messages to know where to stop consumption
+ * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partition0, partition1));
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ *     Map<String, ConsumerRecords> records = consumer.poll(100);
+ *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *     consumedOffsets.putAll(lastConsumedOffsets);
+ *     for(TopicPartition partition : partitions) {
+ *         if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
+ *             isRunning = false;
+ *         else
+ *             isRunning = true;
+ *     }
+ * }
+ * consumer.commit(true);
+ * consumer.close();
+ * }
+ * </pre>
+ * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest
+ * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes 
+ * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
+ * This example assumes that the user chooses to use custom offset storage.
+ * <pre>
+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * // subscribe to some partitions of topic foo
+ * TopicPartition partition0 = new TopicPartition("foo", 0);
+ * TopicPartition partition1 = new TopicPartition("foo", 1);
+ * TopicPartition[] partitions = new TopicPartition[2];
+ * partitions[0] = partition0;
+ * partitions[1] = partition1;
+ * consumer.subscribe(partitions);
+ * Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
+ * // seek to the last committed offsets to avoid duplicates
+ * consumer.seek(lastCommittedOffsets);        
+ * // find the offsets of the latest available messages to know where to stop consumption
+ * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partition0, partition1));
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ *     Map<String, ConsumerRecords> records = consumer.poll(100);
+ *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *     consumedOffsets.putAll(lastConsumedOffsets);
+ *     // commit offsets for partitions 0,1 for topic foo to custom store
+ *     commitOffsetsToCustomStore(consumedOffsets);
+ *     for(TopicPartition partition : partitions) {
+ *         if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
+ *             isRunning = false;
+ *         else
+ *             isRunning = true;
+ *     }            
+ * }      
+ * commitOffsetsToCustomStore(consumedOffsets);   
+ * consumer.close();
+ * }
+ * </pre>
+ */
+public class KafkaConsumer implements Consumer {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
+
+    private final long metadataFetchTimeoutMs;
+    private final long totalMemorySize;
+    private final Metrics metrics;
+    private final Set<String> subscribedTopics;
+    private final Set<TopicPartition> subscribedPartitions;
+    
+    /**
+     * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
+     * are documented <a href="http://kafka.apache.org/documentation.html#consumerconfigs">here</a>. Values can be
+     * either strings or Objects of the appropriate type (for example a numeric configuration would accept either the
+     * string "42" or the integer 42).
+     * <p>
+     * Valid configuration strings are documented at {@link ConsumerConfig}
+     * @param configs   The consumer configs
+     */
+    public KafkaConsumer(Map<String, Object> configs) {
+        this(new ConsumerConfig(configs), null);
+    }
+
+    /**
+     * A consumer is instantiated by providing a set of key-value pairs as configuration and a {@link ConsumerRebalanceCallback} 
+     * implementation 
+     * <p>
+     * Valid configuration strings are documented at {@link ConsumerConfig}
+     * @param configs   The consumer configs
+     * @param callback  A callback interface that the user can implement to manage customized offsets on the start and end of 
+     *                  every rebalance operation.  
+     */
+    public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback) {
+        this(new ConsumerConfig(configs), callback);
+    }
+
+    /**
+     * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration.      
+     * Valid configuration strings are documented at {@link ConsumerConfig}
+     */
+    public KafkaConsumer(Properties properties) {
+        this(new ConsumerConfig(properties), null);
+    }
+
+    /**
+     * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a 
+     * {@link ConsumerRebalanceCallback} implementation. 
+     * <p>
+     * Valid configuration strings are documented at {@link ConsumerConfig}
+     * @param properties The consumer configuration properties
+     * @param callback   A callback interface that the user can implement to manage customized offsets on the start and end of 
+     *                   every rebalance operation.  
+     */
+    public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) {
+        this(new ConsumerConfig(properties), callback);
+    }
+
+    private KafkaConsumer(ConsumerConfig config) {
+        this(config, null);
+    }
+
+    private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback) {
+        log.trace("Starting the Kafka consumer");
+        subscribedTopics = new HashSet<String>();
+        subscribedPartitions = new HashSet<TopicPartition>();
+        this.metrics = new Metrics(new MetricConfig(),
+                                   Collections.singletonList((MetricsReporter) new JmxReporter("kafka.consumer.")),
+                                   new SystemTime());
+        this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
+        this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
+        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+        config.logUnused();
+        log.debug("Kafka consumer started");
+    }
+
+    /**
+     * Incrementally subscribes to the given list of topics and uses the consumer's group management functionality
+     * <p>
+     * As part of group management, the consumer will keep track of the list of consumers that belong to a particular group and
+     * will trigger a rebalance operation if any of the following events occur:
+     * <ul>
+     * <li> The number of partitions changes for any of the subscribed topics 
+     * <li> A topic is created or deleted 
+     * <li> An existing member of the consumer group dies 
+     * <li> A new member is added to an existing consumer group via the join API 
+     * </ul> 
+     * @param topics A variable list of topics that the consumer wants to subscribe to
+     */
+    @Override
+    public void subscribe(String... topics) {
+        if(subscribedPartitions.size() > 0)
+            throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
+        for(String topic:topics)
+            subscribedTopics.add(topic);
+        // TODO: trigger a rebalance operation
+    }
+
+    /**
+     * Incrementally subscribes to a specific topic partition and does not use the consumer's group management functionality. As such,
+     * there will be no rebalance operation triggered when group membership or cluster and topic metadata change.
+     * <p>
+     * @param partitions Partitions to incrementally subscribe to
+     */
+    @Override
+    public void subscribe(TopicPartition... partitions) {
+        if(subscribedTopics.size() > 0)
+            throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
+        for(TopicPartition partition:partitions)
+            subscribedPartitions.add(partition);
+    }
+
+    /**
+     * Unsubscribe from the specified topics. This will trigger a rebalance operation and messages for these topics will not be returned 
+     * from the next {@link #poll(long) poll()} onwards
+     * @param topics Topics to unsubscribe from
+     */
+    public void unsubscribe(String... topics) {   
+        // throw an exception if the topic was never subscribed to
+        for(String topic:topics) {
+            if(!subscribedTopics.contains(topic))
+                throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" +
+                		" to unsubscribe(" + topic + ")");
+            subscribedTopics.remove(topic);
+        }
+        // TODO trigger a rebalance operation
+    }
+
+    /**
+     * Unsubscribe from the specified topic partitions. Messages for these partitions will not be returned from the next 
+     * {@link #poll(long) poll()} onwards
+     * @param partitions Partitions to unsubscribe from
+     */
+    public void unsubscribe(TopicPartition... partitions) {        
+        // throw an exception if the partition was never subscribed to
+        for(TopicPartition partition:partitions) {
+            if(!subscribedPartitions.contains(partition))
+                throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" + 
+                                                 partition.topic() + "," + partition.partition() + ") should be called prior" +
+                                                " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")");
+            subscribedPartitions.remove(partition);                
+        }
+        // trigger a rebalance operation
+    }
+    
+    /**
+     * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have subscribed to
+     * any topics or partitions before polling for data.
+     * <p> 
+     * The offset used for fetching the data is governed by whether or not {@link #seek(Map) seek(offsets)}
+     * is used. If {@link #seek(Map) seek(offsets)} is used, it will use the specified offsets on startup and
+     * on every rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed offset 
+     * using {@link #commit(Map, boolean) commit(offsets, sync)} 
+     * for the subscribed list of partitions.
+     * @param timeout  The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely. Must not be negative
+     * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
+     */
+    @Override
+    public Map<String, ConsumerRecords> poll(long timeout) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    /**
+     * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+     * <p>
+     * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance
+     * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.
+     * @param offsets The list of offsets per partition that should be committed to Kafka. 
+     * @param sync If true, commit will block until the consumer receives an acknowledgment  
+     * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
+     * if the sync flag is set to false.  
+     */
+    @Override
+    public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean sync) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and 
+     * partitions. 
+     * <p>
+     * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance
+     * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.
+     * @param sync If true, commit will block until the consumer receives an acknowledgment  
+     * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
+     * if the sync flag is set to false.
+     */
+    @Override
+    public OffsetMetadata commit(boolean sync) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API is invoked
+     * for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is 
+     * used arbitrarily in the middle of consumption to reset the fetch offsets.
+     */
+    @Override
+    public void seek(Map<TopicPartition, Long> offsets) {
+    }
+
+    /**
+     * Returns the fetch position of the <i>next message</i> for the specified topic partition to be used on the next {@link #poll(long) poll()}
+     * @param partitions Partitions for which the fetch position will be returned
+     * @return The position from which data will be fetched for the specified partition on the next {@link #poll(long) poll()}
+     */
+    public Map<TopicPartition, Long> position(Collection<TopicPartition> partitions) {
+        return null;
+    }
+
+    /**
+     * Fetches the last committed offsets of partitions that the consumer currently consumes. This API is only relevant if Kafka based offset
+     * storage is used. This API can be used in conjunction with {@link #seek(Map) seek(offsets)} to rewind consumption of data.  
+     * @param partitions The list of partitions to return the last committed offset for
+     * @return The list of offsets committed on the last {@link #commit(boolean) commit(sync)} 
+     */
+    @Override
+    public Map<TopicPartition, Long> committed(Collection<TopicPartition> partitions) {
+        // TODO Auto-generated method stub
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Fetches offsets before a certain timestamp. Note that the offsets returned are approximately computed and do not correspond to the exact
+     * message at the given timestamp. As such, if the consumer is rewound to offsets returned by this API, there may be duplicate messages 
+     * returned by the consumer. 
+     * @param partitions The list of partitions for which the offsets are returned
+     * @param timestamp The unix timestamp. Value -1 indicates earliest available timestamp. Value -2 indicates latest available timestamp. 
+     * @return The offsets per partition before the specified timestamp.
+     */
+    public Map<TopicPartition, Long> offsetsBeforeTime(long timestamp, Collection<TopicPartition> partitions) {
+        return null;
+    }
+
+    @Override
+    public Map<String, ? extends Metric> metrics() {
+        return Collections.unmodifiableMap(this.metrics.metrics());
+    }
+
+    @Override
+    public void close() {
+        log.trace("Closing the Kafka consumer.");
+        subscribedTopics.clear();
+        subscribedPartitions.clear();
+        this.metrics.close();
+        log.debug("The Kafka consumer has closed.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
new file mode 100644
index 0000000..c3aad3b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka.
+ * This class is <i> not threadsafe </i>
+ * <p>
+ * This mock performs no network I/O; {@link #poll(long)} simply hands out dummy records for the currently subscribed
+ * partitions and tracks consumed and committed offsets in memory.
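+ * <p>
+ * A minimal usage sketch (for illustration only; the records returned by {@link #poll(long)} carry dummy payloads):
+ * <pre>
+ * {@code
+ * MockConsumer consumer = new MockConsumer();
+ * consumer.subscribe(new TopicPartition("foo", 0));
+ * Map<String, ConsumerRecords> records = consumer.poll(100);
+ * // process the records, then commit the consumed offsets back to the mock
+ * consumer.commit(true);
+ * }
+ * </pre>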
+ */
+public class MockConsumer implements Consumer {
+
+    private final Set<TopicPartition> subscribedPartitions;
+    private final Set<String> subscribedTopics;
+    private final Map<TopicPartition, Long> committedOffsets; 
+    private final Map<TopicPartition, Long> consumedOffsets;
+    
+    public MockConsumer() {
+        subscribedPartitions = new HashSet<TopicPartition>();
+        subscribedTopics = new HashSet<String>();
+        committedOffsets = new HashMap<TopicPartition, Long>();
+        consumedOffsets = new HashMap<TopicPartition, Long>();
+    }
+    
+    @Override
+    public void subscribe(String... topics) {
+        if(subscribedPartitions.size() > 0)
+            throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
+        for(String topic : topics) {
+            subscribedTopics.add(topic);
+        }
+    }
+
+    @Override
+    public void subscribe(TopicPartition... partitions) {
+        if(subscribedTopics.size() > 0)
+            throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
+        for(TopicPartition partition : partitions) {
+            subscribedPartitions.add(partition);
+            consumedOffsets.put(partition, 0L);
+        }
+    }
+
+    public void unsubscribe(String... topics) {
+        // throw an exception if the topic was never subscribed to
+        for(String topic:topics) {
+            if(!subscribedTopics.contains(topic))
+                throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" +
+                        " to unsubscribe(" + topic + ")");
+            subscribedTopics.remove(topic);
+        }
+    }
+
+    public void unsubscribe(TopicPartition... partitions) {
+        // throw an exception if the partition was never subscribed to
+        for(TopicPartition partition:partitions) {
+            if(!subscribedPartitions.contains(partition))
+                throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" + 
+                                                 partition.topic() + "," + partition.partition() + ") should be called prior" +
+                                                " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")");
+            subscribedPartitions.remove(partition);                    
+            committedOffsets.remove(partition);
+            consumedOffsets.remove(partition);
+        }
+    }
+
+    @Override
+    public Map<String, ConsumerRecords> poll(long timeout) {
+        // hand out one dummy record per subscribed partition, grouped by topic
+        Map<String, List<ConsumerRecord>> records = new HashMap<String, List<ConsumerRecord>>();
+        Map<String, ConsumerRecords> recordMetadata = new HashMap<String, ConsumerRecords>();
+        for(TopicPartition partition : subscribedPartitions) {
+            // get the last consumed offset
+            long messageSequence = consumedOffsets.get(partition);
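+            // serialize the sequence number so it can be handed out as the dummy record payload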
+            ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+            ObjectOutputStream outputStream;
+            try {
+                outputStream = new ObjectOutputStream(byteStream);
+                outputStream.writeLong(messageSequence++);
+                outputStream.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            List<ConsumerRecord> recordsForTopic = records.get(partition.topic());
+            if(recordsForTopic == null) {
+                recordsForTopic = new ArrayList<ConsumerRecord>();
+                records.put(partition.topic(), recordsForTopic);
+            }
+            recordsForTopic.add(new ConsumerRecord(partition.topic(), partition.partition(), null, byteStream.toByteArray(), messageSequence));
+            consumedOffsets.put(partition, messageSequence);
+        }
+        for(Entry<String, List<ConsumerRecord>> recordsPerTopic : records.entrySet()) {
+            Map<Integer, List<ConsumerRecord>> recordsPerPartition = new HashMap<Integer, List<ConsumerRecord>>();
+            for(ConsumerRecord record : recordsPerTopic.getValue()) {
+                List<ConsumerRecord> recordsForThisPartition = recordsPerPartition.get(record.partition());
+                if(recordsForThisPartition == null) {
+                    recordsForThisPartition = new ArrayList<ConsumerRecord>();
+                    recordsPerPartition.put(record.partition(), recordsForThisPartition);
+                }
+                recordsForThisPartition.add(record);
+            }
+            recordMetadata.put(recordsPerTopic.getKey(), new ConsumerRecords(recordsPerTopic.getKey(), recordsPerPartition));
+        }
+        return recordMetadata;
+    }
+
+    @Override
+    public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean sync) {
+        if(!sync)
+            return null;
+        for(Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
+            committedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue());            
+        }        
+        return new OffsetMetadata(committedOffsets, null);
+    }
+
+    @Override
+    public OffsetMetadata commit(boolean sync) {
+        if(!sync)
+            return null;
+        return commit(consumedOffsets, sync);
+    }
+
+    @Override
+    public void seek(Map<TopicPartition, Long> offsets) {
+        // change the fetch offsets
+        for(Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
+            consumedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue());            
+        }
+    }
+
+    @Override
+    public Map<TopicPartition, Long> committed(Collection<TopicPartition> partitions) {
+        Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
+        for(TopicPartition partition : partitions) {
+            offsets.put(new TopicPartition(partition.topic(), partition.partition()), committedOffsets.get(partition));
+        }
+        return offsets;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> position(Collection<TopicPartition> partitions) {
+        Map<TopicPartition, Long> positions = new HashMap<TopicPartition, Long>();
+        for(TopicPartition partition : partitions) {
+            positions.put(partition, consumedOffsets.get(partition));
+        }
+        return positions;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> offsetsBeforeTime(long timestamp,
+            Collection<TopicPartition> partitions) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<String, ? extends Metric> metrics() {        
+        return null;
+    }
+
+    @Override
+    public void close() {
+       // unsubscribe from all partitions
+        TopicPartition[] allPartitions = new TopicPartition[subscribedPartitions.size()];
+        unsubscribe(subscribedPartitions.toArray(allPartitions));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java
new file mode 100644
index 0000000..ea423ad
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.Map;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * The metadata for an offset commit that has been acknowledged by the server
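+ * <p>
+ * A sketch of how the result of a synchronous commit might be inspected (assuming {@code consumer} and
+ * {@code partition} are defined elsewhere):
+ * <pre>
+ * {@code
+ * OffsetMetadata metadata = consumer.commit(true);
+ * if(metadata.error(partition) == null) {
+ *     long committedOffset = metadata.offset(partition);  // the commit for this partition succeeded
+ * }
+ * }
+ * </pre>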
+ */
+public final class OffsetMetadata {
+
+    private final Map<TopicPartition, Long> offsets;
+    private final Map<TopicPartition, RuntimeException> errors;
+    
+    public OffsetMetadata(Map<TopicPartition, Long> offsets, Map<TopicPartition, RuntimeException> errors) {
+        this.offsets = offsets;
+        this.errors = errors;
+    }
+
+    public OffsetMetadata(Map<TopicPartition, Long> offsets) {
+        this(offsets, null);
+    }
+
+    /**
+     * The committed offset for the given topic/partition. Throws the recorded exception if the commit failed for that partition.
+     */
+    public long offset(TopicPartition partition) {
+        // only throw if an error was recorded for this specific partition
+        if(this.errors != null && this.errors.get(partition) != null)
+            throw errors.get(partition);
+        return offsets.get(partition);
+    }
+
+    /**
+     * @return The exception corresponding to the error code returned by the server
+     */
+    public Exception error(TopicPartition partition) {
+        if(errors != null)
+            return errors.get(partition);
+        else
+            return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 90cacbd..f1def50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -13,7 +13,6 @@
 package org.apache.kafka.clients.producer;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +43,7 @@ import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.ClientUtils;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
@@ -118,7 +118,7 @@ public class KafkaProducer implements Producer {
                                                  config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
                                                  metrics,
                                                  time);
-        List<InetSocketAddress> addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
         this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
         this.sender = new Sender(new Selector(this.metrics, time),
                                  this.metadata,
@@ -150,28 +150,6 @@ public class KafkaProducer implements Producer {
         }
     }
 
-    private static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
-        List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
-        for (String url : urls) {
-            if (url != null && url.length() > 0) {
-                String[] pieces = url.split(":");
-                if (pieces.length != 2)
-                    throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
-                try {
-                    InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
-                    if (address.isUnresolved())
-                        throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url);
-                    addresses.add(address);
-                } catch (NumberFormatException e) {
-                    throw new ConfigException("Invalid port in metadata.broker.list: " + url);
-                }
-            }
-        }
-        if (addresses.size() < 1)
-            throw new ConfigException("No bootstrap urls given in metadata.broker.list.");
-        return addresses;
-    }
-
     /**
      * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)}
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
new file mode 100644
index 0000000..cb33e34
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.ConfigException;
+
+public class ClientUtils {
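+    /**
+     * Parses a list of "host:port" strings into resolved {@link InetSocketAddress} instances. A
+     * {@link ConfigException} is thrown if an entry is malformed, uses an invalid port, cannot be resolved,
+     * or if no valid addresses are supplied. For example (the address below is a placeholder):
+     * <pre>
+     * {@code
+     * List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(Arrays.asList("localhost:9092"));
+     * }
+     * </pre>
+     */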
+    public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
+        List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
+        for (String url : urls) {
+            if (url != null && url.length() > 0) {
+                String[] pieces = url.split(":");
+                if (pieces.length != 2)
+                    throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+                try {
+                    InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
+                    if (address.isUnresolved())
+                        throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url);
+                    addresses.add(address);
+                } catch (NumberFormatException e) {
+                    throw new ConfigException("Invalid port in metadata.broker.list: " + url);
+                }
+            }
+        }
+        if (addresses.size() < 1)
+            throw new ConfigException("No bootstrap urls given in metadata.broker.list.");
+        return addresses;
+    }
+}
\ No newline at end of file