Posted to commits@kafka.apache.org by jk...@apache.org on 2015/01/30 03:39:32 UTC

[2/7] kafka git commit: KAFKA-1760: New consumer.

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
deleted file mode 100644
index 29ad25e..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
-*/
-package org.apache.kafka.clients.consumer;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Test;
-
-/**
- * TODO: Clean this after the consumer implementation is complete. Until then, it is useful to write some sample test code using the new APIs
- *
- */
-public class ConsumerExampleTest {
-    /**
-     * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load 
-     * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are automatically committed periodically, 
-     * as controlled by the auto.commit.interval.ms config 
-     */
-//    @Test
-//    public void testConsumerGroupManagementWithAutoOffsetCommits() {
-//        Properties props = new Properties();
-//        props.put("metadata.broker.list", "localhost:9092");
-//        props.put("group.id", "test");
-//        props.put("session.timeout.ms", "1000");
-//        props.put("auto.commit.enable", "true");
-//        props.put("auto.commit.interval.ms", "10000");
-//        KafkaConsumer consumer = new KafkaConsumer(props);
-//        // subscribe to some topics
-//        consumer.subscribe("foo", "bar");
-//        boolean isRunning = true;
-//        while(isRunning) {
-//            Map<String, ConsumerRecords> records = consumer.poll(100);
-//            process(records);
-//        }
-//        consumer.close();
-//    }
-
-    /**
-     * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load 
-     * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are manually committed using the 
-     * commit() API. This example also demonstrates rewinding the consumer's offsets if processing of consumed messages fails.
-     */
-//     @Test
-//     public void testConsumerGroupManagementWithManualOffsetCommit() {
-//         Properties props = new Properties();
-//         props.put("metadata.broker.list", "localhost:9092");
-//         props.put("group.id", "test");
-//         props.put("session.timeout.ms", "1000");
-//         props.put("auto.commit.enable", "false");
-//         KafkaConsumer consumer = new KafkaConsumer(props);
-//         // subscribe to some topics
-//         consumer.subscribe("foo", "bar");
-//         int commitInterval = 100;
-//         int numRecords = 0;
-//         boolean isRunning = true;
-//         Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
-//         while(isRunning) {
-//             Map<String, ConsumerRecords> records = consumer.poll(100);
-//             try {
-//                 Map<TopicPartition, Long> lastConsumedOffsets = process(records);
-//                 consumedOffsets.putAll(lastConsumedOffsets);
-//                 numRecords += records.size();
-//                 // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
-//                 if(numRecords % commitInterval == 0) 
-//                     consumer.commit(true);
-//             } catch(Exception e) {
-//                 // rewind consumer's offsets for failed partitions
-//                 List<TopicPartition> failedPartitions = getFailedPartitions();
-//                 Map<TopicPartition, Long> offsetsToRewindTo = new HashMap<TopicPartition, Long>();
-//                 for(TopicPartition failedPartition : failedPartitions) {
-//                     // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
-//                     // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
-//                     offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
-//                 }
-//                 // seek to new offsets only for partitions that failed the last process()
-//                 consumer.seek(offsetsToRewindTo);
-//             }
-//         }         
-//         consumer.close();
-//     }
-
-     private List<TopicPartition> getFailedPartitions() { return null; }
-
-    /**
-     * This example demonstrates the consumer can be used to leverage Kafka's group management functionality along with custom offset storage. 
-     * In this example, the assumption made is that the user chooses to store the consumer offsets outside Kafka. This requires the user to 
-     * plugin logic for retrieving the offsets from a custom store and provide the offsets to the consumer in the ConsumerRebalanceCallback
-     * callback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance <i>and</i>
-     * before the consumption restarts post rebalance. This is the right place to supply offsets from a custom store to the consumer.
-     */
-//    @Test
-//    public void testConsumerRebalanceWithCustomOffsetStore() {
-//        Properties props = new Properties();
-//        props.put("metadata.broker.list", "localhost:9092");
-//        props.put("group.id", "test");
-//        props.put("session.timeout.ms", "1000");
-//        props.put("auto.commit.enable", "true");
-//        props.put("auto.commit.interval.ms", "10000");
-//        KafkaConsumer consumer = new KafkaConsumer(props,
-//                                                   new ConsumerRebalanceCallback() {
-//                                                        public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
-//                                                           Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
-//                                                           consumer.seek(lastCommittedOffsets);
-//                                                        }
-//                                                        public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
-//                                                            Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions); // implemented by the user
-//                                                           commitOffsetsToCustomStore(offsets);                                            // implemented by the user
-//                                                        }
-//                                                        private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore(Collection<TopicPartition> partitions) {
-//                                                            return null;
-//                                                        }
-//                                                        private Map<TopicPartition, Long> getLastConsumedOffsets(Collection<TopicPartition> partitions) { return null; }
-//                                                        private void commitOffsetsToCustomStore(Map<TopicPartition, Long> offsets) {}
-//                                                    });
-//        // subscribe to topics
-//        consumer.subscribe("foo", "bar");
-//        int commitInterval = 100;
-//        int numRecords = 0;
-//        boolean isRunning = true;
-//        while(isRunning) {
-//            Map<String, ConsumerRecords> records = consumer.poll(100);
-//            Map<TopicPartition, Long> consumedOffsets = process(records);
-//            numRecords += records.size();
-//            // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
-//            if(numRecords % commitInterval == 0) 
-//                commitOffsetsToCustomStore(consumedOffsets);
-//        }
-//        consumer.close();
-//    }
-
-    /**
-     * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with Kafka based offset storage. 
-     * In this example, the assumption made is that the user chooses to use Kafka based offset management. 
-     */
-//    @Test
-//    public void testConsumerRewindWithGroupManagementAndKafkaOffsetStorage() {
-//        Properties props = new Properties();
-//        props.put("metadata.broker.list", "localhost:9092");
-//        props.put("group.id", "test");
-//        props.put("session.timeout.ms", "1000");
-//        props.put("auto.commit.enable", "false");        
-//        KafkaConsumer consumer = new KafkaConsumer(props,
-//                                                   new ConsumerRebalanceCallback() {
-//                                                        boolean rewindOffsets = true;
-//                                                        public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
-//                                                            if(rewindOffsets) {
-//                                                            Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(null);
-//                                                            Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100);
-//                                                            consumer.seek(newOffsets);
-//                                                            }
-//                                                        }
-//                                                        public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
-//                                                            consumer.commit(true);
-//                                                        }
-//                                                        // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages 
-//                                                        private Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> currentOffsets,
-//                                                                                                        long numberOfMessagesToRewindBackTo) {
-//                                                            Map<TopicPartition, Long> newOffsets = new HashMap<TopicPartition, Long>();
-//                                                            for(Map.Entry<TopicPartition, Long> offset : currentOffsets.entrySet()) {
-//                                                                newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo);
-//                                                            }
-//                                                            return newOffsets;
-//                                                        }
-//                                                    });
-//        // subscribe to topics
-//        consumer.subscribe("foo", "bar");
-//        int commitInterval = 100;
-//        int numRecords = 0;
-//        boolean isRunning = true;
-//        while(isRunning) {
-//            Map<String, ConsumerRecords> records = consumer.poll(100);
-//            Map<TopicPartition, Long> consumedOffsets = process(records);
-//            numRecords += records.size();
-//            // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
-//            if(numRecords % commitInterval == 0) 
-//                commitOffsetsToCustomStore(consumedOffsets);
-//        }
-//        consumer.close();
-//    }
-
-    /**
-     * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume upto the latest
-     * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes 
-     * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
-     * This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka 
-     * based offset management. However, session.timeout.ms is not required since the Kafka consumer only does failure detection with group 
-     * management.
-     */
-//    @Test
-//    public void testConsumerWithKafkaBasedOffsetManagement() {
-//        Properties props = new Properties();
-//        props.put("metadata.broker.list", "localhost:9092");
-//        props.put("group.id", "test");
-//        props.put("auto.commit.enable", "true");
-//        props.put("auto.commit.interval.ms", "10000");
-//        KafkaConsumer consumer = new KafkaConsumer(props);
-//        // subscribe to some partitions of topic foo
-//        TopicPartition partition0 = new TopicPartition("foo", 0);
-//        TopicPartition partition1 = new TopicPartition("foo", 1);
-//        TopicPartition[] partitions = new TopicPartition[2];
-//        partitions[0] = partition0;
-//        partitions[1] = partition1;
-//        consumer.subscribe(partitions);
-//        // find the last committed offsets for partitions 0,1 of topic foo
-//        Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(null);
-//        // seek to the last committed offsets to avoid duplicates
-//        consumer.seek(lastCommittedOffsets);        
-//        // find the offsets of the latest available messages to know where to stop consumption
-//        Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, null);
-//        boolean isRunning = true;
-//        while(isRunning) {
-//            Map<String, ConsumerRecords> records = consumer.poll(100);
-//            Map<TopicPartition, Long> consumedOffsets = process(records);
-//            for(TopicPartition partition : partitions) {
-//                if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
-//                    isRunning = false;
-//                else
-//                    isRunning = true;
-//            }
-//        }
-//        consumer.close();
-//    }
-
-    /**
-     * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume upto the latest
-     * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes 
-     * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
-     * This example assumes that the user chooses to use custom offset storage.
-     */
-    @Test
-    public void testConsumerWithCustomOffsetManagement() {
-//        Properties props = new Properties();
-//        props.put("metadata.broker.list", "localhost:9092");
-//        KafkaConsumer consumer = new KafkaConsumer(props);
-//        // subscribe to some partitions of topic foo
-//        TopicPartition partition0 = new TopicPartition("foo", 0);
-//        TopicPartition partition1 = new TopicPartition("foo", 1);
-//        TopicPartition[] partitions = new TopicPartition[2];
-//        partitions[0] = partition0;
-//        partitions[1] = partition1;
-//        consumer.subscribe(partitions);
-//        Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
-//        // seek to the last committed offsets to avoid duplicates
-//        consumer.seek(lastCommittedOffsets);        
-//        // find the offsets of the latest available messages to know where to stop consumption
-//        Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, null);
-//        boolean isRunning = true;
-//        while(isRunning) {
-//            Map<String, ConsumerRecords> records = consumer.poll(100);
-//            Map<TopicPartition, Long> consumedOffsets = process(records);
-//            // commit offsets for partitions 0,1 for topic foo to custom store
-//            commitOffsetsToCustomStore(consumedOffsets);
-//            for(TopicPartition partition : partitions) {
-//                if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
-//                    isRunning = false;
-//                else
-//                    isRunning = true;
-//            }            
-//        }         
-//        consumer.close();
-    }
-
-    private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore() { return null; }
-    private void commitOffsetsToCustomStore(Map<TopicPartition, Long> consumedOffsets) {}
-    private Map<TopicPartition, Long> process(Map<String, ConsumerRecords> records) {
-        Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
-        for(Entry<String, ConsumerRecords> recordMetadata : records.entrySet()) {
-            List<ConsumerRecord> recordsPerTopic = recordMetadata.getValue().records();
-            for(int i = 0;i < recordsPerTopic.size();i++) {
-                ConsumerRecord record = recordsPerTopic.get(i);
-                // process record
-                try {
-                    processedOffsets.put(record.topicAndPartition(), record.offset());
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }                
-            }
-        }
-        return processedOffsets; 
-    }
-}

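The commented-out samples removed above were written against an earlier draft of the consumer API (poll() returning a Map keyed by topic, commit(boolean), seek(Map)). Against the API this patch actually introduces, exercised by MockConsumerTest below, the basic group-managed poll loop looks roughly like the following sketch. The configuration keys and deserializer settings are assumed here, not taken from this diff.

    import java.util.Iterator;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.CommitType;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class NewConsumerLoopSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // assumed config key
            props.put("group.id", "test");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");    // assumed config key
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");  // assumed config key
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
            consumer.subscribe("foo", "bar");
            boolean isRunning = true;
            while (isRunning) {
                // poll() now returns a single ConsumerRecords object rather than a Map<String, ConsumerRecords>
                ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                Iterator<ConsumerRecord<byte[], byte[]>> iter = records.iterator();
                while (iter.hasNext()) {
                    ConsumerRecord<byte[], byte[]> record = iter.next();
                    // process the record here
                }
                consumer.commit(CommitType.SYNC);  // synchronous commit, replacing the old commit(true)
            }
            consumer.close();
        }
    }
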
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
new file mode 100644
index 0000000..e51d2df
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -0,0 +1,32 @@
+package org.apache.kafka.clients.consumer;
+
+import static org.junit.Assert.*;
+
+import java.util.Iterator;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+public class MockConsumerTest {
+    
+    private MockConsumer<String, String> consumer = new MockConsumer<String, String>();
+
+    @Test
+    public void testSimpleMock() {
+        consumer.subscribe("topic");
+        assertEquals(0, consumer.poll(1000).count());
+        ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, "key1", "value1");
+        ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, "key2", "value2");
+        consumer.addRecord(rec1);
+        consumer.addRecord(rec2);
+        ConsumerRecords<String, String> recs = consumer.poll(1);
+        Iterator<ConsumerRecord<String, String>> iter = recs.iterator();
+        assertEquals(rec1, iter.next());
+        assertEquals(rec2, iter.next());
+        assertFalse(iter.hasNext());
+        assertEquals(1L, consumer.position(new TopicPartition("test", 0)));
+        consumer.commit(CommitType.SYNC);
+        assertEquals(1L, consumer.committed(new TopicPartition("test", 0)));
+    }
+
+}

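MockConsumer, added in this patch, gives application code a broker-free Consumer implementation to test against. A minimal sketch follows; the handle() method stands in for hypothetical application code and is not part of this patch.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.MockConsumer;

    public class MockConsumerSketch {
        public static void main(String[] args) {
            MockConsumer<String, String> consumer = new MockConsumer<String, String>();
            consumer.subscribe("events");
            // seed the mock with the records the code under test should see
            consumer.addRecord(new ConsumerRecord<String, String>("events", 0, 0L, "k1", "v1"));
            consumer.addRecord(new ConsumerRecord<String, String>("events", 0, 1L, "k2", "v2"));
            ConsumerRecords<String, String> records = consumer.poll(1);
            handle(records);  // hypothetical application code under test
            consumer.close();
        }

        private static void handle(ConsumerRecords<String, String> records) {
            // the logic being unit-tested would go here
        }
    }
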
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
new file mode 100644
index 0000000..864f1c7
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -0,0 +1,61 @@
+package org.apache.kafka.clients.consumer.internals;
+
+import static org.junit.Assert.*;
+import static java.util.Arrays.asList;
+
+import java.util.Collections;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+public class SubscriptionStateTest {
+    
+    private final SubscriptionState state = new SubscriptionState();
+    private final TopicPartition tp0 = new TopicPartition("test", 0);
+    private final TopicPartition tp1 = new TopicPartition("test", 1);
+
+    @Test
+    public void partitionSubscription() {      
+        state.subscribe(tp0);
+        assertEquals(Collections.singleton(tp0), state.assignedPartitions());
+        state.committed(tp0, 1);
+        state.fetched(tp0, 1);
+        state.consumed(tp0, 1);
+        assertAllPositions(tp0, 1L);
+        state.unsubscribe(tp0);
+        assertTrue(state.assignedPartitions().isEmpty());
+        assertAllPositions(tp0, null);
+    }
+    
+    public void topicSubscription() {
+        state.subscribe("test");
+        assertEquals(1, state.subscribedTopics().size());
+        assertTrue(state.assignedPartitions().isEmpty());
+        assertTrue(state.partitionsAutoAssigned());
+        state.changePartitionAssignment(asList(tp0));
+        state.committed(tp0, 1);
+        state.fetched(tp0, 1);
+        state.consumed(tp0, 1);
+        assertAllPositions(tp0, 1L);
+        state.changePartitionAssignment(asList(tp1));
+        assertAllPositions(tp0, null);
+        assertEquals(Collections.singleton(tp1), state.assignedPartitions());
+    }
+    
+    @Test(expected = IllegalArgumentException.class)
+    public void cantChangeFetchPositionForNonAssignedPartition() {
+        state.fetched(tp0, 1);
+    }
+    
+    @Test(expected = IllegalArgumentException.class)
+    public void cantChangeConsumedPositionForNonAssignedPartition() {
+        state.consumed(tp0, 1);
+    }
+    
+    public void assertAllPositions(TopicPartition tp, Long offset) {
+        assertEquals(offset, state.committed(tp));
+        assertEquals(offset, state.fetched(tp));
+        assertEquals(offset, state.consumed(tp));
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
index 1236803..77b23e7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
@@ -43,7 +43,7 @@ public class BufferPoolTest {
      */
     @Test
     public void testSimple() throws Exception {
-        int totalMemory = 64 * 1024;
+        long totalMemory = 64 * 1024;
         int size = 1024;
         BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags);
         ByteBuffer buffer = pool.allocate(size);
@@ -100,7 +100,7 @@ public class BufferPoolTest {
         ByteBuffer buffer = pool.allocate(1024);
         CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
         CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
-        assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1, allocation.getCount());
+        assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount());
         doDealloc.countDown(); // return the memory
         allocation.await();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 3676b05..d3377ef 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -38,7 +38,7 @@ public class MockProducerTest {
         Future<RecordMetadata> metadata = producer.send(record);
         assertTrue("Send should be immediately complete", metadata.isDone());
         assertFalse("Send should be successful", isError(metadata));
-        assertEquals("Offset should be 0", 0, metadata.get().offset());
+        assertEquals("Offset should be 0", 0L, metadata.get().offset());
         assertEquals(topic, metadata.get().topic());
         assertEquals("We should have the record in our history", asList(record), producer.history());
         producer.clear();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
index 1d077fd..82d8083 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
@@ -1,30 +1,23 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.producer;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 
 import java.util.List;
 
-
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.internals.Partitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
@@ -34,7 +27,6 @@ import org.junit.Test;
 public class PartitionerTest {
 
     private byte[] key = "key".getBytes();
-    private byte[] value = "value".getBytes();
     private Partitioner partitioner = new Partitioner();
     private Node node0 = new Node(0, "localhost", 99);
     private Node node1 = new Node(1, "localhost", 100);
@@ -48,33 +40,28 @@ public class PartitionerTest {
 
     @Test
     public void testUserSuppliedPartitioning() {
-        assertEquals("If the user supplies a partition we should use it.",
-                     0,
-                     partitioner.partition(new ProducerRecord<byte[], byte[]>("test", 0, key, value), cluster));
+        assertEquals("If the user supplies a partition we should use it.", 0, partitioner.partition("test", key, 0, cluster));
     }
 
     @Test
     public void testKeyPartitionIsStable() {
-        int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", key, value), cluster);
-        assertEquals("Same key should yield same partition",
-                     partition,
-                     partitioner.partition(new ProducerRecord<byte[], byte[]>("test", key, "value2".getBytes()), cluster));
+        int partition = partitioner.partition("test", key, null, cluster);
+        assertEquals("Same key should yield same partition", partition, partitioner.partition("test", key, null, cluster));
     }
 
     @Test
     public void testRoundRobinIsStable() {
-        int startPart = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
+        int startPart = partitioner.partition("test", null, null, cluster);
         for (int i = 1; i <= 100; i++) {
-            int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
-            assertEquals("Should yield a different partition each call with round-robin partitioner",
-                partition, (startPart + i) % 2);
-      }
+            int partition = partitioner.partition("test", null, null, cluster);
+            assertEquals("Should yield a different partition each call with round-robin partitioner", partition, (startPart + i) % 2);
+        }
     }
 
     @Test
     public void testRoundRobinWithDownNode() {
         for (int i = 0; i < partitions.size(); i++) {
-            int part = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
+            int part = partitioner.partition("test", null, null, cluster);
             assertTrue("We should never choose a leader-less node in round robin", part >= 0 && part < 2);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 66cbdf5..888b929 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -72,14 +72,14 @@ public class SenderTest {
 
     @Test
     public void testSimple() throws Exception {
-        int offset = 0;
+        long offset = 0;
         Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
         client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
         sender.run(time.milliseconds());
-        assertEquals("All requests completed.", offset, client.inFlightRequestCount());
+        assertEquals("All requests completed.", offset, (long) client.inFlightRequestCount());
         sender.run(time.milliseconds());
         assertTrue("Request should be completed", future.isDone());
         assertEquals(offset, future.get().offset());
@@ -110,7 +110,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // reconnect
         sender.run(time.milliseconds()); // resend
         assertEquals(1, client.inFlightRequestCount());
-        int offset = 0;
+        long offset = 0;
         client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
         sender.run(time.milliseconds());
         assertTrue("Request should have retried and completed", future.isDone());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 3c442a2..16d3fed 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -110,18 +110,18 @@ public class ConfigDefTest {
 
     @Test(expected = ConfigException.class)
     public void testInvalidDefaultRange() {
-        ConfigDef def = new ConfigDef().define("name", Type.INT, -1, Range.between(0,10), Importance.HIGH, "docs");
+        new ConfigDef().define("name", Type.INT, -1, Range.between(0,10), Importance.HIGH, "docs");
     }
 
     @Test(expected = ConfigException.class)
     public void testInvalidDefaultString() {
-        ConfigDef def = new ConfigDef().define("name", Type.STRING, "bad", ValidString.in(Arrays.asList("valid", "values")), Importance.HIGH, "docs");
+        new ConfigDef().define("name", Type.STRING, "bad", ValidString.in("valid", "values"), Importance.HIGH, "docs");
     }
 
     @Test
     public void testValidators() {
         testValidators(Type.INT, Range.between(0,10), 5, new Object[]{1, 5, 9}, new Object[]{-1, 11});
-        testValidators(Type.STRING, ValidString.in(Arrays.asList("good", "values", "default")), "default",
+        testValidators(Type.STRING, ValidString.in("good", "values", "default"), "default",
                 new Object[]{"good", "values", "default"}, new Object[]{"bad", "inputs"});
     }
 

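ValidString.in() now takes its allowed values as varargs rather than a List, which is what the updated assertions above (and the LogConfig change further down) rely on. A standalone sketch, with an illustrative config name:

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigDef.Importance;
    import org.apache.kafka.common.config.ConfigDef.Type;
    import org.apache.kafka.common.config.ConfigDef.ValidString;

    public class ValidStringSketch {
        public static void main(String[] args) {
            // "cleanup.policy" is only an example; the default must be one of the allowed values
            ConfigDef def = new ConfigDef().define("cleanup.policy", Type.STRING, "delete",
                    ValidString.in("compact", "delete"), Importance.MEDIUM, "The log cleanup policy.");
        }
    }
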
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 74c1957..a14659a 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -12,7 +12,6 @@
  */
 package org.apache.kafka.common.network;
 
-import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -74,7 +73,7 @@ public class SelectorTest {
         // disconnect
         this.server.closeConnections();
         while (!selector.disconnected().contains(node))
-            selector.poll(1000L, EMPTY);
+            selector.poll(1000L);
 
         // reconnect and do another request
         blockingConnect(node);
@@ -89,7 +88,8 @@ public class SelectorTest {
         int node = 0;
         blockingConnect(node);
         selector.disconnect(node);
-        selector.poll(10, asList(createSend(node, "hello1")));
+        selector.send(createSend(node, "hello1"));
+        selector.poll(10);
         assertEquals("Request should not have succeeded", 0, selector.completedSends().size());
         assertEquals("There should be a disconnect", 1, selector.disconnected().size());
         assertTrue("The disconnect should be from our node", selector.disconnected().contains(node));
@@ -104,7 +104,9 @@ public class SelectorTest {
     public void testCantSendWithInProgress() throws Exception {
         int node = 0;
         blockingConnect(node);
-        selector.poll(1000L, asList(createSend(node, "test1"), createSend(node, "test2")));
+        selector.send(createSend(node, "test1"));
+        selector.send(createSend(node, "test2"));
+        selector.poll(1000L);
     }
 
     /**
@@ -112,7 +114,8 @@ public class SelectorTest {
      */
     @Test(expected = IllegalStateException.class)
     public void testCantSendWithoutConnecting() throws Exception {
-        selector.poll(1000L, asList(createSend(0, "test")));
+        selector.send(createSend(0, "test"));
+        selector.poll(1000L);
     }
 
     /**
@@ -131,7 +134,7 @@ public class SelectorTest {
         int node = 0;
         selector.connect(node, new InetSocketAddress("localhost", TestUtils.choosePort()), BUFFER_SIZE, BUFFER_SIZE);
         while (selector.disconnected().contains(node))
-            selector.poll(1000L, EMPTY);
+            selector.poll(1000L);
     }
 
     /**
@@ -152,14 +155,13 @@ public class SelectorTest {
         int[] requests = new int[conns];
         int[] responses = new int[conns];
         int responseCount = 0;
-        List<NetworkSend> sends = new ArrayList<NetworkSend>();
         for (int i = 0; i < conns; i++)
-            sends.add(createSend(i, i + "-" + 0));
+            selector.send(createSend(i, i + "-" + 0));
 
         // loop until we complete all requests
         while (responseCount < conns * reqs) {
             // do the i/o
-            selector.poll(0L, sends);
+            selector.poll(0L);
 
             assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
 
@@ -175,12 +177,11 @@ public class SelectorTest {
             }
 
             // prepare new sends for the next round
-            sends.clear();
             for (NetworkSend send : selector.completedSends()) {
                 int dest = send.destination();
                 requests[dest]++;
                 if (requests[dest] < reqs)
-                    sends.add(createSend(dest, dest + "-" + requests[dest]));
+                    selector.send(createSend(dest, dest + "-" + requests[dest]));
             }
         }
     }
@@ -212,10 +213,34 @@ public class SelectorTest {
         blockingConnect(0);
     }
 
+    @Test
+    public void testMute() throws Exception {
+        blockingConnect(0);
+        blockingConnect(1);
+
+        selector.send(createSend(0, "hello"));
+        selector.send(createSend(1, "hi"));
+
+        selector.mute(1);
+
+        while (selector.completedReceives().isEmpty())
+            selector.poll(5);
+        assertEquals("We should have only one response", 1, selector.completedReceives().size());
+        assertEquals("The response should not be from the muted node", 0, selector.completedReceives().get(0).source());
+
+        selector.unmute(1);
+        do {
+            selector.poll(5);
+        } while (selector.completedReceives().isEmpty());
+        assertEquals("We should have only one response", 1, selector.completedReceives().size());
+        assertEquals("The response should be from the previously muted node", 1, selector.completedReceives().get(0).source());
+    }
+
     private String blockingRequest(int node, String s) throws IOException {
-        selector.poll(1000L, asList(createSend(node, s)));
+        selector.send(createSend(node, s));
+        selector.poll(1000L);
         while (true) {
-            selector.poll(1000L, EMPTY);
+            selector.poll(1000L);
             for (NetworkReceive receive : selector.completedReceives())
                 if (receive.source() == node)
                     return asString(receive);
@@ -226,7 +251,7 @@ public class SelectorTest {
     private void blockingConnect(int node) throws IOException {
         selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
         while (!selector.connected().contains(node))
-            selector.poll(10000L, EMPTY);
+            selector.poll(10000L);
     }
 
     private NetworkSend createSend(int node, String s) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index a39fab5..4c2ea34 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.util.Arrays;
+import java.util.Collections;
+
 import org.junit.Test;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
@@ -48,4 +51,11 @@ public class UtilsTest {
         assertEquals("[::1]:1234", formatAddress("::1", 1234));
         assertEquals("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678", formatAddress("2001:db8:85a3:8d3:1319:8a2e:370:7348", 5678));
     }
+    
+    @Test
+    public void testJoin() {
+        assertEquals("", Utils.join(Collections.emptyList(), ","));
+        assertEquals("1", Utils.join(Arrays.asList("1"), ","));
+        assertEquals("1,2,3", Utils.join(Arrays.asList(1, 2, 3), ","));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index d61de52..ea89b06 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.test;
 
@@ -26,13 +22,13 @@ import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.utils.Time;
 
-
 /**
  * A fake selector to use for testing
  */
 public class MockSelector implements Selectable {
 
     private final Time time;
+    private final List<NetworkSend> initiatedSends = new ArrayList<NetworkSend>();
     private final List<NetworkSend> completedSends = new ArrayList<NetworkSend>();
     private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();
     private final List<Integer> disconnected = new ArrayList<Integer>();
@@ -68,8 +64,14 @@ public class MockSelector implements Selectable {
     }
 
     @Override
-    public void poll(long timeout, List<NetworkSend> sends) throws IOException {
-        this.completedSends.addAll(sends);
+    public void send(NetworkSend send) {
+        this.initiatedSends.add(send);
+    }
+
+    @Override
+    public void poll(long timeout) throws IOException {
+        this.completedSends.addAll(this.initiatedSends);
+        this.initiatedSends.clear();
         time.sleep(timeout);
     }
 
@@ -101,4 +103,20 @@ public class MockSelector implements Selectable {
         return connected;
     }
 
+    @Override
+    public void mute(int id) {
+    }
+
+    @Override
+    public void unmute(int id) {
+    }
+
+    @Override
+    public void muteAll() {
+    }
+
+    @Override
+    public void unmuteAll() {
+    }
+
 }

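The Selectable changes above, together with the SelectorTest changes earlier in this patch, split the old poll(timeout, List<NetworkSend>) into a separate send() that queues a request and a poll(timeout) that performs the actual I/O. A minimal sketch of the new request/response cycle follows; the port, buffer sizes, and NetworkSend constructor are assumed from the test helpers rather than spelled out in this diff.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;

    import org.apache.kafka.common.network.NetworkReceive;
    import org.apache.kafka.common.network.NetworkSend;
    import org.apache.kafka.common.network.Selectable;

    public class SelectableSketch {
        // send one payload to a node and block until its response arrives
        static NetworkReceive sendAndReceive(Selectable selector, int node, byte[] payload) throws IOException {
            selector.connect(node, new InetSocketAddress("localhost", 9092), 4096, 4096);
            while (!selector.connected().contains(node))
                selector.poll(100L);                                          // finish connecting
            selector.send(new NetworkSend(node, ByteBuffer.wrap(payload)));   // queue the send...
            while (true) {
                selector.poll(100L);                                          // ...poll() drives the actual send and receive
                for (NetworkReceive receive : selector.completedReceives())
                    if (receive.source() == node)
                        return receive;
            }
        }
    }
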
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
index 6d00ed0..a3b1b78 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
@@ -63,7 +63,7 @@ case class ConsumerMetadataRequest(group: String,
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     // return ConsumerCoordinatorNotAvailable for all uncaught errors
-    val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode)
+    val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
index 84f6017..24aaf95 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
@@ -40,7 +40,7 @@ object ConsumerMetadataResponse {
   
 }
 
-case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, correlationId: Int = 0)
+case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, correlationId: Int)
   extends RequestOrResponse() {
 
   def sizeInBytes =

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b230e9a..e6ad8be 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -90,7 +90,7 @@ class Partition(val topic: String,
           val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
           val offsetMap = checkpoint.read
           if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
-            warn("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
+            info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
           val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
           val localReplica = new Replica(replicaId, this, time, offset, Some(log))
           addReplicaIfNotExists(localReplica)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index fbef34c..14b22ab 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -141,7 +141,7 @@ class RequestSendThread(val controllerId: Int,
               connectToBroker(toBroker, channel)
               isSendSuccessful = false
               // backoff before retrying the connection and send
-              Utils.swallow(Thread.sleep(300))
+              Utils.swallowTrace(Thread.sleep(300))
           }
         }
         if (receive != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 4631bc7..8b67aee 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -179,12 +179,12 @@ object LogConfig {
       .define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, atLeast(0), MEDIUM, FileDeleteDelayMsDoc)
       .define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
         MinCleanableRatioDoc)
-      .define(CleanupPolicyProp, STRING, if (Defaults.Compact) Compact else Delete, in(asList(Compact, Delete)), MEDIUM,
+      .define(CleanupPolicyProp, STRING, if (Defaults.Compact) Compact else Delete, in(Compact, Delete), MEDIUM,
         CompactDoc)
       .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable,
         MEDIUM, UncleanLeaderElectionEnableDoc)
       .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), MEDIUM, MinInSyncReplicasDoc)
-      .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(seqAsJavaList(BrokerCompressionCodec.brokerCompressionOptions)), MEDIUM, CompressionTypeDoc)
+      .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc)
   }
 
   def configNames() = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ec8d9f7..48bc435 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,6 +17,12 @@
 
 package kafka.server
 
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.JoinGroupResponse
+import org.apache.kafka.common.requests.HeartbeatResponse
+import org.apache.kafka.common.requests.ResponseHeader
+import org.apache.kafka.common.protocol.types.Struct
+
 import kafka.api._
 import kafka.common._
 import kafka.log._
@@ -26,6 +32,9 @@ import kafka.network.RequestChannel.Response
 import kafka.controller.KafkaController
 import kafka.utils.{SystemTime, Logging}
 
+import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic._
 import scala.collection._
 
 import org.I0Itec.zkclient.ZkClient
@@ -43,6 +52,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
   val metadataCache = new MetadataCache
+  private var consumerGroupGenerationId = 0
 
   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -62,6 +72,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
         case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
         case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
+        case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
+        case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -442,6 +454,23 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
+  def handleJoinGroupRequest(request: RequestChannel.Request) {
+    val joinGroupReq = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader]
+    val topics = JavaConversions.asScalaIterable(joinGroupReq.body.topics()).toSet
+    val partitions = this.replicaManager.logManager.allLogs.filter(log => topics.contains(log.topicAndPartition.topic))
+    val partitionList = partitions.map(_.topicAndPartition).map(tp => new org.apache.kafka.common.TopicPartition(tp.topic, tp.partition)).toBuffer
+    this.consumerGroupGenerationId += 1
+    val response = new JoinGroupResponse(ErrorMapping.NoError, this.consumerGroupGenerationId, joinGroupReq.body.consumerId, JavaConversions.asJavaList(partitionList))
+    val send = new BoundedByteBufferSend(new JoinGroupResponseAndHeader(joinGroupReq.correlationId, response))
+    requestChannel.sendResponse(new RequestChannel.Response(request, send))
+  }
+  
+  def handleHeartbeatRequest(request: RequestChannel.Request) {
+    val hbReq = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader]
+    val send = new BoundedByteBufferSend(new HeartbeatResponseAndHeader(hbReq.correlationId, new HeartbeatResponse(Errors.NONE.code)))
+    requestChannel.sendResponse(new RequestChannel.Response(request, send))
+  }
+  
   def close() {
     // TODO currently closing the API is an no-op since the API no longer maintain any modules
     // maybe removing the closing call in the end when KafkaAPI becomes a pure stateless layer

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e58fbb9..fb948b9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -327,7 +327,7 @@ class ReplicaManager(val config: KafkaConfig,
           BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
 
           trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
-            .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
+            .format(messages.sizeInBytes, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
           (topicAndPartition, LogAppendResult(info))
         } catch {
           // NOTE: Failed produce requests metric is not incremented for known exceptions

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index e455cb9..910691e 100644
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -113,8 +113,6 @@ object ConsoleConsumer extends Logging {
       KafkaMetricsReporter.startReporters(verifiableProps)
     }
 
-
-
     val consumerProps = if (options.has(consumerConfigOpt))
       Utils.loadProps(options.valueOf(consumerConfigOpt))
     else

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 093c800..c39c067 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -17,14 +17,21 @@
 
 package kafka.tools
 
-import java.util.concurrent.CountDownLatch
+import scala.collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicLong
 import java.nio.channels.ClosedByInterruptException
 import org.apache.log4j.Logger
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.record.Records
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import kafka.message.Message
 import kafka.utils.{ZkUtils, CommandLineUtils}
 import java.util.{ Random, Properties }
-import kafka.consumer._
+import kafka.consumer.Consumer
+import kafka.consumer.ConsumerConnector
+import kafka.consumer.KafkaStream
+import kafka.consumer.ConsumerTimeoutException
 import java.text.SimpleDateFormat
 
 /**
@@ -42,50 +49,98 @@ object ConsumerPerformance {
 
     if (!config.hideHeader) {
       if (!config.showDetailedStats)
-        println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+        println("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
       else
-        println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+        println("time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
     }
 
-    // clean up zookeeper state for this group id for every perf run
-    ZkUtils.maybeDeletePath(config.consumerConfig.zkConnect, "/consumers/" + config.consumerConfig.groupId)
-
-    val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig)
-
-    val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads))
-    var threadList = List[ConsumerPerfThread]()
-    for ((topic, streamList) <- topicMessageStreams)
-      for (i <- 0 until streamList.length)
-        threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config,
-          totalMessagesRead, totalBytesRead)
-
-    logger.info("Sleeping for 1 second.")
-    Thread.sleep(1000)
-    logger.info("starting threads")
-    val startMs = System.currentTimeMillis
-    for (thread <- threadList)
-      thread.start
-
-    for (thread <- threadList)
-      thread.join
-
-    val endMs = System.currentTimeMillis
-    val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0
+    var startMs, endMs = 0L
+    if(config.useNewConsumer) {
+      val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
+      consumer.subscribe(config.topic)
+      startMs = System.currentTimeMillis
+      consume(consumer, config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
+      endMs = System.currentTimeMillis
+    } else {
+      import kafka.consumer.ConsumerConfig
+      val consumerConfig = new ConsumerConfig(config.props)
+      val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+      val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads))
+      var threadList = List[ConsumerPerfThread]()
+      for ((topic, streamList) <- topicMessageStreams)
+        for (i <- 0 until streamList.length)
+          threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, totalMessagesRead, totalBytesRead)
+
+      logger.info("Sleeping for 1 second.")
+      Thread.sleep(1000)
+      logger.info("starting threads")
+      startMs = System.currentTimeMillis
+      for (thread <- threadList)
+        thread.start
+      for (thread <- threadList)
+        thread.join
+      endMs = System.currentTimeMillis - consumerConfig.consumerTimeoutMs 
+    }
+    val elapsedSecs = (endMs - startMs) / 1000.0
     if (!config.showDetailedStats) {
       val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
-      println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
-        config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get,
-        totalMessagesRead.get / elapsedSecs))
+      println(("%s, %s, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
+        totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, totalMessagesRead.get / elapsedSecs))
     }
     System.exit(0)
   }
+  
+  def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) {
+    var bytesRead = 0L
+    var messagesRead = 0L
+    val startMs = System.currentTimeMillis
+    var lastReportTime: Long = startMs
+    var lastBytesRead = 0L
+    var lastMessagesRead = 0L
+    var lastConsumed = System.currentTimeMillis
+    while(messagesRead < count && lastConsumed >= System.currentTimeMillis - timeout) {
+      val records = consumer.poll(100)
+      if(records.count() > 0)
+        lastConsumed = System.currentTimeMillis
+      for(record <- records) {
+        messagesRead += 1
+        if(record.key != null)
+          bytesRead += record.key.size
+        if(record.value != null)
+          bytesRead += record.value.size 
+      
+        if (messagesRead % config.reportingInterval == 0) {
+          if (config.showDetailedStats)
+            printProgressMessage(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis, config.dateFormat)
+          lastReportTime = System.currentTimeMillis
+          lastMessagesRead = messagesRead
+          lastBytesRead = bytesRead
+        }
+      }
+    }
+    totalMessagesRead.set(messagesRead)
+    totalBytesRead.set(bytesRead)
+  }
+  
+  def printProgressMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long,
+    startMs: Long, endMs: Long, dateFormat: SimpleDateFormat) = {
+    val elapsedMs: Double = endMs - startMs
+    val totalMBRead = (bytesRead * 1.0) / (1024 * 1024)
+    val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
+    println(("%s, %d, %.4f, %.4f, %d, %.4f").format(dateFormat.format(endMs), id, totalMBRead,
+        1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0))
+  }
 
   class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
-      "Multiple URLS can be given to allow fail-over.")
+    val zkConnectOpt = parser.accepts("zookeeper", "The connection string for the zookeeper connection in the form host:port. " +
+      "Multiple URLS can be given to allow fail-over. This option is only used with the old consumer.")
       .withRequiredArg
       .describedAs("urls")
       .ofType(classOf[String])
+    val bootstrapServersOpt = parser.accepts("broker-list", "A broker list to use for connecting if using the new consumer.")
+      .withRequiredArg()
+      .describedAs("host")
+      .ofType(classOf[String])
     val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
       .withRequiredArg
       .describedAs("topic")
@@ -117,20 +172,35 @@ object ConsumerPerformance {
       .describedAs("count")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(1)
+    val useNewConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.")
 
     val options = parser.parse(args: _*)
 
-    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, zkConnectOpt)
-
+    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
+   
+    val useNewConsumer = options.has(useNewConsumerOpt)
+    
     val props = new Properties
-    props.put("group.id", options.valueOf(groupIdOpt))
-    props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
-    props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
-    props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
-    props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
-    props.put("consumer.timeout.ms", "5000")
-    props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString)
-    val consumerConfig = new ConsumerConfig(props)
+    if(useNewConsumer) {
+      import org.apache.kafka.clients.consumer.ConsumerConfig
+      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt))
+      props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt))
+      props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString)
+      props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, options.valueOf(fetchSizeOpt).toString)
+      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (options.has(resetBeginningOffsetOpt)) "latest" else "earliest")
+      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
+      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
+      props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false")
+    } else {
+      CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+      props.put("group.id", options.valueOf(groupIdOpt))
+      props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
+      props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
+      props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
+      props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
+      props.put("consumer.timeout.ms", "1000")
+      props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString)
+    }
     val numThreads = options.valueOf(numThreadsOpt).intValue
     val topic = options.valueOf(topicOpt)
     val numMessages = options.valueOf(numMessagesOpt).longValue
@@ -161,7 +231,7 @@ object ConsumerPerformance {
 
           if (messagesRead % config.reportingInterval == 0) {
             if (config.showDetailedStats)
-              printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis)
+              printProgressMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis, config.dateFormat)
             lastReportTime = System.currentTimeMillis
             lastMessagesRead = messagesRead
             lastBytesRead = bytesRead
@@ -176,18 +246,9 @@ object ConsumerPerformance {
       totalMessagesRead.addAndGet(messagesRead)
       totalBytesRead.addAndGet(bytesRead)
       if (config.showDetailedStats)
-        printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis)
+        printProgressMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis, config.dateFormat)
     }
 
-    private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long,
-      startMs: Long, endMs: Long) = {
-      val elapsedMs = endMs - startMs
-      val totalMBRead = (bytesRead * 1.0) / (1024 * 1024)
-      val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
-      println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id,
-        config.consumerConfig.fetchMessageMaxBytes, totalMBRead,
-        1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0))
-    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 7602b8d..900f7df 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -79,7 +79,7 @@ object SimpleConsumerPerformance {
         done = true
       else
         // we only did one fetch so we find the offset for the first (head) messageset
-        offset += messageSet.validBytes
+        offset = messageSet.last.nextOffset
 
       totalBytesRead += bytesRead
       totalMessagesRead += messagesRead
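
The one-line fix above matters because, since 0.8, fetch offsets are logical message offsets rather than byte positions, so advancing by validBytes would skip or re-read data; the next fetch has to start at the last message's nextOffset. A hypothetical, self-contained sketch of that advancement pattern with the old SimpleConsumer API (host, port, topic, client id, and fetch sizes are placeholders):

    import kafka.api.FetchRequestBuilder
    import kafka.consumer.SimpleConsumer

    object OffsetAdvanceSketch {
      def main(args: Array[String]): Unit = {
        val consumer = new SimpleConsumer("localhost", 9092, 30000, 64 * 1024, "offset-advance-sketch")
        var offset = 0L
        var done = false
        while (!done) {
          val request = new FetchRequestBuilder()
            .clientId("offset-advance-sketch")
            .addFetch("test-topic", 0, offset, 1024 * 1024)
            .build()
          val messageSet = consumer.fetch(request).messageSet("test-topic", 0)
          if (messageSet.isEmpty)
            done = true
          else
            offset = messageSet.last.nextOffset // logical offset of the next message, not offset + bytes
        }
        consumer.close()
      }
    }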

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/main/scala/kafka/utils/KafkaScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 9a16343..7ceadcc 100644
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -41,6 +41,11 @@ trait Scheduler {
   def shutdown()
   
   /**
+   * Check if the scheduler has been started
+   */
+  def isStarted: Boolean
+  
+  /**
    * Schedule a task
    * @param name The name of this task
    * @param delay The amount of time to wait before the first execution
@@ -63,13 +68,13 @@ trait Scheduler {
 class KafkaScheduler(val threads: Int, 
                      val threadNamePrefix: String = "kafka-scheduler-", 
                      daemon: Boolean = true) extends Scheduler with Logging {
-  @volatile private var executor: ScheduledThreadPoolExecutor = null
+  private var executor: ScheduledThreadPoolExecutor = null
   private val schedulerThreadId = new AtomicInteger(0)
   
   override def startup() {
     debug("Initializing task scheduler.")
     this synchronized {
-      if(executor != null)
+      if(isStarted)
         throw new IllegalStateException("This scheduler has already been started!")
       executor = new ScheduledThreadPoolExecutor(threads)
       executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
@@ -83,34 +88,45 @@ class KafkaScheduler(val threads: Int,
   
   override def shutdown() {
     debug("Shutting down task scheduler.")
-    ensureStarted
-    executor.shutdown()
-    executor.awaitTermination(1, TimeUnit.DAYS)
-    this.executor = null
+    this synchronized {
+      if(isStarted) {
+        executor.shutdown()
+        executor.awaitTermination(1, TimeUnit.DAYS)
+        this.executor = null
+      }
+    }
   }
 
   def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
         .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
-    ensureStarted
-    val runnable = Utils.runnable {
-      try {
-        trace("Begining execution of scheduled task '%s'.".format(name))
-        fun()
-      } catch {
-        case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t)
-      } finally {
-        trace("Completed execution of scheduled task '%s'.".format(name))
+    this synchronized {
+      ensureStarted
+      val runnable = Utils.runnable {
+        try {
+          trace("Beginning execution of scheduled task '%s'.".format(name))
+          fun()
+        } catch {
+          case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t)
+        } finally {
+          trace("Completed execution of scheduled task '%s'.".format(name))
+        }
       }
+      if(period >= 0)
+        executor.scheduleAtFixedRate(runnable, delay, period, unit)
+      else
+        executor.schedule(runnable, delay, unit)
+    }
+  }
+  
+  def isStarted: Boolean = {
+    this synchronized {
+      executor != null
     }
-    if(period >= 0)
-      executor.scheduleAtFixedRate(runnable, delay, period, unit)
-    else
-      executor.schedule(runnable, delay, unit)
   }
   
   private def ensureStarted = {
-    if(executor == null)
+    if(!isStarted)
       throw new IllegalStateException("Kafka scheduler has not been started")
   }
 }
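
The scheduler changes above synchronize startup, shutdown, and schedule on the scheduler itself and expose isStarted, so shutdown can safely be called on a scheduler that was never started. A minimal usage sketch under those assumptions; the object name, task name, and timings are arbitrary:

    import java.util.concurrent.TimeUnit
    import kafka.utils.KafkaScheduler

    object SchedulerSketch {
      def main(args: Array[String]): Unit = {
        val scheduler = new KafkaScheduler(threads = 1)
        scheduler.startup()
        assert(scheduler.isStarted)
        // periodic task: first run after 0 ms, then every 100 ms
        scheduler.schedule("demo-task", () => println("tick"), 0, 100, TimeUnit.MILLISECONDS)
        Thread.sleep(500)
        // with the change above this is a no-op rather than an error if startup() never ran
        scheduler.shutdown()
      }
    }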

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
new file mode 100644
index 0000000..798f035
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.CommitType
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
+import scala.collection.mutable.Buffer
+import scala.collection.JavaConversions._
+import java.util.ArrayList
+import java.util.Arrays
+import org.junit.Assert._
+import kafka.utils.TestUtils
+import kafka.utils.Logging
+import kafka.server.OffsetManager
+
+/**
+ * Integration tests for the new consumer that cover basic usage as well as server failures
+ */
+class ConsumerTest extends IntegrationTestHarness with Logging {
+
+  val producerCount = 1
+  val consumerCount = 2
+  val serverCount = 3
+
+  val topic = "topic"
+  val part = 0
+  val tp = new TopicPartition(topic, part)
+
+  // configure the servers and clients
+  this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown
+  this.serverConfig.setProperty("offsets.topic.replication.factor", "3") // don't want to lose offsets
+  this.serverConfig.setProperty("offsets.topic.num.partitions", "1")
+  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
+  this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
+  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+  
+  override def setUp() {
+    super.setUp()
+    // this will trigger the creation of the consumer offsets topic
+    this.consumers(0).partitionsFor(OffsetManager.OffsetsTopicName)
+  }
+
+  def testSimpleConsumption() {
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    assertEquals(0, this.consumers(0).subscriptions.size)
+    this.consumers(0).subscribe(tp)
+    assertEquals(1, this.consumers(0).subscriptions.size)
+    
+    this.consumers(0).seek(tp, 0)
+    consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
+  }
+
+  def testAutoOffsetReset() {
+    sendRecords(1)
+    this.consumers(0).subscribe(tp)
+    consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+  }
+
+  def testSeek() {
+    val consumer = this.consumers(0)
+    val totalRecords = 50L
+    sendRecords(totalRecords.toInt)
+    consumer.subscribe(tp)
+
+    consumer.seekToEnd(tp)
+    assertEquals(totalRecords, consumer.position(tp))
+    assertFalse(consumer.poll(totalRecords).iterator().hasNext())
+
+    consumer.seekToBeginning(tp)
+    assertEquals(0, consumer.position(tp), 0)
+    consumeRecords(consumer, numRecords = 1, startingOffset = 0)
+
+    val mid = totalRecords / 2
+    consumer.seek(tp, mid)
+    assertEquals(mid, consumer.position(tp))
+    consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt)
+  }
+
+  def testGroupConsumption() {
+    // we need to do this test with only one server since the hacked JoinGroup implementation
+    // just assigns the partitions hosted on the local machine (with more than one broker we might get the wrong one)
+    this.servers.last.shutdown()
+    this.servers.head.shutdown()
+    sendRecords(10)
+    this.consumers(0).subscribe(topic)
+    consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+  }
+
+  def testPositionAndCommit() {
+    sendRecords(5)
+
+    // committed() on a partition with no committed offset throws an exception
+    intercept[NoOffsetForPartitionException] {
+      this.consumers(0).committed(new TopicPartition(topic, 15))
+    }
+
+    // position() on a partition that we aren't subscribed to throws an exception
+    intercept[IllegalArgumentException] {
+      this.consumers(0).position(new TopicPartition(topic, 15))
+    }
+
+    this.consumers(0).subscribe(tp)
+
+    assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
+    this.consumers(0).commit(CommitType.SYNC)
+    assertEquals(0L, this.consumers(0).committed(tp))
+
+    consumeRecords(this.consumers(0), 5, 0)
+    assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
+    this.consumers(0).commit(CommitType.SYNC)
+    assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp));
+
+    sendRecords(1)
+
+    // another consumer in the same group should get the same position
+    this.consumers(1).subscribe(tp)
+    consumeRecords(this.consumers(1), 1, 5)
+  }
+
+  def testPartitionsFor() {
+    val numParts = 2;
+    TestUtils.createTopic(this.zkClient, topic, numParts, 1, this.servers)
+    val parts = this.consumers(0).partitionsFor(topic)
+    assertNotNull(parts)
+    assertEquals(2, parts.length)
+    assertNull(this.consumers(0).partitionsFor("non-existant-topic"))
+  }
+
+  def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(numRecords = 1000)
+
+  /*
+   * 1. Produce a bunch of messages
+   * 2. Then consume the messages while killing and restarting brokers at random
+   */
+  def consumeWithBrokerFailures(numRecords: Int) {
+    TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
+    sendRecords(numRecords)
+    this.producers.map(_.close)
+    var consumed = 0
+    val consumer = this.consumers(0)
+    consumer.subscribe(topic)
+    while (consumed < numRecords) {
+      // check that we are getting the messages in order
+      for (record <- consumer.poll(200)) {
+        assertEquals(consumed.toLong, record.offset())
+        consumed += 1
+      }
+      consumer.commit(CommitType.SYNC);
+
+      /* restart any dead brokers, and kill a broker (with probability 1/3) */
+      restartDeadBrokers()
+      if (TestUtils.random.nextInt(3) == 0) {
+        info("Killing broker")
+        killRandomBroker()
+      }
+    }
+  }
+
+  def testSeekAndCommitWithBrokerFailures() = seekAndCommitWithBrokerFailures(20)
+
+  def seekAndCommitWithBrokerFailures(numIters: Int) {
+    // create a topic and send it some data
+    val numRecords = 1000
+    TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
+    sendRecords(numRecords)
+    this.producers.map(_.close)
+
+    val consumer = this.consumers(0)
+    consumer.subscribe(tp)
+    consumer.seek(tp, 0)
+    for (iter <- 0 until numIters) {
+      val coin = TestUtils.random.nextInt(4)
+      if (coin == 0) {
+        info("Seeking to end of log")
+        consumer.seekToEnd()
+        assertEquals(1000.toLong, consumer.position(tp))
+      } else if (coin == 1) {
+        val pos = TestUtils.random.nextInt(numRecords).toLong
+        info("Seeking to " + pos)
+        consumer.seek(tp, pos)
+        assertEquals(pos, consumer.position(tp))
+      } else if (coin == 2) {
+        info("Committing offset.")
+        consumer.commit(CommitType.SYNC)
+        assertEquals(consumer.position(tp), consumer.committed(tp))
+      } else {
+        restartDeadBrokers()
+        killRandomBroker()
+      }
+    }
+  }
+  
+  def testPartitionReassignmentCallback() {
+    val callback = new TestConsumerReassignmentCallback()
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "200"); // timeout quickly to avoid slow test
+    val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumer0.subscribe("test")
+        
+    // the initial subscription should cause a callback execution
+    while(callback.callsToAssigned == 0)
+      consumer0.poll(50)
+    
+    // get metadata for the topic
+    var parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName)
+    while(parts == null)
+      parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName)
+    assertEquals(1, parts.size)
+    assertNotNull(parts(0).leader())
+    
+    // shut down the coordinator
+    val coordinator = parts(0).leader().id()
+    this.servers(coordinator).shutdown()
+    
+    // this should cause another callback execution
+    while(callback.callsToAssigned < 2)
+      consumer0.poll(50)
+    assertEquals(2, callback.callsToAssigned)
+    assertEquals(2, callback.callsToRevoked)
+    
+    consumer0.close()
+  }
+  
+  class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback {
+    var callsToAssigned = 0
+    var callsToRevoked = 0
+    def onPartitionsAssigned(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
+      info("onPartitionsAssigned called.")
+      callsToAssigned += 1
+    }
+    def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
+      info("onPartitionsRevoked called.")
+      callsToRevoked += 1
+    } 
+  }
+
+  private def sendRecords(numRecords: Int) {
+    val futures = (0 until numRecords).map { i =>
+      this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
+    }
+    futures.map(_.get)
+  }
+
+  private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int) {
+    val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
+    val maxIters = numRecords * 300
+    var iters = 0
+    while (records.size < numRecords) {
+      for (record <- consumer.poll(50))
+        records.add(record)
+      if(iters > maxIters)
+        throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.");
+      iters += 1
+    }
+    for (i <- 0 until numRecords) {
+      val record = records.get(i)
+      val offset = startingOffset + i
+      assertEquals(topic, record.topic())
+      assertEquals(part, record.partition())
+      assertEquals(offset.toLong, record.offset())
+    }
+  }
+
+}
\ No newline at end of file
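
The reassignment test above is what exercises the callback-accepting KafkaConsumer constructor; a stand-alone sketch of the same wiring outside the test harness, with placeholder broker address, group id, topic, and object name:

    import java.util.Properties
    import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRebalanceCallback, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    object RebalanceCallbackSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "callback-sketch")         // placeholder
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")

        val callback = new ConsumerRebalanceCallback {
          def onPartitionsAssigned(consumer: Consumer[_, _], partitions: java.util.Collection[TopicPartition]) {
            println("assigned: " + partitions)
          }
          def onPartitionsRevoked(consumer: Consumer[_, _], partitions: java.util.Collection[TopicPartition]) {
            println("revoked: " + partitions)
          }
        }

        val consumer = new KafkaConsumer(props, callback, new ByteArrayDeserializer, new ByteArrayDeserializer)
        consumer.subscribe("test")
        for (_ <- 0 until 20)
          consumer.poll(50) // the callbacks fire inside poll() when the assignment changes
        consumer.close()
      }
    }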

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
new file mode 100644
index 0000000..5650b4a
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.scalatest.junit.JUnit3Suite
+import collection._
+import kafka.utils.TestUtils
+import java.util.Properties
+import java.util.Arrays
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer.KafkaProducer
+import kafka.server.KafkaConfig
+import kafka.integration.KafkaServerTestHarness
+import scala.collection.mutable.Buffer
+
+/**
+ * A helper class for writing integration tests that involve producers, consumers, and servers
+ */
+trait IntegrationTestHarness extends KafkaServerTestHarness {
+
+  val producerCount: Int
+  val consumerCount: Int
+  val serverCount: Int
+  lazy val producerConfig = new Properties
+  lazy val consumerConfig = new Properties
+  lazy val serverConfig = new Properties
+  override lazy val configs = {
+    val cfgs = TestUtils.createBrokerConfigs(serverCount)
+    cfgs.map(_.putAll(serverConfig))
+    cfgs.map(new KafkaConfig(_))
+  }
+  
+  var consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+  var producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+  
+  override def setUp() {
+    super.setUp()
+    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
+    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
+    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+    for(i <- 0 until producerCount)
+      producers += new KafkaProducer(producerConfig)
+    for(i <- 0 until consumerCount)
+      consumers += new KafkaConsumer(consumerConfig)
+  }
+  
+  override def tearDown() {
+    producers.map(_.close())
+    consumers.map(_.close())
+    super.tearDown()
+  }
+
+}
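
A hypothetical test built on this harness, just to illustrate the wiring: the three counts are declared up front, setUp() constructs the matching producers and consumers against the started brokers, and the test body only touches the pre-built clients. The class name, topic, and record contents are arbitrary:

    package kafka.api

    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.TopicPartition
    import org.junit.Assert._

    class ProduceConsumeSmokeTest extends IntegrationTestHarness {
      val producerCount = 1
      val consumerCount = 1
      val serverCount = 1

      def testRoundTrip() {
        val tp = new TopicPartition("smoke", 0)
        // produce one record, then read it back with the pre-built consumer
        producers(0).send(new ProducerRecord("smoke", 0, "key".getBytes, "value".getBytes)).get
        consumers(0).subscribe(tp)
        consumers(0).seek(tp, 0)
        var records = consumers(0).poll(100)
        while (records.count() == 0)
          records = consumers(0).poll(100)
        assertEquals("value", new String(records.iterator().next().value))
      }
    }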