You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/15 00:46:31 UTC

[5/5] kafka git commit: KAFKA-2066; Use client-side FetchRequest/FetchResponse on server

KAFKA-2066; Use client-side FetchRequest/FetchResponse on server

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #2069 from hachikuji/KAFKA-2066


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3b4c3479
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3b4c3479
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3b4c3479

Branch: refs/heads/trunk
Commit: 3b4c347949c02b1e2b1dd473deda0f8d2304d027
Parents: 1f1d450
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Nov 14 16:31:04 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Nov 14 16:31:04 2016 -0800

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   2 +
 .../org/apache/kafka/clients/ClientRequest.java |  65 +++---
 .../apache/kafka/clients/ClientResponse.java    |  56 +++--
 .../apache/kafka/clients/InFlightRequests.java  |  46 ++--
 .../org/apache/kafka/clients/KafkaClient.java   |   1 -
 .../kafka/clients/ManualMetadataUpdater.java    |  11 +-
 .../apache/kafka/clients/MetadataUpdater.java   |  10 +-
 .../org/apache/kafka/clients/NetworkClient.java | 164 ++++++++------
 .../consumer/internals/AbstractCoordinator.java |  33 +--
 .../consumer/internals/ConsumerCoordinator.java |  12 -
 .../internals/ConsumerNetworkClient.java        |  30 ++-
 .../clients/consumer/internals/Fetcher.java     |  11 +-
 .../clients/producer/internals/Sender.java      |  68 +++---
 .../kafka/common/network/ByteBufferSend.java    |   5 +-
 .../kafka/common/network/NetworkSend.java       |   4 +-
 .../apache/kafka/common/protocol/ApiKeys.java   |   2 +-
 .../apache/kafka/common/protocol/Protocol.java  |  21 +-
 .../kafka/common/protocol/types/ArrayOf.java    |  13 +-
 .../kafka/common/protocol/types/Schema.java     |  18 +-
 .../kafka/common/protocol/types/Struct.java     |  16 +-
 .../kafka/common/protocol/types/Type.java       |  48 ++++
 .../apache/kafka/common/record/FileRecords.java | 126 +++++++++++
 .../kafka/common/record/LogInputStream.java     |  35 +++
 .../kafka/common/record/MemoryRecords.java      | 186 ++++------------
 .../org/apache/kafka/common/record/Records.java |  15 ++
 .../kafka/common/record/RecordsIterator.java    | 170 ++++++++++++++
 .../kafka/common/requests/AbstractRequest.java  |   8 +-
 .../requests/AbstractRequestResponse.java       |   9 +-
 .../kafka/common/requests/AbstractResponse.java |  86 ++++++++
 .../common/requests/ApiVersionsRequest.java     |   2 +-
 .../common/requests/ApiVersionsResponse.java    |   2 +-
 .../requests/ControlledShutdownRequest.java     |   2 +-
 .../requests/ControlledShutdownResponse.java    |   2 +-
 .../common/requests/CreateTopicsRequest.java    |   2 +-
 .../common/requests/CreateTopicsResponse.java   |   2 +-
 .../common/requests/DeleteTopicsRequest.java    |   2 +-
 .../common/requests/DeleteTopicsResponse.java   |   2 +-
 .../common/requests/DescribeGroupsRequest.java  |   2 +-
 .../common/requests/DescribeGroupsResponse.java |   2 +-
 .../kafka/common/requests/FetchRequest.java     |  17 +-
 .../kafka/common/requests/FetchResponse.java    | 163 ++++++++++----
 .../requests/GroupCoordinatorRequest.java       |   2 +-
 .../requests/GroupCoordinatorResponse.java      |   2 +-
 .../kafka/common/requests/HeartbeatRequest.java |   2 +-
 .../common/requests/HeartbeatResponse.java      |   2 +-
 .../kafka/common/requests/JoinGroupRequest.java |   2 +-
 .../common/requests/JoinGroupResponse.java      |   2 +-
 .../common/requests/LeaderAndIsrRequest.java    |   2 +-
 .../common/requests/LeaderAndIsrResponse.java   |   2 +-
 .../common/requests/LeaveGroupRequest.java      |   2 +-
 .../common/requests/LeaveGroupResponse.java     |   2 +-
 .../common/requests/ListGroupsRequest.java      |   2 +-
 .../common/requests/ListGroupsResponse.java     |   2 +-
 .../common/requests/ListOffsetRequest.java      |   2 +-
 .../common/requests/ListOffsetResponse.java     |   2 +-
 .../kafka/common/requests/MetadataRequest.java  |   2 +-
 .../kafka/common/requests/MetadataResponse.java |   2 +-
 .../common/requests/OffsetCommitRequest.java    |   8 +-
 .../common/requests/OffsetCommitResponse.java   |   2 +-
 .../common/requests/OffsetFetchRequest.java     |  10 +-
 .../common/requests/OffsetFetchResponse.java    |   2 +-
 .../kafka/common/requests/ProduceRequest.java   |  31 +--
 .../kafka/common/requests/ProduceResponse.java  |   8 +-
 .../kafka/common/requests/RecordsSend.java      |  77 +++++++
 .../kafka/common/requests/RequestSend.java      |  55 -----
 .../kafka/common/requests/ResponseSend.java     |  41 ----
 .../common/requests/SaslHandshakeRequest.java   |   2 +-
 .../common/requests/SaslHandshakeResponse.java  |   2 +-
 .../common/requests/StopReplicaRequest.java     |   2 +-
 .../common/requests/StopReplicaResponse.java    |   2 +-
 .../kafka/common/requests/SyncGroupRequest.java |   2 +-
 .../common/requests/SyncGroupResponse.java      |   2 +-
 .../common/requests/UpdateMetadataRequest.java  |   2 +-
 .../common/requests/UpdateMetadataResponse.java |   2 +-
 .../authenticator/SaslClientAuthenticator.java  |  57 +++--
 .../authenticator/SaslServerAuthenticator.java  |  71 +++---
 .../kafka/common/utils/AbstractIterator.java    |   4 +-
 .../org/apache/kafka/clients/MockClient.java    | 101 ++++-----
 .../apache/kafka/clients/NetworkClientTest.java |  31 ++-
 .../clients/consumer/KafkaConsumerTest.java     |  64 +++---
 .../internals/AbstractCoordinatorTest.java      |  64 +++---
 .../internals/ConsumerCoordinatorTest.java      |  85 ++++---
 .../internals/ConsumerNetworkClientTest.java    |  10 +-
 .../clients/consumer/internals/FetcherTest.java |  40 ++--
 .../clients/producer/internals/SenderTest.java  |  40 ++--
 .../common/requests/RequestResponseTest.java    | 174 +++++++++++----
 .../authenticator/SaslAuthenticatorTest.java    |  54 ++---
 .../distributed/WorkerCoordinatorTest.java      |  41 ++--
 .../main/scala/kafka/admin/AdminClient.scala    |  18 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |   3 +-
 .../main/scala/kafka/api/FetchResponse.scala    |   3 +-
 .../kafka/api/GenericRequestAndHeader.scala     |   5 +-
 .../kafka/api/GenericResponseAndHeader.scala    |   5 +-
 .../main/scala/kafka/api/ProducerRequest.scala  |   4 +-
 .../scala/kafka/api/RequestOrResponse.scala     |   2 +-
 .../controller/ControllerChannelManager.scala   |  59 +++--
 .../kafka/controller/KafkaController.scala      |   8 +-
 .../kafka/controller/TopicDeletionManager.scala |   7 +-
 .../main/scala/kafka/log/FileMessageSet.scala   |  35 +--
 .../kafka/message/ByteBufferMessageSet.scala    |  17 +-
 .../main/scala/kafka/message/MessageSet.scala   |  12 +-
 .../scala/kafka/network/RequestChannel.scala    |  45 ++--
 .../main/scala/kafka/server/DelayedFetch.scala  |   9 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 219 +++++++++----------
 .../main/scala/kafka/server/KafkaServer.scala   |  58 +++--
 .../kafka/server/ReplicaFetcherThread.scala     |  24 +-
 .../scala/kafka/server/ReplicaManager.scala     |  31 +--
 .../kafka/server/ReplicationQuotaManager.scala  |   2 +-
 .../kafka/utils/NetworkClientBlockingOps.scala  |  14 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  20 +-
 .../kafka/message/BaseMessageSetTestCases.scala |   4 +-
 .../unit/kafka/network/SocketServerTest.scala   |   7 +-
 .../kafka/server/CreateTopicsRequestTest.scala  |   2 +-
 .../unit/kafka/server/EdgeCaseRequestTest.scala |   7 +-
 .../unit/kafka/server/FetchRequestTest.scala    |  10 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |   4 +-
 .../unit/kafka/server/ProduceRequestTest.scala  |   6 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala |  11 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |   9 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |  15 +-
 120 files changed, 1870 insertions(+), 1394 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7716f43..58525ad 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -81,11 +81,13 @@
     <subpackage name="protocol">
       <allow pkg="org.apache.kafka.common.errors" />
       <allow pkg="org.apache.kafka.common.protocol.types" />
+      <allow pkg="org.apache.kafka.common.record" />
     </subpackage>
 
     <subpackage name="record">
       <allow pkg="net.jpountz" />
       <allow pkg="org.apache.kafka.common.record" />
+      <allow pkg="org.apache.kafka.common.network" />
     </subpackage>
 
     <subpackage name="requests">

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index 117b0bf..de6e506 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -12,56 +12,50 @@
  */
 package org.apache.kafka.clients;
 
-import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.RequestHeader;
 
 /**
  * A request being sent to the server. This holds both the network send as well as the client-level metadata.
  */
 public final class ClientRequest {
 
+    private final String destination;
+    private final RequestHeader header;
+    private final AbstractRequest body;
     private final long createdTimeMs;
     private final boolean expectResponse;
-    private final RequestSend request;
     private final RequestCompletionHandler callback;
-    private final boolean isInitiatedByNetworkClient;
-    private long sendTimeMs;
 
     /**
+     * @param destination The brokerId to send the request to
      * @param createdTimeMs The unix timestamp in milliseconds for the time at which this request was created.
      * @param expectResponse Should we expect a response message or is this request complete once it is sent?
-     * @param request The request
+     * @param header The request's header
+     * @param body The request's body
      * @param callback A callback to execute when the response has been received (or null if no callback is necessary)
      */
-    public ClientRequest(long createdTimeMs, boolean expectResponse, RequestSend request,
+    public ClientRequest(String destination,
+                         long createdTimeMs,
+                         boolean expectResponse,
+                         RequestHeader header,
+                         AbstractRequest body,
                          RequestCompletionHandler callback) {
-        this(createdTimeMs, expectResponse, request, callback, false);
-    }
-
-    /**
-     * @param createdTimeMs The unix timestamp in milliseconds for the time at which this request was created.
-     * @param expectResponse Should we expect a response message or is this request complete once it is sent?
-     * @param request The request
-     * @param callback A callback to execute when the response has been received (or null if no callback is necessary)
-     * @param isInitiatedByNetworkClient Is request initiated by network client, if yes, its
-     *                                   response will be consumed by network client
-     */
-    public ClientRequest(long createdTimeMs, boolean expectResponse, RequestSend request,
-                         RequestCompletionHandler callback, boolean isInitiatedByNetworkClient) {
+        this.destination = destination;
         this.createdTimeMs = createdTimeMs;
         this.callback = callback;
-        this.request = request;
+        this.header = header;
+        this.body = body;
         this.expectResponse = expectResponse;
-        this.isInitiatedByNetworkClient = isInitiatedByNetworkClient;
     }
 
     @Override
     public String toString() {
         return "ClientRequest(expectResponse=" + expectResponse +
             ", callback=" + callback +
-            ", request=" + request +
-            (isInitiatedByNetworkClient ? ", isInitiatedByNetworkClient" : "") +
+            ", header=" + header +
+            ", body=" + body +
             ", createdTimeMs=" + createdTimeMs +
-            ", sendTimeMs=" + sendTimeMs +
             ")";
     }
 
@@ -69,12 +63,16 @@ public final class ClientRequest {
         return expectResponse;
     }
 
-    public RequestSend request() {
-        return request;
+    public RequestHeader header() {
+        return header;
     }
 
-    public boolean hasCallback() {
-        return callback != null;
+    public AbstractRequest body() {
+        return body;
+    }
+
+    public String destination() {
+        return destination;
     }
 
     public RequestCompletionHandler callback() {
@@ -85,15 +83,4 @@ public final class ClientRequest {
         return createdTimeMs;
     }
 
-    public boolean isInitiatedByNetworkClient() {
-        return isInitiatedByNetworkClient;
-    }
-
-    public long sendTimeMs() {
-        return sendTimeMs;
-    }
-
-    public void setSendTimeMs(long sendTimeMs) {
-        this.sendTimeMs = sendTimeMs;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
index 3b6f955..3cd8f1a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
@@ -12,30 +12,45 @@
  */
 package org.apache.kafka.clients;
 
-import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestHeader;
 
 /**
- * A response from the server. Contains both the body of the response as well as the correlated request that was
- * originally sent.
+ * A response from the server. Contains both the body of the response as well as the correlated request
+ * metadata that was originally sent.
  */
 public class ClientResponse {
 
+    private final RequestHeader requestHeader;
+    private final RequestCompletionHandler callback;
+    private final String destination;
     private final long receivedTimeMs;
+    private final long latencyMs;
     private final boolean disconnected;
-    private final ClientRequest request;
-    private final Struct responseBody;
+    private final AbstractResponse responseBody;
 
     /**
-     * @param request The original request
+     * @param requestHeader The header of the corresponding request
+     * @param callback The callback to be invoked
+     * @param createdTimeMs The unix timestamp when the corresponding request was created
+     * @param destination The node the corresponding request was sent to
      * @param receivedTimeMs The unix timestamp when this response was received
      * @param disconnected Whether the client disconnected before fully reading a response
      * @param responseBody The response contents (or null) if we disconnected or no response was expected
      */
-    public ClientResponse(ClientRequest request, long receivedTimeMs, boolean disconnected, Struct responseBody) {
-        super();
+    public ClientResponse(RequestHeader requestHeader,
+                          RequestCompletionHandler callback,
+                          String destination,
+                          long createdTimeMs,
+                          long receivedTimeMs,
+                          boolean disconnected,
+                          AbstractResponse responseBody) {
+        this.requestHeader = requestHeader;
+        this.callback = callback;
+        this.destination = destination;
         this.receivedTimeMs = receivedTimeMs;
+        this.latencyMs = receivedTimeMs - createdTimeMs;
         this.disconnected = disconnected;
-        this.request = request;
         this.responseBody = responseBody;
     }
 
@@ -47,11 +62,15 @@ public class ClientResponse {
         return disconnected;
     }
 
-    public ClientRequest request() {
-        return request;
+    public RequestHeader requestHeader() {
+        return requestHeader;
     }
 
-    public Struct responseBody() {
+    public String destination() {
+        return destination;
+    }
+
+    public AbstractResponse responseBody() {
         return responseBody;
     }
 
@@ -60,16 +79,23 @@ public class ClientResponse {
     }
 
     public long requestLatencyMs() {
-        return receivedTimeMs() - this.request.createdTimeMs();
+        return latencyMs;
+    }
+
+    public void onComplete() {
+        if (callback != null)
+            callback.onComplete(this);
     }
 
     @Override
     public String toString() {
         return "ClientResponse(receivedTimeMs=" + receivedTimeMs +
+               ", latencyMs=" +
+               latencyMs +
                ", disconnected=" +
                disconnected +
-               ", request=" +
-               request +
+               ", requestHeader=" +
+               requestHeader +
                ", responseBody=" +
                responseBody +
                ")";

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index 91b9dba..f4f753e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -26,7 +26,7 @@ import java.util.Map;
 final class InFlightRequests {
 
     private final int maxInFlightRequestsPerConnection;
-    private final Map<String, Deque<ClientRequest>> requests = new HashMap<>();
+    private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();
 
     public InFlightRequests(int maxInFlightRequestsPerConnection) {
         this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
@@ -35,11 +35,12 @@ final class InFlightRequests {
     /**
      * Add the given request to the queue for the connection it was directed to
      */
-    public void add(ClientRequest request) {
-        Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
+    public void add(NetworkClient.InFlightRequest request) {
+        String destination = request.destination;
+        Deque<NetworkClient.InFlightRequest> reqs = this.requests.get(destination);
         if (reqs == null) {
             reqs = new ArrayDeque<>();
-            this.requests.put(request.request().destination(), reqs);
+            this.requests.put(destination, reqs);
         }
         reqs.addFirst(request);
     }
@@ -47,8 +48,8 @@ final class InFlightRequests {
     /**
      * Get the request queue for the given node
      */
-    private Deque<ClientRequest> requestQueue(String node) {
-        Deque<ClientRequest> reqs = requests.get(node);
+    private Deque<NetworkClient.InFlightRequest> requestQueue(String node) {
+        Deque<NetworkClient.InFlightRequest> reqs = requests.get(node);
         if (reqs == null || reqs.isEmpty())
             throw new IllegalStateException("Response from server for which there are no in-flight requests.");
         return reqs;
@@ -57,7 +58,7 @@ final class InFlightRequests {
     /**
      * Get the oldest request (the one that that will be completed next) for the given node
      */
-    public ClientRequest completeNext(String node) {
+    public NetworkClient.InFlightRequest completeNext(String node) {
         return requestQueue(node).pollLast();
     }
 
@@ -65,7 +66,7 @@ final class InFlightRequests {
      * Get the last request we sent to the given node (but don't remove it from the queue)
      * @param node The node id
      */
-    public ClientRequest lastSent(String node) {
+    public NetworkClient.InFlightRequest lastSent(String node) {
         return requestQueue(node).peekFirst();
     }
 
@@ -74,20 +75,20 @@ final class InFlightRequests {
      * @param node The node the request was sent to
      * @return The request
      */
-    public ClientRequest completeLastSent(String node) {
+    public NetworkClient.InFlightRequest completeLastSent(String node) {
         return requestQueue(node).pollFirst();
     }
 
     /**
      * Can we send more requests to this node?
-     * 
+     *
      * @param node Node in question
      * @return true iff we have no requests still being sent to the given node
      */
     public boolean canSendMore(String node) {
-        Deque<ClientRequest> queue = requests.get(node);
+        Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
         return queue == null || queue.isEmpty() ||
-               (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
+               (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
     }
 
     /**
@@ -96,7 +97,7 @@ final class InFlightRequests {
      * @return The request count.
      */
     public int inFlightRequestCount(String node) {
-        Deque<ClientRequest> queue = requests.get(node);
+        Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
         return queue == null ? 0 : queue.size();
     }
 
@@ -105,19 +106,19 @@ final class InFlightRequests {
      */
     public int inFlightRequestCount() {
         int total = 0;
-        for (Deque<ClientRequest> deque : this.requests.values())
+        for (Deque<NetworkClient.InFlightRequest> deque : this.requests.values())
             total += deque.size();
         return total;
     }
 
     /**
      * Clear out all the in-flight requests for the given node and return them
-     * 
+     *
      * @param node The node
      * @return All the in-flight requests for that node that have been removed
      */
-    public Iterable<ClientRequest> clearAll(String node) {
-        Deque<ClientRequest> reqs = requests.get(node);
+    public Iterable<NetworkClient.InFlightRequest> clearAll(String node) {
+        Deque<NetworkClient.InFlightRequest> reqs = requests.get(node);
         if (reqs == null) {
             return Collections.emptyList();
         } else {
@@ -126,7 +127,7 @@ final class InFlightRequests {
     }
 
     /**
-     * Returns a list of nodes with pending inflight request, that need to be timed out
+     * Returns a list of nodes with pending in-flight request, that need to be timed out
      *
      * @param now current time in milliseconds
      * @param requestTimeout max time to wait for the request to be completed
@@ -134,13 +135,13 @@ final class InFlightRequests {
      */
     public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) {
         List<String> nodeIds = new LinkedList<>();
-        for (Map.Entry<String, Deque<ClientRequest>> requestEntry : requests.entrySet()) {
+        for (Map.Entry<String, Deque<NetworkClient.InFlightRequest>> requestEntry : requests.entrySet()) {
             String nodeId = requestEntry.getKey();
-            Deque<ClientRequest> deque = requestEntry.getValue();
+            Deque<NetworkClient.InFlightRequest> deque = requestEntry.getValue();
 
             if (!deque.isEmpty()) {
-                ClientRequest request = deque.peekLast();
-                long timeSinceSend = now - request.sendTimeMs();
+                NetworkClient.InFlightRequest request = deque.peekLast();
+                long timeSinceSend = now - request.sendTimeMs;
                 if (timeSinceSend > requestTimeout)
                     nodeIds.add(nodeId);
             }
@@ -148,4 +149,5 @@ final class InFlightRequests {
 
         return nodeIds;
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 8c6e39a..f171d13 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -66,7 +66,6 @@ public interface KafkaClient extends Closeable {
 
     /**
      * Queue up the given request for sending. Requests can only be sent on ready connections.
-     * 
      * @param request The request
      * @param now The current timestamp
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
index efbe664..1c9fa79 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -14,7 +14,8 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestHeader;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -60,13 +61,13 @@ public class ManualMetadataUpdater implements MetadataUpdater {
     }
 
     @Override
-    public boolean maybeHandleDisconnection(ClientRequest request) {
-        return false;
+    public void handleDisconnection(String destination) {
+        // Do nothing
     }
 
     @Override
-    public boolean maybeHandleCompletedReceive(ClientRequest request, long now, Struct body) {
-        return false;
+    public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, AbstractResponse body) {
+        // Do nothing
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index 4669a68..21c50bd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -14,7 +14,8 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestHeader;
 
 import java.util.List;
 
@@ -41,7 +42,7 @@ interface MetadataUpdater {
      * be 0 if an update has been started as a result of this call).
      *
      * If the implementation relies on `NetworkClient` to send requests, the completed receive will be passed to
-     * `maybeHandleCompletedReceive`.
+     * `handleCompletedMetadataResponse`.
      *
      * The semantics of `needed` and `possible` are implementation-dependent and may take into account a number of
      * factors like node availability, how long since the last metadata update, etc.
@@ -53,8 +54,9 @@ interface MetadataUpdater {
      *
      * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
      * requests with special handling for disconnections of such requests.
+     * @param destination
      */
-    boolean maybeHandleDisconnection(ClientRequest request);
+    void handleDisconnection(String destination);
 
     /**
      * If `request` is a metadata request, handles it and returns `true`. Otherwise, returns `false`.
@@ -62,7 +64,7 @@ interface MetadataUpdater {
      * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
      * requests with special handling for completed receives of such requests.
      */
-    boolean maybeHandleCompletedReceive(ClientRequest request, long now, Struct body);
+    void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, AbstractResponse body);
 
     /**
      * Schedules an update of the current cluster metadata info. A subsequent call to `maybeUpdate` would trigger the

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 29c6d6f..124810d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -21,10 +21,10 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -173,8 +173,9 @@ public class NetworkClient implements KafkaClient {
     @Override
     public void close(String nodeId) {
         selector.close(nodeId);
-        for (ClientRequest request : inFlightRequests.clearAll(nodeId))
-            metadataUpdater.maybeHandleDisconnection(request);
+        for (InFlightRequest request : inFlightRequests.clearAll(nodeId))
+            if (request.isInternalMetadataRequest)
+                metadataUpdater.handleDisconnection(request.destination);
         connectionStates.remove(nodeId);
     }
 
@@ -230,22 +231,38 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Queue up the given request for sending. Requests can only be sent out to ready nodes.
-     *
      * @param request The request
      * @param now The current timestamp
      */
     @Override
     public void send(ClientRequest request, long now) {
-        String nodeId = request.request().destination();
+        doSend(request, false, now);
+    }
+
+    private void sendInternalMetadataRequest(MetadataRequest metadataRequest, String nodeConnectionId, long now) {
+        ClientRequest clientRequest = new ClientRequest(nodeConnectionId, now, true,
+                nextRequestHeader(ApiKeys.METADATA), metadataRequest, null);
+        doSend(clientRequest, true, now);
+    }
+
+    private void doSend(ClientRequest request, boolean isInternalMetadataRequest, long now) {
+        String nodeId = request.destination();
         if (!canSendRequest(nodeId))
             throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
-        doSend(request, now);
-    }
 
-    private void doSend(ClientRequest request, long now) {
-        request.setSendTimeMs(now);
-        this.inFlightRequests.add(request);
-        selector.send(request.request());
+        Send send = request.body().toSend(nodeId, request.header());
+        InFlightRequest inFlightRequest = new InFlightRequest(
+                request.header(),
+                request.createdTimeMs(),
+                request.destination(),
+                request.callback(),
+                request.expectResponse(),
+                isInternalMetadataRequest,
+                send,
+                now);
+
+        this.inFlightRequests.add(inFlightRequest);
+        selector.send(inFlightRequest.send);
     }
 
     /**
@@ -277,12 +294,10 @@ public class NetworkClient implements KafkaClient {
 
         // invoke callbacks
         for (ClientResponse response : responses) {
-            if (response.request().hasCallback()) {
-                try {
-                    response.request().callback().onComplete(response);
-                } catch (Exception e) {
-                    log.error("Uncaught error in request completion:", e);
-                }
+            try {
+                response.onComplete();
+            } catch (Exception e) {
+                log.error("Uncaught error in request completion:", e);
             }
         }
 
@@ -376,14 +391,14 @@ public class NetworkClient implements KafkaClient {
         return found;
     }
 
-    public static Struct parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
+    public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
         ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
         // Always expect the response version id to be the same as the request version id
         short apiKey = requestHeader.apiKey();
         short apiVer = requestHeader.apiVersion();
         Struct responseBody = ProtoUtils.responseSchema(apiKey, apiVer).read(responseBuffer);
         correlate(requestHeader, responseHeader);
-        return responseBody;
+        return AbstractResponse.getResponse(apiKey, responseBody);
     }
 
     /**
@@ -395,10 +410,12 @@ public class NetworkClient implements KafkaClient {
      */
     private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
         connectionStates.disconnected(nodeId, now);
-        for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) {
+        for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
             log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);
-            if (!metadataUpdater.maybeHandleDisconnection(request))
-                responses.add(new ClientResponse(request, now, true, null));
+            if (request.isInternalMetadataRequest)
+                metadataUpdater.handleDisconnection(request.destination);
+            else
+                responses.add(request.disconnected(now));
         }
     }
 
@@ -432,10 +449,10 @@ public class NetworkClient implements KafkaClient {
     private void handleCompletedSends(List<ClientResponse> responses, long now) {
         // if no response is expected then when the send is completed, return it
         for (Send send : this.selector.completedSends()) {
-            ClientRequest request = this.inFlightRequests.lastSent(send.destination());
-            if (!request.expectResponse()) {
+            InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
+            if (!request.expectResponse) {
                 this.inFlightRequests.completeLastSent(send.destination());
-                responses.add(new ClientResponse(request, now, false, null));
+                responses.add(request.completed(null, now));
             }
         }
     }
@@ -449,10 +466,12 @@ public class NetworkClient implements KafkaClient {
     private void handleCompletedReceives(List<ClientResponse> responses, long now) {
         for (NetworkReceive receive : this.selector.completedReceives()) {
             String source = receive.source();
-            ClientRequest req = inFlightRequests.completeNext(source);
-            Struct body = parseResponse(receive.payload(), req.request().header());
-            if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
-                responses.add(new ClientResponse(req, now, false, body));
+            InFlightRequest req = inFlightRequests.completeNext(source);
+            AbstractResponse body = parseResponse(receive.payload(), req.header);
+            if (req.isInternalMetadataRequest)
+                metadataUpdater.handleCompletedMetadataResponse(req.header, now, body);
+            else
+                responses.add(req.completed(body, now));
         }
     }
 
@@ -558,33 +577,23 @@ public class NetworkClient implements KafkaClient {
         }
 
         @Override
-        public boolean maybeHandleDisconnection(ClientRequest request) {
-            ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
-
-            if (requestKey == ApiKeys.METADATA && request.isInitiatedByNetworkClient()) {
-                Cluster cluster = metadata.fetch();
-                if (cluster.isBootstrapConfigured()) {
-                    int nodeId = Integer.parseInt(request.request().destination());
-                    Node node = cluster.nodeById(nodeId);
-                    if (node != null)
-                        log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port());
-                }
-
-                metadataFetchInProgress = false;
-                return true;
+        public void handleDisconnection(String destination) {
+            Cluster cluster = metadata.fetch();
+            if (cluster.isBootstrapConfigured()) {
+                int nodeId = Integer.parseInt(destination);
+                Node node = cluster.nodeById(nodeId);
+                if (node != null)
+                    log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port());
             }
 
-            return false;
+            metadataFetchInProgress = false;
         }
 
         @Override
-        public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
-            short apiKey = req.request().header().apiKey();
-            if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
-                handleResponse(req.request().header(), body, now);
-                return true;
-            }
-            return false;
+        public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, AbstractResponse response) {
+            if (!(response instanceof MetadataResponse))
+                throw new IllegalStateException("Unexpected response type in metadata handler: " + response);
+            handleMetadataResponse(requestHeader, (MetadataResponse) response, now);
         }
 
         @Override
@@ -592,9 +601,8 @@ public class NetworkClient implements KafkaClient {
             this.metadata.requestUpdate();
         }
 
-        private void handleResponse(RequestHeader header, Struct body, long now) {
+        private void handleMetadataResponse(RequestHeader header, MetadataResponse response, long now) {
             this.metadataFetchInProgress = false;
-            MetadataResponse response = new MetadataResponse(body);
             Cluster cluster = response.cluster();
             // check if any topics metadata failed to get updated
             Map<String, Errors> errors = response.errors();
@@ -612,14 +620,6 @@ public class NetworkClient implements KafkaClient {
         }
 
         /**
-         * Create a metadata request for the given topics
-         */
-        private ClientRequest request(long now, String node, MetadataRequest metadata) {
-            RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
-            return new ClientRequest(now, true, send, null, true);
-        }
-
-        /**
          * Return true if there's at least one connection establishment is currently underway
          */
         private boolean isAnyNodeConnecting() {
@@ -644,9 +644,9 @@ public class NetworkClient implements KafkaClient {
                     metadataRequest = MetadataRequest.allTopics();
                 else
                     metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
-                ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
+
                 log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
-                doSend(clientRequest, now);
+                sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
                 return requestTimeoutMs;
             }
 
@@ -674,4 +674,42 @@ public class NetworkClient implements KafkaClient {
 
     }
 
+    static class InFlightRequest {
+        final RequestHeader header;
+        final String destination;
+        final RequestCompletionHandler callback;
+        final boolean expectResponse;
+        final boolean isInternalMetadataRequest; // used to flag metadata fetches which are triggered internally by NetworkClient
+        final Send send;
+        final long sendTimeMs;
+        final long createdTimeMs;
+
+        public InFlightRequest(RequestHeader header,
+                               long createdTimeMs,
+                               String destination,
+                               RequestCompletionHandler callback,
+                               boolean expectResponse,
+                               boolean isInternalMetadataRequest,
+                               Send send,
+                               long sendTimeMs) {
+            this.header = header;
+            this.destination = destination;
+            this.callback = callback;
+            this.expectResponse = expectResponse;
+            this.isInternalMetadataRequest = isInternalMetadataRequest;
+            this.send = send;
+            this.sendTimeMs = sendTimeMs;
+            this.createdTimeMs = createdTimeMs;
+        }
+
+        public ClientResponse completed(AbstractResponse response, long timeMs) {
+            return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, false, response);
+        }
+
+        public ClientResponse disconnected(long timeMs) {
+            return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, true, null);
+        }
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 59319ef..c205273 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -393,17 +393,11 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
-
-        @Override
-        public JoinGroupResponse parse(ClientResponse response) {
-            return new JoinGroupResponse(response.responseBody());
-        }
-
         @Override
         public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
             Errors error = Errors.forCode(joinResponse.errorCode());
             if (error == Errors.NONE) {
-                log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
+                log.debug("Received successful join group response for group {}: {}", groupId, joinResponse);
                 sensors.joinLatency.record(response.requestLatencyMs());
 
                 synchronized (AbstractCoordinator.this) {
@@ -483,12 +477,6 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
-
-        @Override
-        public SyncGroupResponse parse(ClientResponse response) {
-            return new SyncGroupResponse(response.responseBody());
-        }
-
         @Override
         public void handle(SyncGroupResponse syncResponse,
                            RequestFuture<ByteBuffer> future) {
@@ -540,7 +528,7 @@ public abstract class AbstractCoordinator implements Closeable {
         public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
             log.debug("Received group coordinator response {}", resp);
 
-            GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
+            GroupCoordinatorResponse groupCoordinatorResponse = (GroupCoordinatorResponse) resp.responseBody();
             // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
             // for the coordinator in the underlying network client layer
             // TODO: this needs to be better handled in KAFKA-1935
@@ -653,12 +641,6 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
-
-        @Override
-        public LeaveGroupResponse parse(ClientResponse response) {
-            return new LeaveGroupResponse(response.responseBody());
-        }
-
         @Override
         public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) {
             Errors error = Errors.forCode(leaveResponse.errorCode());
@@ -680,12 +662,6 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
-
-        @Override
-        public HeartbeatResponse parse(ClientResponse response) {
-            return new HeartbeatResponse(response.responseBody());
-        }
-
         @Override
         public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
             sensors.heartbeatLatency.record(response.requestLatencyMs());
@@ -722,8 +698,6 @@ public abstract class AbstractCoordinator implements Closeable {
     protected abstract class CoordinatorResponseHandler<R, T> extends RequestFutureAdapter<ClientResponse, T> {
         protected ClientResponse response;
 
-        public abstract R parse(ClientResponse response);
-
         public abstract void handle(R response, RequestFuture<T> future);
 
         @Override
@@ -735,10 +709,11 @@ public abstract class AbstractCoordinator implements Closeable {
         }
 
         @Override
+        @SuppressWarnings("unchecked")
         public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
             try {
                 this.response = clientResponse;
-                R responseObj = parse(clientResponse);
+                R responseObj = (R) clientResponse.responseBody();
                 handle(responseObj, future);
             } catch (RuntimeException e) {
                 if (!future.isDone())

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 2621c09..4889872 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -12,7 +12,6 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -624,11 +623,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
 
         @Override
-        public OffsetCommitResponse parse(ClientResponse response) {
-            return new OffsetCommitResponse(response.responseBody());
-        }
-
-        @Override
         public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
             sensors.commitLatency.record(response.requestLatencyMs());
             Set<String> unauthorizedTopics = new HashSet<>();
@@ -718,12 +712,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     }
 
     private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {
-
-        @Override
-        public OffsetFetchResponse parse(ClientResponse response) {
-            return new OffsetFetchResponse(response.responseBody());
-        }
-
         @Override
         public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
             Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 2495b23..d9baa56 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,14 +97,14 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     private RequestFuture<ClientResponse> send(Node node,
-                                              ApiKeys api,
-                                              short version,
-                                              AbstractRequest request) {
+                                               ApiKeys api,
+                                               short version,
+                                               AbstractRequest request) {
         long now = time.milliseconds();
         RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
         RequestHeader header = client.nextRequestHeader(api, version);
-        RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
-        put(node, new ClientRequest(now, true, send, completionHandler));
+        ClientRequest clientRequest = new ClientRequest(node.idString(), now, true, header, request, completionHandler);
+        put(node, clientRequest);
 
         // wakeup the client in case it is blocking in poll so that we can send the queued request
         client.wakeup();
@@ -336,9 +335,9 @@ public class ConsumerNetworkClient implements Closeable {
                 // coordinator failures traversing the unsent list again.
                 iterator.remove();
                 for (ClientRequest request : requestEntry.getValue()) {
-                    RequestFutureCompletionHandler handler =
-                            (RequestFutureCompletionHandler) request.callback();
-                    handler.onComplete(new ClientResponse(request, now, true, null));
+                    RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
+                    handler.onComplete(new ClientResponse(request.header(), request.callback(), request.destination(),
+                            request.createdTimeMs(), now, true, null));
                 }
             }
         }
@@ -369,8 +368,8 @@ public class ConsumerNetworkClient implements Closeable {
         synchronized (this) {
             List<ClientRequest> unsentRequests = unsent.remove(node);
             if (unsentRequests != null) {
-                for (ClientRequest request : unsentRequests) {
-                    RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
+                for (ClientRequest unsentRequest : unsentRequests) {
+                    RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) unsentRequest.callback();
                     handler.onFailure(e);
                 }
             }
@@ -468,12 +467,11 @@ public class ConsumerNetworkClient implements Closeable {
             if (e != null) {
                 future.raise(e);
             } else if (response.wasDisconnected()) {
-                ClientRequest request = response.request();
-                RequestSend send = request.request();
-                ApiKeys api = ApiKeys.forId(send.header().apiKey());
-                int correlation = send.header().correlationId();
+                RequestHeader requestHeader = response.requestHeader();
+                ApiKeys api = ApiKeys.forId(requestHeader.apiKey());
+                int correlation = requestHeader.correlationId();
                 log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
-                        api, request, correlation, send.destination());
+                        api, requestHeader, correlation, response.destination());
                 future.raise(DisconnectException.INSTANCE);
             } else {
                 future.complete(response);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index bfc1a0b..703ea29 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -41,7 +41,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.LogEntry;
-import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.FetchRequest;
@@ -155,7 +154,7 @@ public class Fetcher<K, V> {
                     .addListener(new RequestFutureListener<ClientResponse>() {
                         @Override
                         public void onSuccess(ClientResponse resp) {
-                            FetchResponse response = new FetchResponse(resp.responseBody());
+                            FetchResponse response = (FetchResponse) resp.responseBody();
                             if (!matchesRequestedPartitions(request, response)) {
                                 // obviously we expect the broker to always send us valid responses, so this check
                                 // is mainly for test cases where mock fetch responses must be manually crafted.
@@ -256,7 +255,7 @@ public class Fetcher<K, V> {
                 throw future.exception();
 
             if (future.succeeded()) {
-                MetadataResponse response = new MetadataResponse(future.value().responseBody());
+                MetadataResponse response = (MetadataResponse) future.value().responseBody();
                 Cluster cluster = response.cluster();
 
                 Set<String> unauthorizedTopics = cluster.unauthorizedTopics();
@@ -549,7 +548,7 @@ public class Fetcher<K, V> {
                 .compose(new RequestFutureAdapter<ClientResponse, Map<TopicPartition, OffsetAndTimestamp>>() {
                     @Override
                     public void onSuccess(ClientResponse response, RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future) {
-                        ListOffsetResponse lor = new ListOffsetResponse(response.responseBody());
+                        ListOffsetResponse lor = (ListOffsetResponse) response.responseBody();
                         log.trace("Received ListOffsetResponse {} from broker {}", lor, node);
                         handleListOffsetResponse(timestampsToSearch, lor, future);
                     }
@@ -673,10 +672,8 @@ public class Fetcher<K, V> {
                     return null;
                 }
 
-                ByteBuffer buffer = partition.recordSet;
-                MemoryRecords records = MemoryRecords.readableRecords(buffer);
                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
-                for (LogEntry logEntry : records) {
+                for (LogEntry logEntry : partition.records) {
                     // Skip the messages earlier than current position.
                     if (logEntry.offset() >= position) {
                         parsed.add(parseRecord(tp, logEntry));

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index c71bb67..7555b71 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -12,20 +12,13 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidMetadataException;
@@ -34,7 +27,6 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -42,15 +34,22 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
-import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 /**
  * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
  * requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
@@ -210,19 +209,17 @@ public class Sender implements Runnable {
             this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
 
         sensors.updateProduceRequestMetrics(batches);
-        List<ClientRequest> requests = createProduceRequests(batches, now);
+
         // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
         // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
         // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
         // with sendable data that aren't ready to send since they would cause busy looping.
         long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
-        if (result.readyNodes.size() > 0) {
+        if (!result.readyNodes.isEmpty()) {
             log.trace("Nodes with data ready to send: {}", result.readyNodes);
-            log.trace("Created {} produce requests: {}", requests.size(), requests);
             pollTimeout = 0;
         }
-        for (ClientRequest request : requests)
-            client.send(request, now);
+        sendProduceRequests(batches, now);
 
         // if some partitions are already ready to be sent, the select time would be 0;
         // otherwise if some partition already has some data accumulated but not ready yet,
@@ -254,20 +251,16 @@ public class Sender implements Runnable {
      * Handle a produce response
      */
     private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
-        int correlationId = response.request().request().header().correlationId();
+        int correlationId = response.requestHeader().correlationId();
         if (response.wasDisconnected()) {
-            log.trace("Cancelled request {} due to node {} being disconnected", response, response.request()
-                                                                                                  .request()
-                                                                                                  .destination());
+            log.trace("Cancelled request {} due to node {} being disconnected", response, response.destination());
             for (RecordBatch batch : batches.values())
                 completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, Record.NO_TIMESTAMP, correlationId, now);
         } else {
-            log.trace("Received produce response from node {} with correlation id {}",
-                      response.request().request().destination(),
-                      correlationId);
+            log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
             // if we have a response, parse it
             if (response.hasResponse()) {
-                ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
+                ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
                 for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                     TopicPartition tp = entry.getKey();
                     ProduceResponse.PartitionResponse partResp = entry.getValue();
@@ -275,7 +268,7 @@ public class Sender implements Runnable {
                     RecordBatch batch = batches.get(tp);
                     completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
                 }
-                this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
+                this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
                 this.sensors.recordThrottleTime(produceResponse.getThrottleTime());
             } else {
                 // this is the acks = 0 case, just complete all requests
@@ -339,35 +332,36 @@ public class Sender implements Runnable {
     /**
      * Transfer the record batches into a list of produce requests on a per-node basis
      */
-    private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
-        List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
+    private void sendProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
         for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
-            requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
-        return requests;
+            sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue());
     }
 
     /**
      * Create a produce request from the given record batches
      */
-    private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
-        Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
-        final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
+    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
+        Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
+        final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size());
         for (RecordBatch batch : batches) {
             TopicPartition tp = batch.topicPartition;
-            produceRecordsByPartition.put(tp, batch.records.buffer());
+            produceRecordsByPartition.put(tp, batch.records);
             recordsByPartition.put(tp, batch);
         }
-        ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
-        RequestSend send = new RequestSend(Integer.toString(destination),
-                                           this.client.nextRequestHeader(ApiKeys.PRODUCE),
-                                           request.toStruct());
+
+        ProduceRequest produceRequest = new ProduceRequest(acks, timeout, produceRecordsByPartition);
+        RequestHeader header = this.client.nextRequestHeader(ApiKeys.PRODUCE);
         RequestCompletionHandler callback = new RequestCompletionHandler() {
             public void onComplete(ClientResponse response) {
                 handleProduceResponse(response, recordsByPartition, time.milliseconds());
             }
         };
 
-        return new ClientRequest(now, acks != 0, send, callback);
+        String nodeId = Integer.toString(destination);
+        ClientRequest clientRequest = new ClientRequest(nodeId, now, acks != 0, header, produceRequest, callback);
+
+        client.send(clientRequest, now);
+        log.trace("Sent produce request to {}: {}", nodeId, produceRequest);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
index 9e213ec..3683283 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
@@ -29,11 +29,10 @@ public class ByteBufferSend implements Send {
     private boolean pending = false;
 
     public ByteBufferSend(String destination, ByteBuffer... buffers) {
-        super();
         this.destination = destination;
         this.buffers = buffers;
-        for (int i = 0; i < buffers.length; i++)
-            remaining += buffers[i].remaining();
+        for (ByteBuffer buffer : buffers)
+            remaining += buffer.remaining();
         this.size = remaining;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
index 49964b0..5e4bf2c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
@@ -29,8 +29,8 @@ public class NetworkSend extends ByteBufferSend {
 
     private static ByteBuffer[] sizeDelimit(ByteBuffer[] buffers) {
         int size = 0;
-        for (int i = 0; i < buffers.length; i++)
-            size += buffers[i].remaining();
+        for (ByteBuffer buffer : buffers)
+            size += buffer.remaining();
         ByteBuffer[] delimited = new ByteBuffer[buffers.length + 1];
         delimited[0] = ByteBuffer.allocate(4);
         delimited[0].putInt(size);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 6178b80..e07c3c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -63,7 +63,7 @@ public enum ApiKeys {
     /** an english description of the api--this is for debugging and can change */
     public final String name;
 
-    private ApiKeys(int id, String name) {
+    ApiKeys(int id, String name) {
         this.id = (short) id;
         this.name = name;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 9e21f3b..cd4e6e3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -31,6 +31,7 @@ import static org.apache.kafka.common.protocol.types.Type.BYTES;
 import static org.apache.kafka.common.protocol.types.Type.INT16;
 import static org.apache.kafka.common.protocol.types.Type.INT32;
 import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.RECORDS;
 import static org.apache.kafka.common.protocol.types.Type.STRING;
 import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
 
@@ -135,7 +136,7 @@ public class Protocol {
 
     public static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
                                                                   new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
-                                                                                                     new Field("record_set", BYTES)))));
+                                                                                                     new Field("record_set", RECORDS)))));
 
     public static final Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
                                                                    INT16,
@@ -500,14 +501,15 @@ public class Protocol {
                                                                        new ArrayOf(FETCH_REQUEST_TOPIC_V0),
                                                                        "Topics to fetch in the order provided."));
 
-    public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                  INT32,
-                                                                                  "Topic partition id."),
-                                                                        new Field("error_code", INT16),
-                                                                        new Field("high_watermark",
-                                                                                  INT64,
-                                                                                  "Last committed offset."),
-                                                                        new Field("record_set", BYTES));
+    public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema(new Field("partition",
+                                                                                         INT32,
+                                                                                         "Topic partition id."),
+                                                                               new Field("error_code", INT16),
+                                                                               new Field("high_watermark",
+                                                                                         INT64,
+                                                                                         "Last committed offset."));
+    public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V0),
+                                                                        new Field("record_set", RECORDS));
 
     public static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
                                                                     new Field("partition_responses",
@@ -515,6 +517,7 @@ public class Protocol {
 
     public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
                                                                         new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+
     public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms",
                                                                         INT32,
                                                                         "Duration in milliseconds for which the request was throttled" +

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
index 207f108..1ba8e44 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
@@ -54,8 +54,9 @@ public class ArrayOf extends Type {
         Object[] objs = (Object[]) o;
         int size = objs.length;
         buffer.putInt(size);
-        for (int i = 0; i < size; i++)
-            type.write(buffer, objs[i]);
+
+        for (Object obj : objs)
+            type.write(buffer, obj);
     }
 
     @Override
@@ -81,8 +82,8 @@ public class ArrayOf extends Type {
             return size;
 
         Object[] objs = (Object[]) o;
-        for (int i = 0; i < objs.length; i++)
-            size += type.sizeOf(objs[i]);
+        for (Object obj : objs)
+            size += type.sizeOf(obj);
         return size;
     }
 
@@ -102,8 +103,8 @@ public class ArrayOf extends Type {
                 return null;
 
             Object[] array = (Object[]) item;
-            for (int i = 0; i < array.length; i++)
-                type.validate(array[i]);
+            for (Object obj : array)
+                type.validate(obj);
             return array;
         } catch (ClassCastException e) {
             throw new SchemaException("Not an Object[].");

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index e8dce31..efbfd14 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -31,7 +31,7 @@ public class Schema extends Type {
      */
     public Schema(Field... fs) {
         this.fields = new Field[fs.length];
-        this.fieldsByName = new HashMap<String, Field>();
+        this.fieldsByName = new HashMap<>();
         for (int i = 0; i < this.fields.length; i++) {
             Field field = fs[i];
             if (fieldsByName.containsKey(field.name))
@@ -47,13 +47,12 @@ public class Schema extends Type {
     @Override
     public void write(ByteBuffer buffer, Object o) {
         Struct r = (Struct) o;
-        for (int i = 0; i < fields.length; i++) {
-            Field f = fields[i];
+        for (Field field : fields) {
             try {
-                Object value = f.type().validate(r.get(f));
-                f.type.write(buffer, value);
+                Object value = field.type().validate(r.get(field));
+                field.type.write(buffer, value);
             } catch (Exception e) {
-                throw new SchemaException("Error writing field '" + f.name +
+                throw new SchemaException("Error writing field '" + field.name +
                                           "': " +
                                           (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
             }
@@ -85,8 +84,8 @@ public class Schema extends Type {
     public int sizeOf(Object o) {
         int size = 0;
         Struct r = (Struct) o;
-        for (int i = 0; i < fields.length; i++)
-            size += fields[i].type.sizeOf(r.get(fields[i]));
+        for (Field field : fields)
+            size += field.type.sizeOf(r.get(field));
         return size;
     }
 
@@ -146,8 +145,7 @@ public class Schema extends Type {
     public Struct validate(Object item) {
         try {
             Struct struct = (Struct) item;
-            for (int i = 0; i < this.fields.length; i++) {
-                Field field = this.fields[i];
+            for (Field field : fields) {
                 try {
                     field.type.validate(struct.get(field));
                 } catch (SchemaException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 212d701..0165ce6 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -12,6 +12,8 @@
  */
 package org.apache.kafka.common.protocol.types;
 
+import org.apache.kafka.common.record.Records;
+
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
@@ -108,6 +110,10 @@ public class Struct {
         return (Byte) get(name);
     }
 
+    public Records getRecords(String name) {
+        return (Records) get(name);
+    }
+
     public Short getShort(Field field) {
         return (Short) get(field);
     }
@@ -272,16 +278,6 @@ public class Struct {
         this.schema.validate(this);
     }
 
-    /**
-     * Create a byte buffer containing the serialized form of the values in this struct. This method can choose to break
-     * the struct into multiple ByteBuffers if need be.
-     */
-    public ByteBuffer[] toBytes() {
-        ByteBuffer buffer = ByteBuffer.allocate(sizeOf());
-        writeTo(buffer);
-        return new ByteBuffer[] {buffer};
-    }
-
     @Override
     public String toString() {
         StringBuilder b = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index 43b4a37..e2f7baf 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -18,6 +18,9 @@ package org.apache.kafka.common.protocol.types;
 
 import java.nio.ByteBuffer;
 
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.Utils;
 
 /**
@@ -424,5 +427,50 @@ public abstract class Type {
         }
     };
 
+    public static final Type RECORDS = new Type() {
+        @Override
+        public boolean isNullable() {
+            return true;
+        }
+
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            if (o instanceof FileRecords)
+                throw new IllegalArgumentException("FileRecords must be written to the channel directly");
+            MemoryRecords records = (MemoryRecords) o;
+            NULLABLE_BYTES.write(buffer, records.buffer().duplicate());
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            ByteBuffer recordsBuffer = (ByteBuffer) NULLABLE_BYTES.read(buffer);
+            return MemoryRecords.readableRecords(recordsBuffer);
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            if (o == null)
+                return 4;
+
+            Records records = (Records) o;
+            return 4 + records.sizeInBytes();
+        }
+
+        @Override
+        public String toString() {
+            return "RECORDS";
+        }
+
+        @Override
+        public Records validate(Object item) {
+            if (item == null)
+                return null;
+
+            if (item instanceof Records)
+                return (Records) item;
+
+            throw new SchemaException(item + " is not an instance of " + Records.class.getName());
+        }
+    };
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
new file mode 100644
index 0000000..bdae08d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.network.TransportLayer;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+
+/**
+ * File-backed record set.
+ */
+public class FileRecords implements Records {
+    private final File file;
+    private final FileChannel channel;
+    private final long start;
+    private final long end;
+    private final long size;
+
+    public FileRecords(File file,
+                       FileChannel channel,
+                       int start,
+                       int end,
+                       boolean isSlice) throws IOException {
+        this.file = file;
+        this.channel = channel;
+        this.start = start;
+        this.end = end;
+
+        if (isSlice)
+            this.size = end - start;
+        else
+            this.size = Math.min(channel.size(), end) - start;
+    }
+
+    @Override
+    public int sizeInBytes() {
+        return (int) size;
+    }
+
+    @Override
+    public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException {
+        long newSize = Math.min(channel.size(), end) - start;
+        if (newSize < size)
+            throw new KafkaException(String.format("Size of FileRecords %s has been truncated during write: old size %d, new size %d", file.getAbsolutePath(), size, newSize));
+
+        if (offset > size)
+            throw new KafkaException(String.format("The requested offset %d is out of range. The size of this FileRecords is %d.", offset, size));
+
+        long position = start + offset;
+        long count = Math.min(length, this.size - offset);
+        if (destChannel instanceof TransportLayer) {
+            TransportLayer tl = (TransportLayer) destChannel;
+            return tl.transferFrom(this.channel, position, count);
+        } else {
+            return this.channel.transferTo(position, count, destChannel);
+        }
+    }
+
+    @Override
+    public RecordsIterator iterator() {
+        return new RecordsIterator(new FileLogInputStream(channel, start, end), false);
+    }
+
+    private static class FileLogInputStream implements LogInputStream {
+        private long position;
+        protected final long end;
+        protected final FileChannel channel;
+        private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD);
+
+        public FileLogInputStream(FileChannel channel, long start, long end) {
+            this.channel = channel;
+            this.position = start;
+            this.end = end;
+        }
+
+        @Override
+        public LogEntry nextEntry() throws IOException {
+            if (position + Records.LOG_OVERHEAD >= end)
+                return null;
+
+            logHeaderBuffer.rewind();
+            channel.read(logHeaderBuffer, position);
+            if (logHeaderBuffer.hasRemaining())
+                return null;
+
+            logHeaderBuffer.rewind();
+            long offset = logHeaderBuffer.getLong();
+            int size = logHeaderBuffer.getInt();
+            if (size < 0)
+                throw new IllegalStateException("Record with size " + size);
+
+            if (position + Records.LOG_OVERHEAD + size > end)
+                return null;
+
+            ByteBuffer recordBuffer = ByteBuffer.allocate(size);
+            channel.read(recordBuffer, position + Records.LOG_OVERHEAD);
+            if (recordBuffer.hasRemaining())
+                return null;
+            recordBuffer.rewind();
+
+            Record record = new Record(recordBuffer);
+            LogEntry logEntry = new LogEntry(offset, record);
+            position += logEntry.size();
+            return logEntry;
+        }
+    }
+}