You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2015/01/30 03:39:33 UTC

[3/7] kafka git commit: KAFKA-1760: New consumer.

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 121e880..ee1f78f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.protocol.types;
 
@@ -124,14 +120,6 @@ public class Struct {
         return (Long) get(name);
     }
 
-    public ByteBuffer getBytes(Field field) {
-        return (ByteBuffer) get(field);
-    }
-
-    public ByteBuffer getBytes(String name) {
-        return (ByteBuffer) get(name);
-    }
-
     public Object[] getArray(Field field) {
         return (Object[]) get(field);
     }
@@ -148,6 +136,14 @@ public class Struct {
         return (String) get(name);
     }
 
+    public ByteBuffer getBytes(Field field) {
+        return (ByteBuffer) get(field);
+    }
+
+    public ByteBuffer getBytes(String name) {
+        return (ByteBuffer) get(name);
+    }
+
     /**
      * Set the given field to the specified value
      * 
@@ -175,9 +171,9 @@ public class Struct {
     }
 
     /**
-     * Create a struct for the schema of a container type (struct or array).
-     * Note that for array type, this method assumes that the type is an array of schema and creates a struct
-     * of that schema. Arrays of other types can't be instantiated with this method.
+     * Create a struct for the schema of a container type (struct or array). Note that for array type, this method
+     * assumes that the type is an array of schema and creates a struct of that schema. Arrays of other types can't be
+     * instantiated with this method.
      * 
      * @param field The field to create an instance of
      * @return The struct

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
index e4d688c..2e54b56 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
@@ -41,4 +41,8 @@ public final class LogEntry {
     public String toString() {
         return "LogEntry(" + offset + ", " + record + ")";
     }
+    
+    public int size() {
+        return record.size() + Records.LOG_OVERHEAD;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 040e5b9..cc4084f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -55,7 +55,7 @@ public class MemoryRecords implements Records {
         return emptyRecords(buffer, type, buffer.capacity());
     }
 
-    public static MemoryRecords iterableRecords(ByteBuffer buffer) {
+    public static MemoryRecords readableRecords(ByteBuffer buffer) {
         return new MemoryRecords(buffer, CompressionType.NONE, false, buffer.capacity());
     }
 
@@ -94,22 +94,21 @@ public class MemoryRecords implements Records {
      * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
      * accurate if compression is really used. When this happens, the following append may cause dynamic buffer
      * re-allocation in the underlying byte buffer stream.
-     *
+     * 
      * Also note that besides the records' capacity, there is also a size limit for the batch. This size limit may be
      * smaller than the capacity (e.g. when appending a single message whose size is larger than the batch size, the
-     * capacity will be the message size, but the size limit will still be the batch size), and when the records' size has
-     * exceed this limit we also mark this record as full.
+     * capacity will be the message size, but the size limit will still be the batch size), and when the records' size
+     * has exceeded this limit we also mark this record as full.
      */
     public boolean hasRoomFor(byte[] key, byte[] value) {
-        return this.writable &&
-            this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value) &&
-            this.sizeLimit >= this.compressor.estimatedBytesWritten();
+        return this.writable && this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD +
+                                                 Record.recordSize(key, value) &&
+               this.sizeLimit >= this.compressor.estimatedBytesWritten();
     }
 
     public boolean isFull() {
-        return !this.writable ||
-            this.capacity <= this.compressor.estimatedBytesWritten() ||
-            this.sizeLimit <= this.compressor.estimatedBytesWritten();
+        return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten() ||
+               this.sizeLimit <= this.compressor.estimatedBytesWritten();
     }
 
     /**
@@ -132,7 +131,7 @@ public class MemoryRecords implements Records {
     public int sizeInBytes() {
         return compressor.buffer().position();
     }
-    
+
     /**
      * The compression rate of this record set
      */
@@ -162,6 +161,25 @@ public class MemoryRecords implements Records {
         ByteBuffer copy = (ByteBuffer) this.buffer.duplicate().flip();
         return new RecordsIterator(copy, CompressionType.NONE, false);
     }
+    
+    @Override
+    public String toString() {
+      Iterator<LogEntry> iter = iterator();
+      StringBuilder builder = new StringBuilder();
+      builder.append('[');
+      while(iter.hasNext()) {
+        LogEntry entry = iter.next();
+        builder.append('(');
+        builder.append("offset=");
+        builder.append(entry.offset());
+        builder.append(",");
+        builder.append("record=");
+        builder.append(entry.record());
+        builder.append(")");
+      }
+      builder.append(']');
+      return builder.toString();
+    }
 
     public static class RecordsIterator extends AbstractIterator<LogEntry> {
         private final ByteBuffer buffer;
@@ -174,7 +192,7 @@ public class MemoryRecords implements Records {
             this.type = type;
             this.buffer = buffer;
             this.shallow = shallow;
-            stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
+            this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
         }
 
         /*
@@ -199,7 +217,10 @@ public class MemoryRecords implements Records {
                     ByteBuffer rec;
                     if (type == CompressionType.NONE) {
                         rec = buffer.slice();
-                        buffer.position(buffer.position() + size);
+                        int newPos = buffer.position() + size;
+                        if(newPos > buffer.limit())
+                          return allDone();
+                        buffer.position(newPos);
                         rec.limit(size);
                     } else {
                         byte[] recordBuffer = new byte[size];
@@ -207,7 +228,6 @@ public class MemoryRecords implements Records {
                         rec = ByteBuffer.wrap(recordBuffer);
                     }
                     LogEntry entry = new LogEntry(offset, new Record(rec));
-                    entry.record().ensureValid();
 
                     // decide whether to go shallow or deep iteration if it is compressed
                     CompressionType compression = entry.record().compressionType();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
index 99b52c2..4c99d4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
@@ -20,13 +20,14 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class ConsumerMetadataRequest extends AbstractRequestResponse {
-    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id);
-    private static String GROUP_ID_KEY_NAME = "group_id";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
 
     private final String groupId;
 
     public ConsumerMetadataRequest(String groupId) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
 
         struct.set(GROUP_ID_KEY_NAME, groupId);
         this.groupId = groupId;
@@ -42,6 +43,6 @@ public class ConsumerMetadataRequest extends AbstractRequestResponse {
     }
 
     public static ConsumerMetadataRequest parse(ByteBuffer buffer) {
-        return new ConsumerMetadataRequest(((Struct) curSchema.read(buffer)));
+        return new ConsumerMetadataRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
index 8b8f591..173333b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
@@ -21,20 +21,21 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class ConsumerMetadataResponse extends AbstractRequestResponse {
-    private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.CONSUMER_METADATA.id);
-    private static String ERROR_CODE_KEY_NAME = "error_code";
-    private static String COORDINATOR_KEY_NAME = "coordinator";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CONSUMER_METADATA.id);
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String COORDINATOR_KEY_NAME = "coordinator";
 
     // coordinator level field names
-    private static String NODE_ID_KEY_NAME = "node_id";
-    private static String HOST_KEY_NAME = "host";
-    private static String PORT_KEY_NAME = "port";
+    private static final String NODE_ID_KEY_NAME = "node_id";
+    private static final String HOST_KEY_NAME = "host";
+    private static final String PORT_KEY_NAME = "port";
 
     private final short errorCode;
     private final Node node;
 
     public ConsumerMetadataResponse(short errorCode, Node node) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
         struct.set(ERROR_CODE_KEY_NAME, errorCode);
         Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
         coordinator.set(NODE_ID_KEY_NAME, node.id());
@@ -64,6 +65,6 @@ public class ConsumerMetadataResponse extends AbstractRequestResponse {
     }
 
     public static ConsumerMetadataResponse parse(ByteBuffer buffer) {
-        return new ConsumerMetadataResponse(((Struct) curSchema.read(buffer)));
+        return new ConsumerMetadataResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 2fc471f..2529a09 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -1,21 +1,23 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.requests;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ProtoUtils;
@@ -23,27 +25,23 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class FetchRequest extends AbstractRequestResponse {
-    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id);
-    private static String REPLICA_ID_KEY_NAME = "replica_id";
-    private static String MAX_WAIT_KEY_NAME = "max_wait_time";
-    private static String MIN_BYTES_KEY_NAME = "min_bytes";
-    private static String TOPICS_KEY_NAME = "topics";
+    
+    public static final int CONSUMER_REPLICA_ID = -1;
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id);
+    private static final String REPLICA_ID_KEY_NAME = "replica_id";
+    private static final String MAX_WAIT_KEY_NAME = "max_wait_time";
+    private static final String MIN_BYTES_KEY_NAME = "min_bytes";
+    private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level field names
-    private static String TOPIC_KEY_NAME = "topic";
-    private static String PARTITIONS_KEY_NAME = "partitions";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
 
     // partition level field names
-    private static String PARTITION_KEY_NAME = "partition";
-    private static String FETCH_OFFSET_KEY_NAME = "fetch_offset";
-    private static String MAX_BYTES_KEY_NAME = "max_bytes";
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String FETCH_OFFSET_KEY_NAME = "fetch_offset";
+    private static final String MAX_BYTES_KEY_NAME = "max_bytes";
 
     private final int replicaId;
     private final int maxWait;
@@ -60,15 +58,25 @@ public class FetchRequest extends AbstractRequestResponse {
         }
     }
 
+    /**
+     * Create a non-replica fetch request
+     */
+    public FetchRequest(int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
+        this(CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData);
+    }
+
+    /**
+     * Create a replica fetch request
+     */
     public FetchRequest(int replicaId, int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(fetchData);
 
         struct.set(REPLICA_ID_KEY_NAME, replicaId);
         struct.set(MAX_WAIT_KEY_NAME, maxWait);
         struct.set(MIN_BYTES_KEY_NAME, minBytes);
         List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry : topicsData.entrySet()) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
             List<Struct> partitionArray = new ArrayList<Struct>();
@@ -127,6 +135,6 @@ public class FetchRequest extends AbstractRequestResponse {
     }
 
     public static FetchRequest parse(ByteBuffer buffer) {
-        return new FetchRequest(((Struct) curSchema.read(buffer)));
+        return new FetchRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index f719010..c1e5f44 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -30,18 +30,19 @@ import java.util.List;
 import java.util.Map;
 
 public class FetchResponse extends AbstractRequestResponse {
-    public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
-    private static String RESPONSES_KEY_NAME = "responses";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
+    private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level field names
-    private static String TOPIC_KEY_NAME = "topic";
-    private static String PARTITIONS_KEY_NAME = "partition_responses";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partition_responses";
 
     // partition level field names
-    private static String PARTITION_KEY_NAME = "partition";
-    private static String ERROR_CODE_KEY_NAME = "error_code";
-    private static String HIGH_WATERMARK_KEY_NAME = "high_watermark";
-    private static String RECORD_SET_KEY_NAME = "record_set";
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
+    private static final String RECORD_SET_KEY_NAME = "record_set";
 
     private final Map<TopicPartition, PartitionData> responseData;
 
@@ -58,7 +59,7 @@ public class FetchResponse extends AbstractRequestResponse {
     }
 
     public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
 
         List<Struct> topicArray = new ArrayList<Struct>();
@@ -105,6 +106,6 @@ public class FetchResponse extends AbstractRequestResponse {
     }
 
     public static FetchResponse parse(ByteBuffer buffer) {
-        return new FetchResponse(((Struct) curSchema.read(buffer)));
+        return new FetchResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 9512db2..cfdb5de 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -20,17 +20,18 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class HeartbeatRequest extends AbstractRequestResponse {
-    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id);
-    private static String GROUP_ID_KEY_NAME = "group_id";
-    private static String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
-    private static String CONSUMER_ID_KEY_NAME = "consumer_id";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
+    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
 
     private final String groupId;
     private final int groupGenerationId;
     private final String consumerId;
 
     public HeartbeatRequest(String groupId, int groupGenerationId, String consumerId) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
         struct.set(GROUP_ID_KEY_NAME, groupId);
         struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId);
         struct.set(CONSUMER_ID_KEY_NAME, consumerId);
@@ -59,6 +60,6 @@ public class HeartbeatRequest extends AbstractRequestResponse {
     }
 
     public static HeartbeatRequest parse(ByteBuffer buffer) {
-        return new HeartbeatRequest(((Struct) curSchema.read(buffer)));
+        return new HeartbeatRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 8997ffc..ea964f7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -20,12 +20,13 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class HeartbeatResponse extends AbstractRequestResponse {
-    private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
-    private static String ERROR_CODE_KEY_NAME = "error_code";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     private final short errorCode;
     public HeartbeatResponse(short errorCode) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
         struct.set(ERROR_CODE_KEY_NAME, errorCode);
         this.errorCode = errorCode;
     }
@@ -40,6 +41,6 @@ public class HeartbeatResponse extends AbstractRequestResponse {
     }
 
     public static HeartbeatResponse parse(ByteBuffer buffer) {
-        return new HeartbeatResponse(((Struct) curSchema.read(buffer)));
+        return new HeartbeatResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index d6e91f3..a1d48c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -22,12 +22,13 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class JoinGroupRequest extends AbstractRequestResponse {
-    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
-    private static String GROUP_ID_KEY_NAME = "group_id";
-    private static String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
-    private static String TOPICS_KEY_NAME = "topics";
-    private static String CONSUMER_ID_KEY_NAME = "consumer_id";
-    private static String STRATEGY_KEY_NAME = "partition_assignment_strategy";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
+    private static final String TOPICS_KEY_NAME = "topics";
+    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+    private static final String STRATEGY_KEY_NAME = "partition_assignment_strategy";
 
     private final String groupId;
     private final int sessionTimeout;
@@ -36,7 +37,7 @@ public class JoinGroupRequest extends AbstractRequestResponse {
     private final String strategy;
 
     public JoinGroupRequest(String groupId, int sessionTimeout, List<String> topics, String consumerId, String strategy) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
         struct.set(GROUP_ID_KEY_NAME, groupId);
         struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
         struct.set(TOPICS_KEY_NAME, topics.toArray());
@@ -82,6 +83,6 @@ public class JoinGroupRequest extends AbstractRequestResponse {
     }
 
     public static JoinGroupRequest parse(ByteBuffer buffer) {
-        return new JoinGroupRequest(((Struct) curSchema.read(buffer)));
+        return new JoinGroupRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index efe8979..1e9f349 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -23,16 +23,17 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 public class JoinGroupResponse extends AbstractRequestResponse {
-    public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
-    private static String ERROR_CODE_KEY_NAME = "error_code";
-    private static String GENERATION_ID_KEY_NAME = "group_generation_id";
-    private static String CONSUMER_ID_KEY_NAME = "consumer_id";
-    private static String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";
-    private static String TOPIC_KEY_NAME = "topic";
-    private static String PARTITIONS_KEY_NAME = "partitions";
-
-    public static int UNKNOWN_GENERATION_ID = -1;
-    public static String UNKNOWN_CONSUMER_ID = "";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
+    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+    private static final String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    public static final int UNKNOWN_GENERATION_ID = -1;
+    public static final String UNKNOWN_CONSUMER_ID = "";
 
     private final short errorCode;
     private final int generationId;
@@ -40,7 +41,7 @@ public class JoinGroupResponse extends AbstractRequestResponse {
     private final List<TopicPartition> assignedPartitions;
 
     public JoinGroupResponse(short errorCode, int generationId, String consumerId, List<TopicPartition> assignedPartitions) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
 
         Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupDataByTopic(assignedPartitions);
 
@@ -97,6 +98,6 @@ public class JoinGroupResponse extends AbstractRequestResponse {
     }
 
     public static JoinGroupResponse parse(ByteBuffer buffer) {
-        return new JoinGroupResponse(((Struct) curSchema.read(buffer)));
+        return new JoinGroupResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 99364c1..05c5fed 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -30,18 +30,19 @@ import java.util.List;
 import java.util.Map;
 
 public class ListOffsetRequest extends AbstractRequestResponse {
-    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id);
-    private static String REPLICA_ID_KEY_NAME = "replica_id";
-    private static String TOPICS_KEY_NAME = "topics";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id);
+    private static final String REPLICA_ID_KEY_NAME = "replica_id";
+    private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level field names
-    private static String TOPIC_KEY_NAME = "topic";
-    private static String PARTITIONS_KEY_NAME = "partitions";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
 
     // partition level field names
-    private static String PARTITION_KEY_NAME = "partition";
-    private static String TIMESTAMP_KEY_NAME = "timestamp";
-    private static String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets";
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String TIMESTAMP_KEY_NAME = "timestamp";
+    private static final String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets";
 
     private final int replicaId;
     private final Map<TopicPartition, PartitionData> offsetData;
@@ -55,9 +56,13 @@ public class ListOffsetRequest extends AbstractRequestResponse {
             this.maxNumOffsets = maxNumOffsets;
         }
     }
+    
+    public ListOffsetRequest(Map<TopicPartition, PartitionData> offsetData) {
+        this(-1, offsetData); // delegates to the two-argument constructor with replicaId = -1
+    }
 
     public ListOffsetRequest(int replicaId, Map<TopicPartition, PartitionData> offsetData) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
 
         struct.set(REPLICA_ID_KEY_NAME, replicaId);
@@ -109,6 +114,6 @@ public class ListOffsetRequest extends AbstractRequestResponse {
     }
 
     public static ListOffsetRequest parse(ByteBuffer buffer) {
-        return new ListOffsetRequest(((Struct) curSchema.read(buffer)));
+        return new ListOffsetRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index ac23971..b2e473e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -30,17 +30,18 @@ import java.util.List;
 import java.util.Map;
 
 public class ListOffsetResponse extends AbstractRequestResponse {
-    public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id);
-    private static String RESPONSES_KEY_NAME = "responses";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id);
+    private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level field names
-    private static String TOPIC_KEY_NAME = "topic";
-    private static String PARTITIONS_KEY_NAME = "partition_responses";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partition_responses";
 
     // partition level field names
-    private static String PARTITION_KEY_NAME = "partition";
-    private static String ERROR_CODE_KEY_NAME = "error_code";
-    private static String OFFSETS_KEY_NAME = "offsets";
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String OFFSETS_KEY_NAME = "offsets";
 
     private final Map<TopicPartition, PartitionData> responseData;
 
@@ -55,7 +56,7 @@ public class ListOffsetResponse extends AbstractRequestResponse {
     }
 
     public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
 
         List<Struct> topicArray = new ArrayList<Struct>();
@@ -103,6 +104,6 @@ public class ListOffsetResponse extends AbstractRequestResponse {
     }
 
     public static ListOffsetResponse parse(ByteBuffer buffer) {
-        return new ListOffsetResponse(((Struct) curSchema.read(buffer)));
+        return new ListOffsetResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index b22ca1d..0186783 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -22,13 +22,14 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 public class MetadataRequest extends AbstractRequestResponse {
-    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
-    private static String TOPICS_KEY_NAME = "topics";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
+    private static final String TOPICS_KEY_NAME = "topics";
 
     private final List<String> topics;
 
     public MetadataRequest(List<String> topics) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
         struct.set(TOPICS_KEY_NAME, topics.toArray());
         this.topics = topics;
     }
@@ -47,6 +48,6 @@ public class MetadataRequest extends AbstractRequestResponse {
     }
 
     public static MetadataRequest parse(ByteBuffer buffer) {
-        return new MetadataRequest(((Struct) curSchema.read(buffer)));
+        return new MetadataRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index d97962d..13daf59 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -28,32 +28,33 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 public class MetadataResponse extends AbstractRequestResponse {
-    private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
-    private static String BROKERS_KEY_NAME = "brokers";
-    private static String TOPIC_METATDATA_KEY_NAME = "topic_metadata";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
+    private static final String BROKERS_KEY_NAME = "brokers";
+    private static final String TOPIC_METATDATA_KEY_NAME = "topic_metadata";
 
     // broker level field names
-    private static String NODE_ID_KEY_NAME = "node_id";
-    private static String HOST_KEY_NAME = "host";
-    private static String PORT_KEY_NAME = "port";
+    private static final String NODE_ID_KEY_NAME = "node_id";
+    private static final String HOST_KEY_NAME = "host";
+    private static final String PORT_KEY_NAME = "port";
 
     // topic level field names
-    private static String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
-    private static String TOPIC_KEY_NAME = "topic";
-    private static String PARTITION_METADATA_KEY_NAME = "partition_metadata";
+    private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITION_METADATA_KEY_NAME = "partition_metadata";
 
     // partition level field names
-    private static String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
-    private static String PARTITION_KEY_NAME = "partition_id";
-    private static String LEADER_KEY_NAME = "leader";
-    private static String REPLICAS_KEY_NAME = "replicas";
-    private static String ISR_KEY_NAME = "isr";
+    private static final String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
+    private static final String PARTITION_KEY_NAME = "partition_id";
+    private static final String LEADER_KEY_NAME = "leader";
+    private static final String REPLICAS_KEY_NAME = "replicas";
+    private static final String ISR_KEY_NAME = "isr";
 
     private final Cluster cluster;
     private final Map<String, Errors> errors;
 
     public MetadataResponse(Cluster cluster) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
 
         List<Struct> brokerArray = new ArrayList<Struct>();
         for (Node node: cluster.nodes()) {
@@ -147,6 +148,6 @@ public class MetadataResponse extends AbstractRequestResponse {
     }
 
     public static MetadataResponse parse(ByteBuffer buffer) {
-        return new MetadataResponse(((Struct) curSchema.read(buffer)));
+        return new MetadataResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 3ee5cba..4fb48c8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -3,15 +3,21 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- *
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.requests;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ProtoUtils;
@@ -19,31 +25,26 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * This wrapper supports both v0 and v1 of OffsetCommitRequest.
  */
 public class OffsetCommitRequest extends AbstractRequestResponse {
-    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
-    private static String GROUP_ID_KEY_NAME = "group_id";
-    private static String GENERATION_ID_KEY_NAME = "group_generation_id";
-    private static String CONSUMER_ID_KEY_NAME = "consumer_id";
-    private static String TOPICS_KEY_NAME = "topics";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
+    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+    private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level field names
-    private static String TOPIC_KEY_NAME = "topic";
-    private static String PARTITIONS_KEY_NAME = "partitions";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
 
     // partition level field names
-    private static String PARTITION_KEY_NAME = "partition";
-    private static String COMMIT_OFFSET_KEY_NAME = "offset";
-    private static String TIMESTAMP_KEY_NAME = "timestamp";
-    private static String METADATA_KEY_NAME = "metadata";
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String COMMIT_OFFSET_KEY_NAME = "offset";
+    private static final String TIMESTAMP_KEY_NAME = "timestamp";
+    private static final String METADATA_KEY_NAME = "metadata";
 
     public static final int DEFAULT_GENERATION_ID = -1;
     public static final String DEFAULT_CONSUMER_ID = "";
@@ -88,7 +89,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
      * @param offsetData
      */
     public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
 
         initCommonFields(groupId, offsetData);
         struct.set(GENERATION_ID_KEY_NAME, generationId);
@@ -104,7 +105,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
 
         struct.set(GROUP_ID_KEY_NAME, groupId);
         List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry : topicsData.entrySet()) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
             List<Struct> partitionArray = new ArrayList<Struct>();
@@ -175,6 +176,6 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
     }
 
     public static OffsetCommitRequest parse(ByteBuffer buffer) {
-        return new OffsetCommitRequest(((Struct) curSchema.read(buffer)));
+        return new OffsetCommitRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 711232a..2ab1dc6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -26,21 +26,22 @@ import java.util.List;
 import java.util.Map;
 
 public class OffsetCommitResponse extends AbstractRequestResponse {
-    public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id);
-    private static String RESPONSES_KEY_NAME = "responses";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id);
+    private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level fields
-    private static String TOPIC_KEY_NAME = "topic";
-    private static String PARTITIONS_KEY_NAME = "partition_responses";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partition_responses";
 
     // partition level fields
-    private static String PARTITION_KEY_NAME = "partition";
-    private static String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     private final Map<TopicPartition, Short> responseData;
 
     public OffsetCommitResponse(Map<TopicPartition, Short> responseData) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
 
         Map<String, Map<Integer, Short>> topicsData = CollectionUtils.groupDataByTopic(responseData);
 
@@ -82,6 +83,6 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
     }
 
     public static OffsetCommitResponse parse(ByteBuffer buffer) {
-        return new OffsetCommitResponse(((Struct) curSchema.read(buffer)));
+        return new OffsetCommitResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 90d5135..333483f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -28,16 +28,17 @@ import java.util.Map;
  * This wrapper supports both v0 and v1 of OffsetFetchRequest.
  */
 public class OffsetFetchRequest extends AbstractRequestResponse {
-    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_FETCH.id);
-    private static String GROUP_ID_KEY_NAME = "group_id";
-    private static String TOPICS_KEY_NAME = "topics";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_FETCH.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level field names
-    private static String TOPIC_KEY_NAME = "topic";
-    private static String PARTITIONS_KEY_NAME = "partitions";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
 
     // partition level field names
-    private static String PARTITION_KEY_NAME = "partition";
+    private static final String PARTITION_KEY_NAME = "partition";
 
     public static final int DEFAULT_GENERATION_ID = -1;
     public static final String DEFAULT_CONSUMER_ID = "";
@@ -46,7 +47,7 @@ public class OffsetFetchRequest extends AbstractRequestResponse {
     private final List<TopicPartition> partitions;
 
     public OffsetFetchRequest(String groupId, List<TopicPartition> partitions) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
 
         Map<String, List<Integer>> topicsData = CollectionUtils.groupDataByTopic(partitions);
 
@@ -93,6 +94,6 @@ public class OffsetFetchRequest extends AbstractRequestResponse {
     }
 
     public static OffsetFetchRequest parse(ByteBuffer buffer) {
-        return new OffsetFetchRequest(((Struct) curSchema.read(buffer)));
+        return new OffsetFetchRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 6b7c269..04c88c0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -3,43 +3,45 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- *
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.requests;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class OffsetFetchResponse extends AbstractRequestResponse {
-    public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_FETCH.id);
-    private static String RESPONSES_KEY_NAME = "responses";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_FETCH.id);
+    private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level fields
-    private static String TOPIC_KEY_NAME = "topic";
-    private static String PARTITIONS_KEY_NAME = "partition_responses";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partition_responses";
 
     // partition level fields
-    private static String PARTITION_KEY_NAME = "partition";
-    private static String COMMIT_OFFSET_KEY_NAME = "offset";
-    private static String METADATA_KEY_NAME = "metadata";
-    private static String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String COMMIT_OFFSET_KEY_NAME = "offset";
+    private static final String METADATA_KEY_NAME = "metadata";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
 
-    private final Map<TopicPartition,PartitionData> responseData;
+    private final Map<TopicPartition, PartitionData> responseData;
 
     public static final class PartitionData {
         public final long offset;
@@ -51,15 +53,19 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
             this.metadata = metadata;
             this.errorCode = errorCode;
         }
+
+        public boolean hasError() {
+            return this.errorCode != Errors.NONE.code(); // true for any code other than Errors.NONE
+        }
     }
 
     public OffsetFetchResponse(Map<TopicPartition, PartitionData> responseData) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
 
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
 
         List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> entries: topicsData.entrySet()) {
+        for (Map.Entry<String, Map<Integer, PartitionData>> entries : topicsData.entrySet()) {
             Struct topicData = struct.instance(RESPONSES_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, entries.getKey());
             List<Struct> partitionArray = new ArrayList<Struct>();
@@ -102,6 +108,6 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
     }
 
     public static OffsetFetchResponse parse(ByteBuffer buffer) {
-        return new OffsetFetchResponse(((Struct) curSchema.read(buffer)));
+        return new OffsetFetchResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 3dbba8a..03a0ab1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -27,25 +27,26 @@ import java.util.List;
 import java.util.Map;
 
 public class ProduceRequest  extends AbstractRequestResponse {
-    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id);
-    private static String ACKS_KEY_NAME = "acks";
-    private static String TIMEOUT_KEY_NAME = "timeout";
-    private static String TOPIC_DATA_KEY_NAME = "topic_data";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id);
+    private static final String ACKS_KEY_NAME = "acks";
+    private static final String TIMEOUT_KEY_NAME = "timeout";
+    private static final String TOPIC_DATA_KEY_NAME = "topic_data";
 
     // topic level field names
-    private static String TOPIC_KEY_NAME = "topic";
-    private static String PARTITION_DATA_KEY_NAME = "data";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITION_DATA_KEY_NAME = "data";
 
     // partition level field names
-    private static String PARTITION_KEY_NAME = "partition";
-    private static String RECORD_SET_KEY_NAME = "record_set";
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String RECORD_SET_KEY_NAME = "record_set";
 
     private final short acks;
     private final int timeout;
     private final Map<TopicPartition, ByteBuffer> partitionRecords;
 
     public ProduceRequest(short acks, int timeout, Map<TopicPartition, ByteBuffer> partitionRecords) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
         Map<String, Map<Integer, ByteBuffer>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
         struct.set(ACKS_KEY_NAME, acks);
         struct.set(TIMEOUT_KEY_NAME, timeout);
@@ -100,6 +101,6 @@ public class ProduceRequest  extends AbstractRequestResponse {
     }
 
     public static ProduceRequest parse(ByteBuffer buffer) {
-        return new ProduceRequest(((Struct) curSchema.read(buffer)));
+        return new ProduceRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 5220464..e42d7db 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -26,22 +26,23 @@ import java.util.List;
 import java.util.Map;
 
 public class ProduceResponse extends AbstractRequestResponse {
-    private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
-    private static String RESPONSES_KEY_NAME = "responses";
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
+    private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level field names
-    private static String TOPIC_KEY_NAME = "topic";
-    private static String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
 
     // partition level field names
-    private static String PARTITION_KEY_NAME = "partition";
-    private static String ERROR_CODE_KEY_NAME = "error_code";
-    private static String BASE_OFFSET_KEY_NAME = "base_offset";
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String BASE_OFFSET_KEY_NAME = "base_offset";
 
     private final Map<TopicPartition, PartitionResponse> responses;
 
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
-        super(new Struct(curSchema));
+        super(new Struct(CURRENT_SCHEMA));
         Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
         List<Struct> topicDatas = new ArrayList<Struct>(responseByTopic.size());
         for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
@@ -107,6 +108,6 @@ public class ProduceResponse extends AbstractRequestResponse {
     }
 
     public static ProduceResponse parse(ByteBuffer buffer) {
-        return new ProduceResponse(((Struct) curSchema.read(buffer)));
+        return new ProduceResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
index 3c001d3..13be6a3 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
@@ -29,7 +29,7 @@ public interface Deserializer<T> {
      * @param isKey whether is for key or value
      */
     public void configure(Map<String, ?> configs, boolean isKey);
-
+    
     /**
      *
      * @param topic topic associated with the data

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 527dd0f..8a305b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -17,6 +17,9 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -224,6 +227,18 @@ public class Utils {
     }
 
     /**
+     * Sleep for the given duration; if interrupted, return early with the thread's interrupt status restored
+     * @param ms The duration of the sleep in milliseconds
+     */
+    public static void sleep(long ms) {
+        try {
+            Thread.sleep(ms);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // waking up early is okay, but keep the interruption visible to callers
+        }
+    }
+
+    /**
      * Instantiate the class
      */
     public static Object newInstance(Class<?> c) {
@@ -313,4 +328,31 @@ public class Utils {
                 ? "[" + host + "]:" + port // IPv6
                 : host + ":" + port;
     }
+    
+    /**
+     * Create a string by joining the string form of each array element with the given separator
+     * @param strs The array of items
+     * @param seperator The separator placed between consecutive items
+     * @return The joined string; empty if the array has no elements
+     */
+    public static <T> String join(T[] strs, String seperator) {
+        return join(Arrays.asList(strs), seperator);
+    }
+
+    /**
+     * Create a string by joining the string form of each item, in iteration order, with the given separator
+     * @param list The collection of items
+     * @param seperator The separator placed between consecutive items (never after the last one)
+     * @return The joined string; empty if the collection has no elements
+     */
+    public static <T> String join(Collection<T> list, String seperator) {
+        StringBuilder sb = new StringBuilder();
+        Iterator<T> iter = list.iterator();
+        while (iter.hasNext()) {
+            sb.append(iter.next());
+            if (iter.hasNext())
+                sb.append(seperator);
+        }
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 47b5d4a..67bee40 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -59,13 +59,33 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
-    public List<ClientResponse> poll(List<ClientRequest> requests, long timeoutMs, long now) {
-        this.requests.addAll(requests);
-        List<ClientResponse> copy = new ArrayList<ClientResponse>(this.responses);
+    public void send(ClientRequest request) {
+        this.requests.add(request); // only records the request in the queue exposed by requests()
+    }
+
+    @Override
+    public List<ClientResponse> poll(long timeoutMs, long now) { // timeoutMs and now are unused by the mock
+        for (ClientResponse response : this.responses) // fire completion callbacks for every queued response
+            if (response.request().hasCallback())
+                response.request().callback().onComplete(response);
+        List<ClientResponse> copy = new ArrayList<ClientResponse>(this.responses); // snapshot before clearing so callers receive the responses
         this.responses.clear();
         return copy;
     }
 
+    @Override
+    public List<ClientResponse> completeAll(int node, long now) {
+        return completeAll(now); // node is ignored: delegates to the all-nodes variant
+    }
+
+    @Override
+    public List<ClientResponse> completeAll(long now) {
+        List<ClientResponse> responses = poll(0, now); // poll runs the callbacks for, and clears, the queued responses
+        if (requests.size() > 0) // any request still in the queue is treated as unanswered
+            throw new IllegalStateException("Requests without responses remain.");
+        return responses;
+    }
+
     public Queue<ClientRequest> requests() {
         return this.requests;
     }
@@ -81,6 +101,11 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
+    public int inFlightRequestCount(int nodeId) {
+        return requests.size(); // nodeId is ignored: the size of the whole request queue is reported
+    }
+
+    @Override
     public RequestHeader nextRequestHeader(ApiKeys key) {
         return new RequestHeader(key.id, "mock", correlation++);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 1a55242..5debcd6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -5,7 +5,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -46,14 +45,13 @@ public class NetworkClientTest {
 
     @Test
     public void testReadyAndDisconnect() {
-        List<ClientRequest> reqs = new ArrayList<ClientRequest>();
         assertFalse("Client begins unready as it has no connection.", client.ready(node, time.milliseconds()));
         assertEquals("The connection is established as a side-effect of the readiness check", 1, selector.connected().size());
-        client.poll(reqs, 1, time.milliseconds());
+        client.poll(1, time.milliseconds());
         selector.clear();
         assertTrue("Now the client is ready", client.ready(node, time.milliseconds()));
         selector.disconnect(node.id());
-        client.poll(reqs, 1, time.milliseconds());
+        client.poll(1, time.milliseconds());
         selector.clear();
         assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds()));
         assertTrue("Metadata should get updated.", metadata.timeToNextUpdate(time.milliseconds()) == 0);
@@ -65,7 +63,8 @@ public class NetworkClientTest {
                                            client.nextRequestHeader(ApiKeys.METADATA),
                                            new MetadataRequest(Arrays.asList("test")).toStruct());
         ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null);
-        client.poll(Arrays.asList(request), 1, time.milliseconds());
+        client.send(request);
+        client.poll(1, time.milliseconds());
     }
 
     @Test
@@ -73,9 +72,11 @@ public class NetworkClientTest {
         ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
         RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
         RequestSend send = new RequestSend(node.id(), reqHeader, produceRequest.toStruct());
-        ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null);
+        TestCallbackHandler handler = new TestCallbackHandler();
+        ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler);
         awaitReady(client, node);
-        client.poll(Arrays.asList(request), 1, time.milliseconds());
+        client.send(request);
+        client.poll(1, time.milliseconds());
         assertEquals(1, client.inFlightRequestCount());
         ResponseHeader respHeader = new ResponseHeader(reqHeader.correlationId());
         Struct resp = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
@@ -86,16 +87,26 @@ public class NetworkClientTest {
         resp.writeTo(buffer);
         buffer.flip();
         selector.completeReceive(new NetworkReceive(node.id(), buffer));
-        List<ClientResponse> responses = client.poll(new ArrayList<ClientRequest>(), 1, time.milliseconds());
+        List<ClientResponse> responses = client.poll(1, time.milliseconds());
         assertEquals(1, responses.size());
-        ClientResponse response = responses.get(0);
-        assertTrue("Should have a response body.", response.hasResponse());
-        assertEquals("Should be correlated to the original request", request, response.request());
+        assertTrue("The handler should have executed.", handler.executed);
+        assertTrue("Should have a response body.", handler.response.hasResponse());
+        assertEquals("Should be correlated to the original request", request, handler.response.request());
     }
 
     private void awaitReady(NetworkClient client, Node node) {
         while (!client.ready(node, time.milliseconds()))
-            client.poll(new ArrayList<ClientRequest>(), 1, time.milliseconds());
+            client.poll(1, time.milliseconds());
+    }
+    
+    private static class TestCallbackHandler implements RequestCompletionHandler { // records whether and with what it was invoked
+        public boolean executed = false; // becomes true once onComplete has been called
+        public ClientResponse response; // the response most recently passed to onComplete
+        
+        public void onComplete(ClientResponse response) {
+            this.executed = true;
+            this.response = response;
+        }
     }
 
 }