You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/15 00:46:30 UTC

[4/5] kafka git commit: KAFKA-2066; Use client-side FetchRequest/FetchResponse on server

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
new file mode 100644
index 0000000..4a4d569
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import java.io.IOException;
+
+/**
+ * An abstraction between an underlying input stream and record iterators. A LogInputStream
+ * returns only the shallow log entries; {@link org.apache.kafka.common.record.RecordsIterator.DeepRecordsIterator}
+ * is relied upon for the deep iteration.
+ */
+interface LogInputStream {
+
+    /**
+     * Get the next log entry from the underlying input stream.
+     *
+     * @return The next log entry or null if there is none
+     * @throws IOException for any IO errors
+     */
+    LogEntry nextEntry() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 3848ea9..65ccf98 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -13,21 +13,18 @@
 package org.apache.kafka.common.record;
 
 import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
+import java.nio.channels.GatheringByteChannel;
 import java.util.Iterator;
 
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.AbstractIterator;
-import org.apache.kafka.common.utils.Utils;
-
 /**
  * A {@link Records} implementation backed by a ByteBuffer.
  */
 public class MemoryRecords implements Records {
 
+    public final static MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));
+
     private final static int WRITE_LIMIT_FOR_READABLE_ONLY = -1;
 
     // the compressor used for appends-only
@@ -148,6 +145,7 @@ public class MemoryRecords implements Records {
     /**
      * The size of this record set
      */
+    @Override
     public int sizeInBytes() {
         if (writable) {
             return compressor.buffer().position();
@@ -156,6 +154,15 @@ public class MemoryRecords implements Records {
         }
     }
 
+    @Override
+    public long writeTo(GatheringByteChannel channel, long offset, int length) throws IOException {
+        ByteBuffer dup = buffer.duplicate();
+        int position = (int) offset;
+        dup.position(position);
+        dup.limit(position + length);
+        return channel.write(dup);
+    }
+
     /**
      * The compression rate of this record set
      */
@@ -186,13 +193,11 @@ public class MemoryRecords implements Records {
 
     @Override
     public Iterator<LogEntry> iterator() {
-        if (writable) {
+        ByteBuffer input = this.buffer.duplicate();
+        if (writable)
             // flip on a duplicate buffer for reading
-            return new RecordsIterator((ByteBuffer) this.buffer.duplicate().flip(), false);
-        } else {
-            // do not need to flip for non-writable buffer
-            return new RecordsIterator(this.buffer.duplicate(), false);
-        }
+            input.flip();
+        return new RecordsIterator(new ByteBufferLogInputStream(input), false);
     }
     
     @Override
@@ -219,151 +224,44 @@ public class MemoryRecords implements Records {
         return writable;
     }
 
-    public static class RecordsIterator extends AbstractIterator<LogEntry> {
-        private final ByteBuffer buffer;
-        private final DataInputStream stream;
-        private final CompressionType type;
-        private final boolean shallow;
-        private RecordsIterator innerIter;
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
 
-        // The variables for inner iterator
-        private final ArrayDeque<LogEntry> logEntries;
-        private final long absoluteBaseOffset;
+        MemoryRecords that = (MemoryRecords) o;
 
-        public RecordsIterator(ByteBuffer buffer, boolean shallow) {
-            this.type = CompressionType.NONE;
-            this.buffer = buffer;
-            this.shallow = shallow;
-            this.stream = new DataInputStream(new ByteBufferInputStream(buffer));
-            this.logEntries = null;
-            this.absoluteBaseOffset = -1;
-        }
+        return buffer.equals(that.buffer);
 
-        // Private constructor for inner iterator.
-        private RecordsIterator(LogEntry entry) {
-            this.type = entry.record().compressionType();
-            this.buffer = entry.record().value();
-            this.shallow = true;
-            this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type, entry.record().magic());
-            long wrapperRecordOffset = entry.offset();
-
-            long wrapperRecordTimestamp = entry.record().timestamp();
-            this.logEntries = new ArrayDeque<>();
-            // If relative offset is used, we need to decompress the entire message first to compute
-            // the absolute offset. For simplicity and because it's a format that is on its way out, we
-            // do the same for message format version 0
-            try {
-                while (true) {
-                    try {
-                        LogEntry logEntry = getNextEntryFromStream();
-                        if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
-                            Record recordWithTimestamp = new Record(
-                                    logEntry.record().buffer(),
-                                    wrapperRecordTimestamp,
-                                    entry.record().timestampType()
-                            );
-                            logEntry = new LogEntry(logEntry.offset(), recordWithTimestamp);
-                        }
-                        logEntries.add(logEntry);
-                    } catch (EOFException e) {
-                        break;
-                    }
-                }
-                if (entry.record().magic() > Record.MAGIC_VALUE_V0)
-                    this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
-                else
-                    this.absoluteBaseOffset = -1;
-            } catch (IOException e) {
-                throw new KafkaException(e);
-            } finally {
-                Utils.closeQuietly(stream, "records iterator stream");
-            }
-        }
+    }
 
-        /*
-         * Read the next record from the buffer.
-         * 
-         * Note that in the compressed message set, each message value size is set as the size of the un-compressed
-         * version of the message value, so when we do de-compression allocating an array of the specified size for
-         * reading compressed value data is sufficient.
-         */
-        @Override
-        protected LogEntry makeNext() {
-            if (innerDone()) {
-                try {
-                    LogEntry entry = getNextEntry();
-                    // No more record to return.
-                    if (entry == null)
-                        return allDone();
-
-                    // Convert offset to absolute offset if needed.
-                    if (absoluteBaseOffset >= 0) {
-                        long absoluteOffset = absoluteBaseOffset + entry.offset();
-                        entry = new LogEntry(absoluteOffset, entry.record());
-                    }
-
-                    // decide whether to go shallow or deep iteration if it is compressed
-                    CompressionType compression = entry.record().compressionType();
-                    if (compression == CompressionType.NONE || shallow) {
-                        return entry;
-                    } else {
-                        // init the inner iterator with the value payload of the message,
-                        // which will de-compress the payload to a set of messages;
-                        // since we assume nested compression is not allowed, the deep iterator
-                        // would not try to further decompress underlying messages
-                        // There will be at least one element in the inner iterator, so we don't
-                        // need to call hasNext() here.
-                        innerIter = new RecordsIterator(entry);
-                        return innerIter.next();
-                    }
-                } catch (EOFException e) {
-                    return allDone();
-                } catch (IOException e) {
-                    throw new KafkaException(e);
-                }
-            } else {
-                return innerIter.next();
-            }
-        }
+    @Override
+    public int hashCode() {
+        return buffer.hashCode();
+    }
 
-        private LogEntry getNextEntry() throws IOException {
-            if (logEntries != null)
-                return getNextEntryFromEntryList();
-            else
-                return getNextEntryFromStream();
-        }
+    private static class ByteBufferLogInputStream implements LogInputStream {
+        private final DataInputStream stream;
+        private final ByteBuffer buffer;
 
-        private LogEntry getNextEntryFromEntryList() {
-            return logEntries.isEmpty() ? null : logEntries.remove();
+        private ByteBufferLogInputStream(ByteBuffer buffer) {
+            this.stream = new DataInputStream(new ByteBufferInputStream(buffer));
+            this.buffer = buffer;
         }
 
-        private LogEntry getNextEntryFromStream() throws IOException {
-            // read the offset
+        public LogEntry nextEntry() throws IOException {
             long offset = stream.readLong();
-            // read record size
             int size = stream.readInt();
             if (size < 0)
                 throw new IllegalStateException("Record with size " + size);
-            // read the record, if compression is used we cannot depend on size
-            // and hence has to do extra copy
-            ByteBuffer rec;
-            if (type == CompressionType.NONE) {
-                rec = buffer.slice();
-                int newPos = buffer.position() + size;
-                if (newPos > buffer.limit())
-                    return null;
-                buffer.position(newPos);
-                rec.limit(size);
-            } else {
-                byte[] recordBuffer = new byte[size];
-                stream.readFully(recordBuffer, 0, size);
-                rec = ByteBuffer.wrap(recordBuffer);
-            }
-            return new LogEntry(offset, new Record(rec));
-        }
 
-        private boolean innerDone() {
-            return innerIter == null || !innerIter.hasNext();
+            ByteBuffer slice = buffer.slice();
+            int newPos = buffer.position() + size;
+            if (newPos > buffer.limit())
+                return null;
+            buffer.position(newPos);
+            slice.limit(size);
+            return new LogEntry(offset, new Record(slice));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index d43cdab..3bc043f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.common.record;
 
+import java.io.IOException;
+import java.nio.channels.GatheringByteChannel;
+
 /**
  * A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
  * for the in-memory representation.
@@ -28,7 +31,19 @@ public interface Records extends Iterable<LogEntry> {
 
     /**
      * The size of these records in bytes
+     * @return The size in bytes
      */
     int sizeInBytes();
 
+    /**
+     * Write the messages in this set to the given channel, starting at the given byte position.
+     * @param channel The channel to write to
+     * @param position The position within this record set to begin writing from
+     * @param length The number of bytes to write
+     * @return The number of bytes written to the channel (which may be fewer than requested)
+     * @throws IOException For any IO errors writing to the channel
+     */
+    long writeTo(GatheringByteChannel channel, long position, int length) throws IOException;
+
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
new file mode 100644
index 0000000..1bc8a65
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+
+public class RecordsIterator extends AbstractIterator<LogEntry> {
+    private final LogInputStream logStream;
+    private final boolean shallow;
+    private DeepRecordsIterator innerIter;
+
+    public RecordsIterator(LogInputStream logStream, boolean shallow) {
+        this.logStream = logStream;
+        this.shallow = shallow;
+    }
+
+    /*
+     * Read the next log entry from the underlying log input stream.
+     *
+     * Note that in the compressed message set, each message value size is set as the size of the un-compressed
+     * version of the message value, so when we do de-compression allocating an array of the specified size for
+     * reading compressed value data is sufficient.
+     */
+    @Override
+    protected LogEntry makeNext() {
+        if (innerDone()) {
+            try {
+                LogEntry entry = logStream.nextEntry();
+                // No more record to return.
+                if (entry == null)
+                    return allDone();
+
+                // decide whether to go shallow or deep iteration if it is compressed
+                CompressionType compressionType = entry.record().compressionType();
+                if (compressionType == CompressionType.NONE || shallow) {
+                    return entry;
+                } else {
+                    // init the inner iterator with the value payload of the message,
+                    // which will de-compress the payload to a set of messages;
+                    // since we assume nested compression is not allowed, the deep iterator
+                    // would not try to further decompress underlying messages
+                    // There will be at least one element in the inner iterator, so we don't
+                    // need to call hasNext() here.
+                    innerIter = new DeepRecordsIterator(entry);
+                    return innerIter.next();
+                }
+            } catch (EOFException e) {
+                return allDone();
+            } catch (IOException e) {
+                throw new KafkaException(e);
+            }
+        } else {
+            return innerIter.next();
+        }
+    }
+
+    private boolean innerDone() {
+        return innerIter == null || !innerIter.hasNext();
+    }
+
+    private static class DataLogInputStream implements LogInputStream {
+        private final DataInputStream stream;
+
+        private DataLogInputStream(DataInputStream stream) {
+            this.stream = stream;
+        }
+
+        public LogEntry nextEntry() throws IOException {
+            long offset = stream.readLong();
+            int size = stream.readInt();
+            if (size < 0)
+                throw new IllegalStateException("Record with size " + size);
+
+            byte[] recordBuffer = new byte[size];
+            stream.readFully(recordBuffer, 0, size);
+            ByteBuffer buf = ByteBuffer.wrap(recordBuffer);
+            return new LogEntry(offset, new Record(buf));
+        }
+    }
+
+    private static class DeepRecordsIterator extends AbstractIterator<LogEntry> {
+        private final ArrayDeque<LogEntry> logEntries;
+        private final long absoluteBaseOffset;
+
+        private DeepRecordsIterator(LogEntry entry) {
+            CompressionType compressionType = entry.record().compressionType();
+            ByteBuffer buffer = entry.record().value();
+            DataInputStream stream = Compressor.wrapForInput(new ByteBufferInputStream(buffer), compressionType, entry.record().magic());
+            LogInputStream logStream = new DataLogInputStream(stream);
+
+            long wrapperRecordOffset = entry.offset();
+            long wrapperRecordTimestamp = entry.record().timestamp();
+            this.logEntries = new ArrayDeque<>();
+
+            // If relative offset is used, we need to decompress the entire message first to compute
+            // the absolute offset. For simplicity and because it's a format that is on its way out, we
+            // do the same for message format version 0
+            try {
+                while (true) {
+                    try {
+                        LogEntry logEntry = logStream.nextEntry();
+                        if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
+                            Record recordWithTimestamp = new Record(
+                                    logEntry.record().buffer(),
+                                    wrapperRecordTimestamp,
+                                    entry.record().timestampType()
+                            );
+                            logEntry = new LogEntry(logEntry.offset(), recordWithTimestamp);
+                        }
+                        logEntries.add(logEntry);
+                    } catch (EOFException e) {
+                        break;
+                    }
+                }
+                if (entry.record().magic() > Record.MAGIC_VALUE_V0)
+                    this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
+                else
+                    this.absoluteBaseOffset = -1;
+            } catch (IOException e) {
+                throw new KafkaException(e);
+            } finally {
+                Utils.closeQuietly(stream, "records iterator stream");
+            }
+        }
+
+        @Override
+        protected LogEntry makeNext() {
+            if (logEntries.isEmpty())
+                return allDone();
+
+            LogEntry entry = logEntries.remove();
+
+            // Convert offset to absolute offset if needed.
+            if (absoluteBaseOffset >= 0) {
+                long absoluteOffset = absoluteBaseOffset + entry.offset();
+                entry = new LogEntry(absoluteOffset, entry.record());
+            }
+
+            // nested compression is not allowed, so inner messages must be uncompressed
+            CompressionType compression = entry.record().compressionType();
+            if (compression != CompressionType.NONE)
+                throw new InvalidRecordException("Inner messages must not be compressed");
+
+            return entry;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index e6febe5..80182e6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.Struct;
 
@@ -27,10 +29,14 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
         super(struct);
     }
 
+    public Send toSend(String destination, RequestHeader header) {
+        return new NetworkSend(destination, serialize(header, this));
+    }
+
     /**
      * Get an error response for a request for a given api version
      */
-    public abstract AbstractRequestResponse getErrorResponse(int versionId, Throwable e);
+    public abstract AbstractResponse getErrorResponse(int versionId, Throwable e);
 
     /**
      * Factory method for getting a request object based on ApiKey ID and a buffer

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
index 37aff6c..3ad16a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
@@ -19,7 +19,6 @@ import java.nio.ByteBuffer;
 public abstract class AbstractRequestResponse {
     protected final Struct struct;
 
-
     public AbstractRequestResponse(Struct struct) {
         this.struct = struct;
     }
@@ -63,4 +62,12 @@ public abstract class AbstractRequestResponse {
         AbstractRequestResponse other = (AbstractRequestResponse) obj;
         return struct.equals(other.struct);
     }
+
+    public static ByteBuffer serialize(AbstractRequestResponse header, AbstractRequestResponse body) {
+        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
+        header.writeTo(buffer);
+        body.writeTo(buffer);
+        buffer.rewind();
+        return buffer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
new file mode 100644
index 0000000..8bbc25a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public abstract class AbstractResponse extends AbstractRequestResponse {
+
+    public AbstractResponse(Struct struct) {
+        super(struct);
+    }
+
+    public Send toSend(String destination, RequestHeader request) {
+        ResponseHeader responseHeader = new ResponseHeader(request.correlationId());
+        return new NetworkSend(destination, serialize(responseHeader, this));
+    }
+
+    public static AbstractResponse getResponse(int requestId, Struct struct) {
+        ApiKeys apiKey = ApiKeys.forId(requestId);
+        switch (apiKey) {
+            case PRODUCE:
+                return new ProduceResponse(struct);
+            case FETCH:
+                return new FetchResponse(struct);
+            case LIST_OFFSETS:
+                return new ListOffsetResponse(struct);
+            case METADATA:
+                return new MetadataResponse(struct);
+            case OFFSET_COMMIT:
+                return new OffsetCommitResponse(struct);
+            case OFFSET_FETCH:
+                return new OffsetFetchResponse(struct);
+            case GROUP_COORDINATOR:
+                return new GroupCoordinatorResponse(struct);
+            case JOIN_GROUP:
+                return new JoinGroupResponse(struct);
+            case HEARTBEAT:
+                return new HeartbeatResponse(struct);
+            case LEAVE_GROUP:
+                return new LeaveGroupResponse(struct);
+            case SYNC_GROUP:
+                return new SyncGroupResponse(struct);
+            case STOP_REPLICA:
+                return new StopReplicaResponse(struct);
+            case CONTROLLED_SHUTDOWN_KEY:
+                return new ControlledShutdownResponse(struct);
+            case UPDATE_METADATA_KEY:
+                return new UpdateMetadataResponse(struct);
+            case LEADER_AND_ISR:
+                return new LeaderAndIsrResponse(struct);
+            case DESCRIBE_GROUPS:
+                return new DescribeGroupsResponse(struct);
+            case LIST_GROUPS:
+                return new ListGroupsResponse(struct);
+            case SASL_HANDSHAKE:
+                return new SaslHandshakeResponse(struct);
+            case API_VERSIONS:
+                return new ApiVersionsResponse(struct);
+            case CREATE_TOPICS:
+                return new CreateTopicsResponse(struct);
+            case DELETE_TOPICS:
+                return new DeleteTopicsResponse(struct);
+            default:
+                throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " +
+                        "code should be updated to do so.", apiKey));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index b78c759..d9ef37e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -34,7 +34,7 @@ public class ApiVersionsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         switch (versionId) {
             case 0:
                 short errorCode = Errors.forException(e).code();

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index fe995b2..0bf1039 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ApiVersionsResponse extends AbstractRequestResponse {
+public class ApiVersionsResponse extends AbstractResponse {
 
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.API_VERSIONS.id);
     private static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse();

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index 9ac127d..c2ace32 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -42,7 +42,7 @@ public class ControlledShutdownRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         switch (versionId) {
             case 0:
                 throw new IllegalArgumentException(String.format("Version 0 is not supported. It is only supported by " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
index 862264f..1996f82 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -24,7 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-public class ControlledShutdownResponse extends AbstractRequestResponse {
+public class ControlledShutdownResponse extends AbstractResponse {
 
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index 3977835..7c440dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -194,7 +194,7 @@ public class CreateTopicsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         Map<String, Errors> topicErrors = new HashMap<>();
         for (String topic : topics.keySet()) {
             topicErrors.put(topic, Errors.forException(e));

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index da8c3ce..303b779 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -29,7 +29,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class CreateTopicsResponse extends AbstractRequestResponse {
+public class CreateTopicsResponse extends AbstractResponse {
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CREATE_TOPICS.id);
 
     private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes";

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
index f78c428..0632cc0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -58,7 +58,7 @@ public class DeleteTopicsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         Map<String, Errors> topicErrors = new HashMap<>();
         for (String topic : topics)
             topicErrors.put(topic, Errors.forException(e));

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index e474feb..ed6a63d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -28,7 +28,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class DeleteTopicsResponse extends AbstractRequestResponse {
+public class DeleteTopicsResponse extends AbstractResponse {
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.DELETE_TOPICS.id);
     private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes";
     private static final String TOPIC_KEY_NAME = "topic";

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index a870b8f..b965c91 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -46,7 +46,7 @@ public class DescribeGroupsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         switch (versionId) {
             case 0:
                 return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 2d4faee..2eff628 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class DescribeGroupsResponse extends AbstractRequestResponse {
+public class DescribeGroupsResponse extends AbstractResponse {
 
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.DESCRIBE_GROUPS.id);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index dcdfd9c..fd4c747 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.MemoryRecords;
 
 public class FetchRequest extends AbstractRequest {
 
@@ -80,7 +81,7 @@ public class FetchRequest extends AbstractRequest {
                 int partition = topicEntry.getKey().partition();
                 T partitionData = topicEntry.getValue();
                 if (topics.isEmpty() || !topics.get(topics.size() - 1).topic.equals(topic))
-                    topics.add(new TopicAndPartitionData(topic));
+                    topics.add(new TopicAndPartitionData<T>(topic));
                 topics.get(topics.size() - 1).partitions.put(partition, partitionData);
             }
             return topics;
@@ -131,11 +132,11 @@ public class FetchRequest extends AbstractRequest {
         struct.set(MIN_BYTES_KEY_NAME, minBytes);
         if (version >= 3)
             struct.set(MAX_BYTES_KEY_NAME, maxBytes);
-        List<Struct> topicArray = new ArrayList<Struct>();
+        List<Struct> topicArray = new ArrayList<>();
         for (TopicAndPartitionData<PartitionData> topicEntry : topicsData) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, topicEntry.topic);
-            List<Struct> partitionArray = new ArrayList<Struct>();
+            List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.partitions.entrySet()) {
                 PartitionData fetchPartitionData = partitionEntry.getValue();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
@@ -180,13 +181,13 @@ public class FetchRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         Map<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
 
         for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
             FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(),
-                    FetchResponse.INVALID_HIGHWATERMARK,
-                    FetchResponse.EMPTY_RECORD_SET);
+                    FetchResponse.INVALID_HIGHWATERMARK, MemoryRecords.EMPTY);
+
             responseData.put(entry.getKey(), partitionResponse);
         }
 
@@ -223,6 +224,10 @@ public class FetchRequest extends AbstractRequest {
         return fetchData;
     }
 
+    public boolean isFromFollower() {
+        return replicaId >= 0;
+    }
+
     public static FetchRequest parse(ByteBuffer buffer, int versionId) {
         return new FetchRequest(ProtoUtils.parseRequest(ApiKeys.FETCH.id, versionId, buffer));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 111d197..ec2ab47 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -17,10 +17,15 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.MultiSend;
+import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.record.Records;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -29,9 +34,9 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * This wrapper supports both v0 and v1 of FetchResponse.
+ * This wrapper supports all versions of the Fetch API response.
  */
-public class FetchResponse extends AbstractRequestResponse {
+public class FetchResponse extends AbstractResponse {
     
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
     private static final String RESPONSES_KEY_NAME = "responses";
@@ -42,14 +47,15 @@ public class FetchResponse extends AbstractRequestResponse {
     private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
 
     // partition level field names
+    private static final String PARTITION_HEADER_KEY_NAME = "partition_header";
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     // Default throttle time
     private static final int DEFAULT_THROTTLE_TIME = 0;
 
-  /**
-     * Possible error code:
+    /**
+     * Possible error codes:
      *
      *  OFFSET_OUT_OF_RANGE (1)
      *  UNKNOWN_TOPIC_OR_PARTITION (3)
@@ -62,7 +68,6 @@ public class FetchResponse extends AbstractRequestResponse {
     private static final String RECORD_SET_KEY_NAME = "record_set";
 
     public static final long INVALID_HIGHWATERMARK = -1L;
-    public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);
 
     private final LinkedHashMap<TopicPartition, PartitionData> responseData;
     private final int throttleTime;
@@ -70,12 +75,12 @@ public class FetchResponse extends AbstractRequestResponse {
     public static final class PartitionData {
         public final short errorCode;
         public final long highWatermark;
-        public final ByteBuffer recordSet;
+        public final Records records;
 
-        public PartitionData(short errorCode, long highWatermark, ByteBuffer recordSet) {
+        public PartitionData(short errorCode, long highWatermark, Records records) {
             this.errorCode = errorCode;
             this.highWatermark = highWatermark;
-            this.recordSet = recordSet;
+            this.records = records;
         }
     }
 
@@ -106,32 +111,9 @@ public class FetchResponse extends AbstractRequestResponse {
         this(3, responseData, throttleTime);
     }
 
-    private FetchResponse(int version, LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTime) {
+    public FetchResponse(int version, LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTime) {
         super(new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, version)));
-
-        List<FetchRequest.TopicAndPartitionData<PartitionData>> topicsData = FetchRequest.TopicAndPartitionData.batchByTopic(responseData);
-        List<Struct> topicArray = new ArrayList<>();
-        for (FetchRequest.TopicAndPartitionData<PartitionData> topicEntry: topicsData) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.topic);
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.partitions.entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
-                partitionData.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
-                partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.recordSet);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-
-        if (version >= 1)
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
-
+        writeStruct(struct, version, responseData, throttleTime);
         this.responseData = responseData;
         this.throttleTime = throttleTime;
     }
@@ -144,11 +126,12 @@ public class FetchResponse extends AbstractRequestResponse {
             String topic = topicResponse.getString(TOPIC_KEY_NAME);
             for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
-                long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
-                ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
-                PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
+                Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
+                int partition = partitionResponseHeader.getInt(PARTITION_KEY_NAME);
+                short errorCode = partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME);
+                long highWatermark = partitionResponseHeader.getLong(HIGH_WATERMARK_KEY_NAME);
+                Records records = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
+                PartitionData partitionData = new PartitionData(errorCode, highWatermark, records);
                 responseData.put(new TopicPartition(topic, partition), partitionData);
             }
         }
@@ -156,6 +139,22 @@ public class FetchResponse extends AbstractRequestResponse {
         this.throttleTime = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
     }
 
+    @Override
+    public Send toSend(String dest, RequestHeader requestHeader) {
+        ResponseHeader responseHeader = new ResponseHeader(requestHeader.correlationId());
+
+        // write the 4-byte size prefix (response header + body) followed by the response header itself
+        ByteBuffer buffer = ByteBuffer.allocate(responseHeader.sizeOf() + 4);
+        buffer.putInt(responseHeader.sizeOf() + struct.sizeOf());
+        responseHeader.writeTo(buffer);
+        buffer.rewind();
+
+        List<Send> sends = new ArrayList<>();
+        sends.add(new ByteBufferSend(dest, buffer));
+        addResponseData(dest, sends);
+        return new MultiSend(dest, sends);
+    }
+
     public LinkedHashMap<TopicPartition, PartitionData> responseData() {
         return responseData;
     }
@@ -171,4 +170,92 @@ public class FetchResponse extends AbstractRequestResponse {
     public static FetchResponse parse(ByteBuffer buffer, int version) {
         return new FetchResponse(ProtoUtils.responseSchema(ApiKeys.FETCH.id, version).read(buffer));
     }
+
+    private void addResponseData(String dest, List<Send> sends) {
+        Object[] allTopicData = struct.getArray(RESPONSES_KEY_NAME);
+
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME)) {
+            int throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME);
+            ByteBuffer buffer = ByteBuffer.allocate(8);
+            buffer.putInt(throttleTime);
+            buffer.putInt(allTopicData.length);
+            buffer.rewind();
+            sends.add(new ByteBufferSend(dest, buffer));
+        } else {
+            ByteBuffer buffer = ByteBuffer.allocate(4);
+            buffer.putInt(allTopicData.length);
+            buffer.rewind();
+            sends.add(new ByteBufferSend(dest, buffer));
+        }
+
+        for (Object topicData : allTopicData)
+            addTopicData(dest, sends, (Struct) topicData);
+    }
+
+    private void addTopicData(String dest, List<Send> sends, Struct topicData) {
+        String topic = topicData.getString(TOPIC_KEY_NAME);
+        Object[] allPartitionData = topicData.getArray(PARTITIONS_KEY_NAME);
+
+        // write the topic name followed by the number of partitions for that topic
+        ByteBuffer buffer = ByteBuffer.allocate(Type.STRING.sizeOf(topic) + 4);
+        Type.STRING.write(buffer, topic);
+        buffer.putInt(allPartitionData.length);
+        buffer.rewind();
+        sends.add(new ByteBufferSend(dest, buffer));
+
+        for (Object partitionData : allPartitionData)
+            addPartitionData(dest, sends, (Struct) partitionData);
+    }
+
+    private void addPartitionData(String dest, List<Send> sends, Struct partitionData) {
+        Struct header = partitionData.getStruct(PARTITION_HEADER_KEY_NAME);
+        Records records = partitionData.getRecords(RECORD_SET_KEY_NAME);
+
+        // include the partition header and the size of the record set
+        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + 4);
+        header.writeTo(buffer);
+        buffer.putInt(records.sizeInBytes());
+        buffer.rewind();
+        sends.add(new ByteBufferSend(dest, buffer));
+
+        // finally, add the send for the record set itself
+        sends.add(new RecordsSend(dest, records));
+    }
+
+    private static void writeStruct(Struct struct,
+                                    int version,
+                                    LinkedHashMap<TopicPartition, PartitionData> responseData,
+                                    int throttleTime) {
+        List<FetchRequest.TopicAndPartitionData<PartitionData>> topicsData = FetchRequest.TopicAndPartitionData.batchByTopic(responseData);
+        List<Struct> topicArray = new ArrayList<>();
+        for (FetchRequest.TopicAndPartitionData<PartitionData> topicEntry: topicsData) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.topic);
+            List<Struct> partitionArray = new ArrayList<>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.partitions.entrySet()) {
+                PartitionData fetchPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                Struct partitionDataHeader = partitionData.instance(PARTITION_HEADER_KEY_NAME);
+                partitionDataHeader.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionDataHeader.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
+                partitionDataHeader.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
+                partitionData.set(PARTITION_HEADER_KEY_NAME, partitionDataHeader);
+                partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.records);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+
+        if (version >= 1)
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
+    }
+
+    public static int sizeOf(int version, LinkedHashMap<TopicPartition, PartitionData> responseData) {
+        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, version));
+        writeStruct(struct, version, responseData, 0);
+        return 4 + struct.sizeOf();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
index 0b98e55..7fee476 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
@@ -41,7 +41,7 @@ public class GroupCoordinatorRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         switch (versionId) {
             case 0:
                 return new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
index 8e7beb4..1f447f7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-public class GroupCoordinatorResponse extends AbstractRequestResponse {
+public class GroupCoordinatorResponse extends AbstractResponse {
     
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_COORDINATOR.id);
     private static final String ERROR_CODE_KEY_NAME = "error_code";

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 02eaa99..3e7401c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -49,7 +49,7 @@ public class HeartbeatRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         switch (versionId) {
             case 0:
                 return new HeartbeatResponse(Errors.forException(e).code());

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 7fe227c..72f0175 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -19,7 +19,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-public class HeartbeatResponse extends AbstractRequestResponse {
+public class HeartbeatResponse extends AbstractResponse {
     
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
     private static final String ERROR_CODE_KEY_NAME = "error_code";

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 2845ee0..51855b6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -142,7 +142,7 @@ public class JoinGroupRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         switch (versionId) {
             case 0:
             case 1:

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 8895ace..e7fc5b1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -23,7 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class JoinGroupResponse extends AbstractRequestResponse {
+public class JoinGroupResponse extends AbstractResponse {
 
     private static final short CURRENT_VERSION = ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id);
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 52b9674..79dcd4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -145,7 +145,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         Map<TopicPartition, Short> responses = new HashMap<>(partitionStates.size());
         for (TopicPartition partition : partitionStates.keySet()) {
             responses.put(partition, Errors.forException(e).code());

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
index df57714..a754def 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class LeaderAndIsrResponse extends AbstractRequestResponse {
+public class LeaderAndIsrResponse extends AbstractResponse {
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LEADER_AND_ISR.id);
 
     private static final String ERROR_CODE_KEY_NAME = "error_code";

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index 3047193..6a3f8a6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -43,7 +43,7 @@ public class LeaveGroupRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         switch (versionId) {
             case 0:
                 return new LeaveGroupResponse(Errors.forException(e).code());

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index 6481ca7..9c7998b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -18,7 +18,7 @@ import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
-public class LeaveGroupResponse extends AbstractRequestResponse {
+public class LeaveGroupResponse extends AbstractResponse {
 
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LEAVE_GROUP.id);
     private static final String ERROR_CODE_KEY_NAME = "error_code";

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index 3160702..3fd3b81 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -34,7 +34,7 @@ public class ListGroupsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         switch (versionId) {
             case 0:
                 short errorCode = Errors.forException(e).code();

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index 5519670..98573f8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-public class ListGroupsResponse extends AbstractRequestResponse {
+public class ListGroupsResponse extends AbstractResponse {
 
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_GROUPS.id);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 1aed523..c1db82d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -166,7 +166,7 @@ public class ListOffsetRequest extends AbstractRequest {
 
     @Override
     @SuppressWarnings("deprecation")
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
 
         if (versionId == 0) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index dbeef05..0b257bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -29,7 +29,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ListOffsetResponse extends AbstractRequestResponse {
+public class ListOffsetResponse extends AbstractResponse {
     public static final long UNKNOWN_TIMESTAMP = -1L;
     public static final long UNKNOWN_OFFSET = -1L;
     

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 24a9bfc..f7d8f8b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -64,7 +64,7 @@ public class MetadataRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         List<MetadataResponse.TopicMetadata> topicMetadatas = new ArrayList<>();
         Errors error = Errors.forException(e);
         List<MetadataResponse.PartitionMetadata> partitions = Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 444941b..a8baee5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -29,7 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-public class MetadataResponse extends AbstractRequestResponse {
+public class MetadataResponse extends AbstractResponse {
 
     private static final short CURRENT_VERSION = ProtoUtils.latestVersion(ApiKeys.METADATA.id);
     private static final String BROKERS_KEY_NAME = "brokers";

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index df18486..1dfad1e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -154,7 +154,7 @@ public class OffsetCommitRequest extends AbstractRequest {
         for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
+            List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
                 PartitionData fetchPartitionData = partitionEntry.getValue();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
@@ -194,7 +194,7 @@ public class OffsetCommitRequest extends AbstractRequest {
         else
             retentionTime = DEFAULT_RETENTION_TIME;
 
-        offsetData = new HashMap<TopicPartition, PartitionData>();
+        offsetData = new HashMap<>();
         for (Object topicDataObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicData = (Struct) topicDataObj;
             String topic = topicData.getString(TOPIC_KEY_NAME);
@@ -217,8 +217,8 @@ public class OffsetCommitRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+        Map<TopicPartition, Short> responseData = new HashMap<>();
         for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
             responseData.put(entry.getKey(), Errors.forException(e).code());
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 1dfda93..abb260e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class OffsetCommitResponse extends AbstractRequestResponse {
+public class OffsetCommitResponse extends AbstractResponse {
     
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id);
     private static final String RESPONSES_KEY_NAME = "responses";

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 422328e..ede0f27 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -51,11 +51,11 @@ public class OffsetFetchRequest extends AbstractRequest {
         Map<String, List<Integer>> topicsData = CollectionUtils.groupDataByTopic(partitions);
 
         struct.set(GROUP_ID_KEY_NAME, groupId);
-        List<Struct> topicArray = new ArrayList<Struct>();
+        List<Struct> topicArray = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> entries: topicsData.entrySet()) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, entries.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
+            List<Struct> partitionArray = new ArrayList<>();
             for (Integer partiitonId : entries.getValue()) {
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 partitionData.set(PARTITION_KEY_NAME, partiitonId);
@@ -71,7 +71,7 @@ public class OffsetFetchRequest extends AbstractRequest {
 
     public OffsetFetchRequest(Struct struct) {
         super(struct);
-        partitions = new ArrayList<TopicPartition>();
+        partitions = new ArrayList<>();
         for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
             String topic = topicResponse.getString(TOPIC_KEY_NAME);
@@ -85,8 +85,8 @@ public class OffsetFetchRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+        Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
 
         for (TopicPartition partition: partitions) {
             responseData.put(partition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 1715777..ae4e066 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
-public class OffsetFetchResponse extends AbstractRequestResponse {
+public class OffsetFetchResponse extends AbstractResponse {
     
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_FETCH.id);
     private static final String RESPONSES_KEY_NAME = "responses";

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index c7d41e6..25209ce 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -19,6 +19,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.utils.CollectionUtils;
 
@@ -45,23 +46,23 @@ public class ProduceRequest extends AbstractRequest {
 
     private final short acks;
     private final int timeout;
-    private final Map<TopicPartition, ByteBuffer> partitionRecords;
+    private final Map<TopicPartition, MemoryRecords> partitionRecords;
 
-    public ProduceRequest(short acks, int timeout, Map<TopicPartition, ByteBuffer> partitionRecords) {
+    public ProduceRequest(short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords) {
         super(new Struct(CURRENT_SCHEMA));
-        Map<String, Map<Integer, ByteBuffer>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
+        Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
         struct.set(ACKS_KEY_NAME, acks);
         struct.set(TIMEOUT_KEY_NAME, timeout);
-        List<Struct> topicDatas = new ArrayList<Struct>(recordsByTopic.size());
-        for (Map.Entry<String, Map<Integer, ByteBuffer>> entry : recordsByTopic.entrySet()) {
+        List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size());
+        for (Map.Entry<String, Map<Integer, MemoryRecords>> entry : recordsByTopic.entrySet()) {
             Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, entry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, ByteBuffer> partitionEntry : entry.getValue().entrySet()) {
-                ByteBuffer buffer = partitionEntry.getValue().duplicate();
+            List<Struct> partitionArray = new ArrayList<>();
+            for (Map.Entry<Integer, MemoryRecords> partitionEntry : entry.getValue().entrySet()) {
+                MemoryRecords records = partitionEntry.getValue();
                 Struct part = topicData.instance(PARTITION_DATA_KEY_NAME)
                                        .set(PARTITION_KEY_NAME, partitionEntry.getKey())
-                                       .set(RECORD_SET_KEY_NAME, buffer);
+                                       .set(RECORD_SET_KEY_NAME, records);
                 partitionArray.add(part);
             }
             topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray());
@@ -75,14 +76,14 @@ public class ProduceRequest extends AbstractRequest {
 
     public ProduceRequest(Struct struct) {
         super(struct);
-        partitionRecords = new HashMap<TopicPartition, ByteBuffer>();
+        partitionRecords = new HashMap<>();
         for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
             Struct topicData = (Struct) topicDataObj;
             String topic = topicData.getString(TOPIC_KEY_NAME);
             for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
                 Struct partitionResponse = (Struct) partitionResponseObj;
                 int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                ByteBuffer records = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
+                MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME);
                 partitionRecords.put(new TopicPartition(topic, partition), records);
             }
         }
@@ -91,14 +92,14 @@ public class ProduceRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         /* In case the producer doesn't actually want any response */
         if (acks == 0)
             return null;
 
-        Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
 
-        for (Map.Entry<TopicPartition, ByteBuffer> entry : partitionRecords.entrySet()) {
+        for (Map.Entry<TopicPartition, MemoryRecords> entry : partitionRecords.entrySet()) {
             responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET, Record.NO_TIMESTAMP));
         }
 
@@ -122,7 +123,7 @@ public class ProduceRequest extends AbstractRequest {
         return timeout;
     }
 
-    public Map<TopicPartition, ByteBuffer> partitionRecords() {
+    public Map<TopicPartition, MemoryRecords> partitionRecords() {
         return partitionRecords;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 58175e1..71e6ab5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -28,7 +28,7 @@ import java.util.Map;
 /**
  * This wrapper supports both v0 and v1 of ProduceResponse.
  */
-public class ProduceResponse extends AbstractRequestResponse {
+public class ProduceResponse extends AbstractResponse {
     
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
     private static final String RESPONSES_KEY_NAME = "responses";
@@ -98,7 +98,7 @@ public class ProduceResponse extends AbstractRequestResponse {
      */
     public ProduceResponse(Struct struct) {
         super(struct);
-        responses = new HashMap<TopicPartition, PartitionResponse>();
+        responses = new HashMap<>();
         for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicRespStruct = (Struct) topicResponse;
             String topic = topicRespStruct.getString(TOPIC_KEY_NAME);
@@ -117,11 +117,11 @@ public class ProduceResponse extends AbstractRequestResponse {
 
     private void initCommonFields(Map<TopicPartition, PartitionResponse> responses) {
         Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
-        List<Struct> topicDatas = new ArrayList<Struct>(responseByTopic.size());
+        List<Struct> topicDatas = new ArrayList<>(responseByTopic.size());
         for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
             Struct topicData = struct.instance(RESPONSES_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, entry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
+            List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
                 PartitionResponse part = partitionEntry.getValue();
                 Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java b/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java
new file mode 100644
index 0000000..1c3bb0d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.record.Records;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+
+/**
+ * A {@link Send} that writes the contents of a {@link Records} instance to a channel.
+ * The number of bytes still to be written is tracked across calls, so
+ * {@link #writeTo(GatheringByteChannel)} can be invoked repeatedly until
+ * {@link #completed()} returns true.
+ */
+public class RecordsSend implements Send {
+    // Zero-length buffer written to the channel when the transport layer still reports pending
+    // writes after all record bytes are out (presumably to give it a chance to flush them).
+    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+
+    private final String destination;
+    private final Records records;
+    // Bytes of the records that have not yet been handed to the channel.
+    private int remaining;
+    // Whether the transport layer reported pending writes on the last writeTo call.
+    private boolean pending = false;
+
+    /**
+     * @param destination the destination identifier returned by {@link #destination()}
+     * @param records the records to send; its size in bytes is the initial amount remaining
+     */
+    public RecordsSend(String destination, Records records) {
+        this.destination = destination;
+        this.records = records;
+        this.remaining = records.sizeInBytes();
+    }
+
+    @Override
+    public String destination() {
+        return destination;
+    }
+
+    /**
+     * @return true once no record bytes remain and the last call to
+     *         {@link #writeTo(GatheringByteChannel)} observed no pending transport-layer writes
+     */
+    @Override
+    public boolean completed() {
+        return remaining <= 0 && !pending;
+    }
+
+    /**
+     * Write as many of the remaining record bytes as the channel accepts. If the channel is a
+     * {@link TransportLayer}, also record whether it has pending writes and, when all record
+     * bytes have been written but writes are still pending, issue an empty write to the channel.
+     *
+     * @param channel the channel to write to
+     * @return the number of record bytes written by this call (0 if none remained)
+     * @throws EOFException if the underlying records report a negative number of bytes written
+     * @throws IOException if writing to the channel fails
+     */
+    @Override
+    public long writeTo(GatheringByteChannel channel) throws IOException {
+        long written = 0;
+        if (remaining > 0) {
+            // Arguments: bytes already written so far, and bytes still outstanding.
+            written = records.writeTo(channel, size() - remaining, remaining);
+            if (written < 0)
+                throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
+            // Update the count before the pending check below, so that a call which writes the
+            // final bytes can also trigger the empty write instead of deferring it to a later call.
+            remaining -= written;
+        }
+
+        if (channel instanceof TransportLayer) {
+            TransportLayer tl = (TransportLayer) channel;
+            pending = tl.hasPendingWrites();
+
+            if (remaining <= 0 && pending)
+                channel.write(EMPTY_BYTE_BUFFER);
+        }
+
+        return written;
+    }
+
+    /**
+     * @return the total size in bytes of the underlying records
+     */
+    @Override
+    public long size() {
+        return records.sizeInBytes();
+    }
+}