You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/15 00:46:29 UTC

[3/5] kafka git commit: KAFKA-2066; Use client-side FetchRequest/FetchResponse on server

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
deleted file mode 100644
index 02cac80..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kafka.common.network.NetworkSend;
-import org.apache.kafka.common.protocol.types.Struct;
-
-/**
- * A send object for a kafka request
- */
-public class RequestSend extends NetworkSend {
-
-    private final RequestHeader header;
-    private final Struct body;
-
-    public RequestSend(String destination, RequestHeader header, Struct body) {
-        super(destination, serialize(header, body));
-        this.header = header;
-        this.body = body;
-    }
-
-    public static ByteBuffer serialize(RequestHeader header, Struct body) {
-        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
-        header.writeTo(buffer);
-        body.writeTo(buffer);
-        buffer.rewind();
-        return buffer;
-    }
-
-    public RequestHeader header() {
-        return this.header;
-    }
-
-    public Struct body() {
-        return body;
-    }
-
-    @Override
-    public String toString() {
-        return "RequestSend(header=" + header.toString() + ", body=" + body.toString() + ")";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
deleted file mode 100644
index 9494de7..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.network.NetworkSend;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class ResponseSend extends NetworkSend {
-
-    public ResponseSend(String destination, ResponseHeader header, Struct body) {
-        super(destination, serialize(header, body));
-    }
-
-    public ResponseSend(String destination, ResponseHeader header, AbstractRequestResponse response) {
-        this(destination, header, response.toStruct());
-    }
-
-    public static ByteBuffer serialize(ResponseHeader header, Struct body) {
-        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
-        header.writeTo(buffer);
-        body.writeTo(buffer);
-        buffer.rewind();
-        return buffer;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
index bddc9f0..97adcb3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
@@ -61,7 +61,7 @@ public class SaslHandshakeRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         switch (versionId) {
             case 0:
                 List<String> enabledMechanisms = Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
index c0fc495..6d7f734 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
@@ -33,7 +33,7 @@ import org.apache.kafka.common.protocol.types.Struct;
  * Response from SASL server which indicates if the client-chosen mechanism is enabled in the server.
  * For error responses, the list of enabled mechanisms is included in the response.
  */
-public class SaslHandshakeResponse extends AbstractRequestResponse {
+public class SaslHandshakeResponse extends AbstractResponse {
 
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.SASL_HANDSHAKE.id);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index bc63521..e3e2507 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -84,7 +84,7 @@ public class StopReplicaRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         Map<TopicPartition, Short> responses = new HashMap<>(partitions.size());
         for (TopicPartition partition : partitions) {
             responses.put(partition, Errors.forException(e).code());

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
index 3c4db68..92d9e58 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class StopReplicaResponse extends AbstractRequestResponse {
+public class StopReplicaResponse extends AbstractResponse {
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.STOP_REPLICA.id);
 
     private static final String ERROR_CODE_KEY_NAME = "error_code";

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
index 606584b..efb484c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -83,7 +83,7 @@ public class SyncGroupRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         switch (versionId) {
             case 0:
                 return new SyncGroupResponse(

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index 3584f14..f459656 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-public class SyncGroupResponse extends AbstractRequestResponse {
+public class SyncGroupResponse extends AbstractResponse {
 
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.SYNC_GROUP.id);
     public static final String ERROR_CODE_KEY_NAME = "error_code";

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 1c21789..2e5ffb2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -242,7 +242,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+    public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         if (versionId <= 2)
             return new UpdateMetadataResponse(Errors.forException(e).code());
         else

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
index 0ff35d9..e1c3634 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-public class UpdateMetadataResponse extends AbstractRequestResponse {
+public class UpdateMetadataResponse extends AbstractResponse {
 
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.UPDATE_METADATA_KEY.id);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index ba201dc..b9c3b33 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -18,45 +18,44 @@
 
 package org.apache.kafka.common.security.authenticator;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.util.Arrays;
-import java.util.Map;
-import java.security.Principal;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-
-import javax.security.auth.Subject;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.SchemaException;
-import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.common.KafkaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.auth.Subject;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.Map;
+
 public class SaslClientAuthenticator implements Authenticator {
 
     public enum SaslState {
@@ -81,7 +80,7 @@ public class SaslClientAuthenticator implements Authenticator {
 
     // buffers used in `authenticate`
     private NetworkReceive netInBuffer;
-    private NetworkSend netOutBuffer;
+    private Send netOutBuffer;
 
     // Current SASL state
     private SaslState saslState;
@@ -156,7 +155,7 @@ public class SaslClientAuthenticator implements Authenticator {
                 String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
                 currentRequestHeader = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, clientId, correlationId++);
                 SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest(mechanism);
-                send(RequestSend.serialize(currentRequestHeader, handshakeRequest.toStruct()));
+                send(handshakeRequest.toSend(node, currentRequestHeader));
                 setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
                 break;
             case RECEIVE_HANDSHAKE_RESPONSE:
@@ -209,13 +208,13 @@ public class SaslClientAuthenticator implements Authenticator {
         if (!saslClient.isComplete()) {
             byte[] saslToken = createSaslToken(serverToken, isInitial);
             if (saslToken != null)
-                send(ByteBuffer.wrap(saslToken));
+                send(new NetworkSend(node, ByteBuffer.wrap(saslToken)));
         }
     }
 
-    private void send(ByteBuffer buffer) throws IOException {
+    private void send(Send send) throws IOException {
         try {
-            netOutBuffer = new NetworkSend(node, buffer);
+            netOutBuffer = send;
             flushNetOutBufferAndUpdateInterestOps();
         } catch (IOException e) {
             setSaslState(SaslState.FAILED);
@@ -281,7 +280,7 @@ public class SaslClientAuthenticator implements Authenticator {
             // TODO: introspect about e: look for GSS information.
             final String unknownServerErrorText =
                 "(Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)";
-            if (e.toString().indexOf(unknownServerErrorText) > -1) {
+            if (e.toString().contains(unknownServerErrorText)) {
                 error += " This may be caused by Java's being unable to resolve the Kafka Broker's" +
                     " hostname correctly. You may want to try to adding" +
                     " '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment." +
@@ -302,10 +301,10 @@ public class SaslClientAuthenticator implements Authenticator {
     }
 
     private void handleKafkaResponse(RequestHeader requestHeader, byte[] responseBytes) {
-        Struct struct;
+        AbstractResponse response;
         ApiKeys apiKey;
         try {
-            struct = NetworkClient.parseResponse(ByteBuffer.wrap(responseBytes), requestHeader);
+            response = NetworkClient.parseResponse(ByteBuffer.wrap(responseBytes), requestHeader);
             apiKey = ApiKeys.forId(requestHeader.apiKey());
         } catch (SchemaException | IllegalArgumentException e) {
             LOG.debug("Invalid SASL mechanism response, server may be expecting only GSSAPI tokens");
@@ -313,7 +312,7 @@ public class SaslClientAuthenticator implements Authenticator {
         }
         switch (apiKey) {
             case SASL_HANDSHAKE:
-                handleSaslHandshakeResponse(new SaslHandshakeResponse(struct));
+                handleSaslHandshakeResponse((SaslHandshakeResponse) response);
                 break;
             default:
                 throw new IllegalStateException("Unexpected API key during handshake: " + apiKey);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 206fe39..b193bf2 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -17,36 +17,7 @@
 
 package org.apache.kafka.common.security.authenticator;
 
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.security.Principal;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-
-import javax.security.auth.login.Configuration;
-import javax.security.auth.Subject;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslException;
-
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.security.kerberos.KerberosName;
-import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
-import org.ietf.jgss.GSSContext;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSException;
-import org.ietf.jgss.GSSManager;
-import org.ietf.jgss.GSSName;
-import org.ietf.jgss.Oid;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
@@ -54,22 +25,49 @@ import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.requests.AbstractRequest;
-import org.apache.kafka.common.requests.AbstractRequestResponse;
+import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.requests.ResponseHeader;
-import org.apache.kafka.common.requests.ResponseSend;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.security.kerberos.KerberosName;
+import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.Configuration;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class SaslServerAuthenticator implements Authenticator {
 
@@ -100,7 +98,7 @@ public class SaslServerAuthenticator implements Authenticator {
 
     // buffers used in `authenticate`
     private NetworkReceive netInBuffer;
-    private NetworkSend netOutBuffer;
+    private Send netOutBuffer;
 
     public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser, String host, int maxReceiveSize) throws IOException {
         if (subject == null)
@@ -364,9 +362,8 @@ public class SaslServerAuthenticator implements Authenticator {
         sendKafkaResponse(requestHeader, ApiVersionsResponse.apiVersionsResponse());
     }
 
-    private void sendKafkaResponse(RequestHeader requestHeader, AbstractRequestResponse response) throws IOException {
-        ResponseHeader responseHeader = new ResponseHeader(requestHeader.correlationId());
-        netOutBuffer = new NetworkSend(node, ResponseSend.serialize(responseHeader, response.toStruct()));
+    private void sendKafkaResponse(RequestHeader requestHeader, AbstractResponse response) throws IOException {
+        netOutBuffer = response.toSend(node, requestHeader);
         flushNetOutBufferAndUpdateInterestOps();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/utils/AbstractIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AbstractIterator.java b/clients/src/main/java/org/apache/kafka/common/utils/AbstractIterator.java
index b798e73..d6771a3 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/AbstractIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/AbstractIterator.java
@@ -25,9 +25,9 @@ import java.util.NoSuchElementException;
  */
 public abstract class AbstractIterator<T> implements Iterator<T> {
 
-    private static enum State {
+    private enum State {
         READY, NOT_READY, DONE, FAILED
-    };
+    }
 
     private State state = State.NOT_READY;
     private T next;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 0af4d34..9b445c7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -16,6 +16,14 @@
  */
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.Time;
+
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -26,31 +34,24 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.utils.Time;
-
 /**
  * A mock network client for use testing code
  */
 public class MockClient implements KafkaClient {
     public static final RequestMatcher ALWAYS_TRUE = new RequestMatcher() {
         @Override
-        public boolean matches(ClientRequest request) {
+        public boolean matches(AbstractRequest body) {
             return true;
         }
     };
 
     private class FutureResponse {
-        public final Struct responseBody;
+        public final AbstractResponse responseBody;
         public final boolean disconnected;
         public final RequestMatcher requestMatcher;
         public Node node;
 
-        public FutureResponse(Struct responseBody, boolean disconnected, RequestMatcher requestMatcher, Node node) {
+        public FutureResponse(AbstractResponse responseBody, boolean disconnected, RequestMatcher requestMatcher, Node node) {
             this.responseBody = responseBody;
             this.disconnected = disconnected;
             this.requestMatcher = requestMatcher;
@@ -125,8 +126,9 @@ public class MockClient implements KafkaClient {
         Iterator<ClientRequest> iter = requests.iterator();
         while (iter.hasNext()) {
             ClientRequest request = iter.next();
-            if (request.request().destination().equals(node)) {
-                responses.add(new ClientResponse(request, now, true, null));
+            if (request.destination().equals(node)) {
+                responses.add(new ClientResponse(request.header(), request.callback(), request.destination(),
+                        request.createdTimeMs(), now, true, null));
                 iter.remove();
             }
         }
@@ -138,19 +140,19 @@ public class MockClient implements KafkaClient {
         Iterator<FutureResponse> iterator = futureResponses.iterator();
         while (iterator.hasNext()) {
             FutureResponse futureResp = iterator.next();
-            if (futureResp.node != null && !request.request().destination().equals(futureResp.node.idString()))
+            if (futureResp.node != null && !request.destination().equals(futureResp.node.idString()))
                 continue;
 
-            if (!futureResp.requestMatcher.matches(request))
+            if (!futureResp.requestMatcher.matches(request.body()))
                 throw new IllegalStateException("Next in line response did not match expected request");
 
-            ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
+            ClientResponse resp = new ClientResponse(request.header(), request.callback(), request.destination(),
+                    request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
             responses.add(resp);
             iterator.remove();
             return;
         }
 
-        request.setSendTimeMs(now);
         this.requests.add(request);
     }
 
@@ -168,8 +170,7 @@ public class MockClient implements KafkaClient {
 
         while (!this.responses.isEmpty()) {
             ClientResponse response = this.responses.poll();
-            if (response.request().hasCallback())
-                response.request().callback().onComplete(response);
+            response.onComplete();
         }
 
         return copy;
@@ -179,75 +180,77 @@ public class MockClient implements KafkaClient {
         return this.requests;
     }
 
-    public void respond(Struct body) {
-        respond(body, false);
+    public void respond(AbstractResponse response) {
+        respond(response, false);
     }
 
-    public void respond(Struct body, boolean disconnected) {
+    public void respond(AbstractResponse response, boolean disconnected) {
         ClientRequest request = requests.remove();
-        responses.add(new ClientResponse(request, time.milliseconds(), disconnected, body));
+        responses.add(new ClientResponse(request.header(), request.callback(), request.destination(),
+                request.createdTimeMs(), time.milliseconds(), disconnected, response));
     }
 
-    public void respondFrom(Struct body, Node node) {
-        respondFrom(body, node, false);
+    public void respondFrom(AbstractResponse response, Node node) {
+        respondFrom(response, node, false);
     }
 
-    public void respondFrom(Struct body, Node node, boolean disconnected) {
+    public void respondFrom(AbstractResponse response, Node node, boolean disconnected) {
         Iterator<ClientRequest> iterator = requests.iterator();
         while (iterator.hasNext()) {
             ClientRequest request = iterator.next();
-            if (request.request().destination().equals(node.idString())) {
+            if (request.destination().equals(node.idString())) {
                 iterator.remove();
-                responses.add(new ClientResponse(request, time.milliseconds(), disconnected, body));
+                responses.add(new ClientResponse(request.header(), request.callback(), request.destination(),
+                        request.createdTimeMs(), time.milliseconds(), disconnected, response));
                 return;
             }
         }
         throw new IllegalArgumentException("No requests available to node " + node);
     }
 
-    public void prepareResponse(Struct body) {
-        prepareResponse(ALWAYS_TRUE, body, false);
+    public void prepareResponse(AbstractResponse response) {
+        prepareResponse(ALWAYS_TRUE, response, false);
     }
 
-    public void prepareResponseFrom(Struct body, Node node) {
-        prepareResponseFrom(ALWAYS_TRUE, body, node, false);
+    public void prepareResponseFrom(AbstractResponse response, Node node) {
+        prepareResponseFrom(ALWAYS_TRUE, response, node, false);
     }
 
     /**
      * Prepare a response for a request matching the provided matcher. If the matcher does not
-     * match, {@link #send(ClientRequest, long)} will throw IllegalStateException
+     * match, {@link KafkaClient#send(ClientRequest, long)} will throw IllegalStateException
      * @param matcher The matcher to apply
-     * @param body The response body
+     * @param response The response body
      */
-    public void prepareResponse(RequestMatcher matcher, Struct body) {
-        prepareResponse(matcher, body, false);
+    public void prepareResponse(RequestMatcher matcher, AbstractResponse response) {
+        prepareResponse(matcher, response, false);
     }
 
-    public void prepareResponseFrom(RequestMatcher matcher, Struct body, Node node) {
-        prepareResponseFrom(matcher, body, node, false);
+    public void prepareResponseFrom(RequestMatcher matcher, AbstractResponse response, Node node) {
+        prepareResponseFrom(matcher, response, node, false);
     }
 
-    public void prepareResponse(Struct body, boolean disconnected) {
-        prepareResponse(ALWAYS_TRUE, body, disconnected);
+    public void prepareResponse(AbstractResponse response, boolean disconnected) {
+        prepareResponse(ALWAYS_TRUE, response, disconnected);
     }
 
-    public void prepareResponseFrom(Struct body, Node node, boolean disconnected) {
-        prepareResponseFrom(ALWAYS_TRUE, body, node, disconnected);
+    public void prepareResponseFrom(AbstractResponse response, Node node, boolean disconnected) {
+        prepareResponseFrom(ALWAYS_TRUE, response, node, disconnected);
     }
 
     /**
      * Prepare a response for a request matching the provided matcher. If the matcher does not
-     * match, {@link #send(ClientRequest, long)} will throw IllegalStateException
+     * match, {@link KafkaClient#send(ClientRequest, long)} will throw IllegalStateException
      * @param matcher The matcher to apply
-     * @param body The response body
+     * @param response The response body
      * @param disconnected Whether the request was disconnected
      */
-    public void prepareResponse(RequestMatcher matcher, Struct body, boolean disconnected) {
-        prepareResponseFrom(matcher, body, null, disconnected);
+    public void prepareResponse(RequestMatcher matcher, AbstractResponse response, boolean disconnected) {
+        prepareResponseFrom(matcher, response, null, disconnected);
     }
 
-    public void prepareResponseFrom(RequestMatcher matcher, Struct body, Node node, boolean disconnected) {
-        futureResponses.add(new FutureResponse(body, disconnected, matcher, node));
+    public void prepareResponseFrom(RequestMatcher matcher, AbstractResponse response, Node node, boolean disconnected) {
+        futureResponses.add(new FutureResponse(response, disconnected, matcher, node));
     }
 
     public void reset() {
@@ -307,12 +310,12 @@ public class MockClient implements KafkaClient {
 
     /**
      * The RequestMatcher provides a way to match a particular request to a response prepared
-     * through {@link #prepareResponse(RequestMatcher, Struct)}. Basically this allows testers
+     * through {@link #prepareResponse(RequestMatcher, AbstractResponse)}. Basically this allows testers
      * to inspect the request body for the type of the request or for specific fields that should be set,
      * and to fail the test if it doesn't match.
      */
     public interface RequestMatcher {
-        boolean matches(ClientRequest request);
+        boolean matches(AbstractRequest body);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index d305e8e..0966ee5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -23,10 +23,10 @@ import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.MockSelector;
@@ -66,10 +66,9 @@ public class NetworkClientTest {
 
     @Test(expected = IllegalStateException.class)
     public void testSendToUnreadyNode() {
-        RequestSend send = new RequestSend("5",
-                                           client.nextRequestHeader(ApiKeys.METADATA),
-                                           new MetadataRequest(Arrays.asList("test")).toStruct());
-        ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null);
+        MetadataRequest metadataRequest = new MetadataRequest(Arrays.asList("test"));
+        RequestHeader header = client.nextRequestHeader(ApiKeys.METADATA);
+        ClientRequest request = new ClientRequest("5", time.milliseconds(), false, header, metadataRequest, null);
         client.send(request, time.milliseconds());
         client.poll(1, time.milliseconds());
     }
@@ -91,10 +90,9 @@ public class NetworkClientTest {
         client.poll(1, time.milliseconds());
         assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
 
-        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
+        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
         RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
-        RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct());
-        ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null);
+        ClientRequest request = new ClientRequest(node.idString(), time.milliseconds(), true, reqHeader, produceRequest, null);
         client.send(request, time.milliseconds());
         assertEquals("There should be 1 in-flight request after send", 1, client.inFlightRequestCount(node.idString()));
 
@@ -104,11 +102,10 @@ public class NetworkClientTest {
     }
 
     private void checkSimpleRequestResponse(NetworkClient networkClient) {
-        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
+        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
         RequestHeader reqHeader = networkClient.nextRequestHeader(ApiKeys.PRODUCE);
-        RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct());
         TestCallbackHandler handler = new TestCallbackHandler();
-        ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler);
+        ClientRequest request = new ClientRequest(node.idString(), time.milliseconds(), true, reqHeader, produceRequest, handler);
         awaitReady(networkClient, node);
         networkClient.send(request, time.milliseconds());
         networkClient.poll(1, time.milliseconds());
@@ -126,7 +123,7 @@ public class NetworkClientTest {
         assertEquals(1, responses.size());
         assertTrue("The handler should have executed.", handler.executed);
         assertTrue("Should have a response body.", handler.response.hasResponse());
-        assertEquals("Should be correlated to the original request", request, handler.response.request());
+        assertEquals("Should be correlated to the original request", request.header(), handler.response.requestHeader());
     }
 
     private void awaitReady(NetworkClient client, Node node) {
@@ -136,11 +133,10 @@ public class NetworkClientTest {
 
     @Test
     public void testRequestTimeout() {
-        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
+        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
         RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
-        RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct());
         TestCallbackHandler handler = new TestCallbackHandler();
-        ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler);
+        ClientRequest request = new ClientRequest(node.idString(), time.milliseconds(), true, reqHeader, produceRequest, handler);
         awaitReady(client, node);
         long now = time.milliseconds();
         client.send(request, now);
@@ -211,9 +207,8 @@ public class NetworkClientTest {
         awaitReady(client, node);
         RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.METADATA);
 
-        Struct req = new MetadataRequest(Collections.<String>emptyList()).toStruct();
-        RequestSend send = new RequestSend(node.idString(), reqHeader, req);
-        ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null);
+        MetadataRequest metadataRequest = new MetadataRequest(Collections.<String>emptyList());
+        ClientRequest request = new ClientRequest(node.idString(), time.milliseconds(), true, reqHeader, metadataRequest, null);
         client.send(request, time.milliseconds());
         client.poll(requestTimeoutMs, time.milliseconds());
         assertEquals(1, client.inFlightRequestCount(node.idString()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index bf45ee6..fa30a4b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -37,9 +37,9 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.FetchResponse.PartitionData;
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
@@ -470,10 +470,10 @@ public class KafkaConsumerTest {
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
                 rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
-        consumer.assign(Arrays.asList(tp0));
+        consumer.assign(singletonList(tp0));
 
         // lookup coordinator
-        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // fetch offset for one topic
@@ -892,7 +892,7 @@ public class KafkaConsumerTest {
 
         // the auto commit is disabled, so no offset commit request should be sent
         for (ClientRequest req: client.requests())
-            assertTrue(req.request().header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+            assertTrue(req.header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
 
         // subscription change
         consumer.unsubscribe();
@@ -903,7 +903,7 @@ public class KafkaConsumerTest {
 
         // the auto commit is disabled, so no offset commit request should be sent
         for (ClientRequest req: client.requests())
-            assertTrue(req.request().header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+            assertTrue(req.header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
 
         consumer.close();
     }
@@ -933,7 +933,7 @@ public class KafkaConsumerTest {
                 rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
 
         // lookup coordinator
-        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
@@ -997,7 +997,7 @@ public class KafkaConsumerTest {
                 rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
 
         // lookup coordinator
-        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
@@ -1029,8 +1029,8 @@ public class KafkaConsumerTest {
         assertTrue(consumer.assignment().equals(Collections.singleton(t2p0)));
 
         // the auto commit is disabled, so no offset commit request should be sent
-        for (ClientRequest req: client.requests())
-            assertTrue(req.request().header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+        for (ClientRequest req : client.requests())
+            assertTrue(req.header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
 
         consumer.close();
     }
@@ -1086,15 +1086,15 @@ public class KafkaConsumerTest {
     private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
         if (coordinator == null) {
             // lookup coordinator
-            client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+            client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
             coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
         }
 
         // join group
         client.prepareResponseFrom(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                JoinGroupRequest joinGroupRequest = new JoinGroupRequest(request.request().body());
+            public boolean matches(AbstractRequest body) {
+                JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
                 PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(joinGroupRequest.groupProtocols().get(0).metadata());
                 return subscribedTopics.equals(new HashSet<>(subscription.topics()));
             }
@@ -1109,7 +1109,7 @@ public class KafkaConsumerTest {
     private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
         if (coordinator == null) {
             // lookup coordinator
-            client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+            client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
             coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
         }
 
@@ -1126,11 +1126,11 @@ public class KafkaConsumerTest {
         final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
         client.prepareResponseFrom(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
+            public boolean matches(AbstractRequest body) {
                 heartbeatReceived.set(true);
                 return true;
             }
-        }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator);
+        }, new HeartbeatResponse(Errors.NONE.code()), coordinator);
         return heartbeatReceived;
     }
 
@@ -1142,8 +1142,8 @@ public class KafkaConsumerTest {
 
         client.prepareResponseFrom(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body());
+            public boolean matches(AbstractRequest body) {
+                OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
                 for (Map.Entry<TopicPartition, Long> partitionOffset : partitionOffsets.entrySet()) {
                     OffsetCommitRequest.PartitionData partitionData = commitRequest.offsetData().get(partitionOffset.getKey());
                     // verify that the expected offset has been committed
@@ -1162,39 +1162,38 @@ public class KafkaConsumerTest {
         return prepareOffsetCommitResponse(client, coordinator, Collections.singletonMap(partition, offset));
     }
 
-    private Struct offsetCommitResponse(Map<TopicPartition, Short> responseData) {
-        OffsetCommitResponse response = new OffsetCommitResponse(responseData);
-        return response.toStruct();
+    private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Short> responseData) {
+        return new OffsetCommitResponse(responseData);
     }
 
-    private Struct joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, short error) {
+    private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, short error) {
         return new JoinGroupResponse(error, generationId, assignor.name(), memberId, leaderId,
-                Collections.<String, ByteBuffer>emptyMap()).toStruct();
+                Collections.<String, ByteBuffer>emptyMap());
     }
 
-    private Struct syncGroupResponse(List<TopicPartition> partitions, short error) {
+    private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, short error) {
         ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
-        return new SyncGroupResponse(error, buf).toStruct();
+        return new SyncGroupResponse(error, buf);
     }
 
-    private Struct offsetResponse(Map<TopicPartition, Long> offsets, short error) {
+    private OffsetFetchResponse offsetResponse(Map<TopicPartition, Long> offsets, short error) {
         Map<TopicPartition, OffsetFetchResponse.PartitionData> partitionData = new HashMap<>();
         for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
             partitionData.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue(), "", error));
         }
-        return new OffsetFetchResponse(partitionData).toStruct();
+        return new OffsetFetchResponse(partitionData);
     }
 
-    private Struct listOffsetsResponse(Map<TopicPartition, Long> offsets, short error) {
+    private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> offsets, short error) {
         Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
         for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
             partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(error,
                     1L, partitionOffset.getValue()));
         }
-        return new ListOffsetResponse(partitionData, 1).toStruct();
+        return new ListOffsetResponse(partitionData, 1);
     }
 
-    private Struct fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
+    private FetchResponse fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
         Map<TopicPartition, PartitionData> tpResponses = new HashMap<>();
         for (Map.Entry<TopicPartition, FetchInfo> fetchEntry : fetches.entrySet()) {
             TopicPartition partition = fetchEntry.getKey();
@@ -1204,13 +1203,12 @@ public class KafkaConsumerTest {
             for (int i = 0; i < fetchCount; i++)
                 records.append(fetchOffset + i, 0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
             records.close();
-            tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records.buffer()));
+            tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records));
         }
-        FetchResponse response = new FetchResponse(tpResponses, 0);
-        return response.toStruct();
+        return new FetchResponse(tpResponses, 0);
     }
 
-    private Struct fetchResponse(TopicPartition partition, long fetchOffset, int count) {
+    private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, int count) {
         FetchInfo fetchInfo = new FetchInfo(fetchOffset, count);
         return fetchResponse(Collections.singletonMap(partition, fetchInfo));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 3c8c793..bb617ae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -16,20 +16,20 @@
  **/
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -114,8 +114,8 @@ public class AbstractCoordinatorTest {
         // raise the error when the background thread tries to send a heartbeat
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                if (request.request().header().apiKey() == ApiKeys.HEARTBEAT.id)
+            public boolean matches(AbstractRequest body) {
+                if (body instanceof HeartbeatRequest)
                     throw e;
                 return false;
             }
@@ -160,9 +160,9 @@ public class AbstractCoordinatorTest {
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             private int invocations = 0;
             @Override
-            public boolean matches(ClientRequest request) {
+            public boolean matches(AbstractRequest body) {
                 invocations++;
-                boolean isJoinGroupRequest = request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
+                boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
                 if (isJoinGroupRequest && invocations == 1)
                     // simulate wakeup before the request returns
                     throw new WakeupException();
@@ -196,9 +196,9 @@ public class AbstractCoordinatorTest {
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             private int invocations = 0;
             @Override
-            public boolean matches(ClientRequest request) {
+            public boolean matches(AbstractRequest body) {
                 invocations++;
-                boolean isJoinGroupRequest = request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
+                boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
                 if (isJoinGroupRequest && invocations == 1)
                     // simulate wakeup before the request returns
                     throw new WakeupException();
@@ -233,8 +233,8 @@ public class AbstractCoordinatorTest {
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                boolean isJoinGroupRequest = request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
+            public boolean matches(AbstractRequest body) {
+                boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
                 if (isJoinGroupRequest)
                     // wakeup after the request returns
                     consumerClient.wakeup();
@@ -267,8 +267,8 @@ public class AbstractCoordinatorTest {
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                boolean isJoinGroupRequest = request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
+            public boolean matches(AbstractRequest body) {
+                boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
                 if (isJoinGroupRequest)
                     // wakeup after the request returns
                     consumerClient.wakeup();
@@ -305,9 +305,9 @@ public class AbstractCoordinatorTest {
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             private int invocations = 0;
             @Override
-            public boolean matches(ClientRequest request) {
+            public boolean matches(AbstractRequest body) {
                 invocations++;
-                boolean isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
+                boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
                 if (isSyncGroupRequest && invocations == 1)
                     // simulate wakeup after the request sent
                     throw new WakeupException();
@@ -341,9 +341,9 @@ public class AbstractCoordinatorTest {
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             private int invocations = 0;
             @Override
-            public boolean matches(ClientRequest request) {
+            public boolean matches(AbstractRequest body) {
                 invocations++;
-                boolean isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
+                boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
                 if (isSyncGroupRequest && invocations == 1)
                     // simulate wakeup after the request sent
                     throw new WakeupException();
@@ -378,8 +378,8 @@ public class AbstractCoordinatorTest {
         mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                boolean isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
+            public boolean matches(AbstractRequest body) {
+                boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
                 if (isSyncGroupRequest)
                     // wakeup after the request returns
                     consumerClient.wakeup();
@@ -412,8 +412,8 @@ public class AbstractCoordinatorTest {
         mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                boolean isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
+            public boolean matches(AbstractRequest body) {
+                boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
                 if (isSyncGroupRequest)
                     // wakeup after the request returns
                     consumerClient.wakeup();
@@ -446,8 +446,8 @@ public class AbstractCoordinatorTest {
         final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                boolean isHeartbeatRequest = request.request().header().apiKey() == ApiKeys.HEARTBEAT.id;
+            public boolean matches(AbstractRequest body) {
+                boolean isHeartbeatRequest = body instanceof HeartbeatRequest;
                 if (isHeartbeatRequest)
                     heartbeatReceived.set(true);
                 return isHeartbeatRequest;
@@ -466,23 +466,21 @@ public class AbstractCoordinatorTest {
         }, 3000, "Should have received a heartbeat request after joining the group");
     }
 
-    private Struct groupCoordinatorResponse(Node node, Errors error) {
-        GroupCoordinatorResponse response = new GroupCoordinatorResponse(error.code(), node);
-        return response.toStruct();
+    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
+        return new GroupCoordinatorResponse(error.code(), node);
     }
 
-    private Struct heartbeatResponse(Errors error) {
-        HeartbeatResponse response = new HeartbeatResponse(error.code());
-        return response.toStruct();
+    private HeartbeatResponse heartbeatResponse(Errors error) {
+        return new HeartbeatResponse(error.code());
     }
 
-    private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
+    private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
         return new JoinGroupResponse(error.code(), generationId, "dummy-subprotocol", memberId, leaderId,
-                Collections.<String, ByteBuffer>emptyMap()).toStruct();
+                Collections.<String, ByteBuffer>emptyMap());
     }
 
-    private Struct syncGroupResponse(Errors error) {
-        return new SyncGroupResponse(error.code(), ByteBuffer.allocate(0)).toStruct();
+    private SyncGroupResponse syncGroupResponse(Errors error) {
+        return new SyncGroupResponse(error.code(), ByteBuffer.allocate(0));
     }
 
     public class DummyCoordinator extends AbstractCoordinator {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index a83d7a9..e7ba401 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.CommitFailedException;
@@ -39,7 +38,7 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -311,8 +310,8 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+            public boolean matches(AbstractRequest body) {
+                SyncGroupRequest sync = (SyncGroupRequest) body;
                 return sync.memberId().equals(consumerId) &&
                         sync.generationId() == 1 &&
                         sync.groupAssignment().containsKey(consumerId);
@@ -353,7 +352,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE.code()));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
+            public boolean matches(AbstractRequest body) {
                 final Map<String, Integer> updatedPartitions = new HashMap<>();
                 for (String topic : updatedSubscription)
                     updatedPartitions.put(topic, 1);
@@ -371,8 +370,8 @@ public class ConsumerCoordinatorTest {
         // we expect to see a second rebalance with the new-found topics
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                JoinGroupRequest join = new JoinGroupRequest(request.request().body());
+            public boolean matches(AbstractRequest body) {
+                JoinGroupRequest join = (JoinGroupRequest) body;
                 ProtocolMetadata protocolMetadata = join.groupProtocols().iterator().next();
                 PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata.metadata());
                 protocolMetadata.metadata().rewind();
@@ -443,8 +442,8 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+            public boolean matches(AbstractRequest body) {
+                SyncGroupRequest sync = (SyncGroupRequest) body;
                 return sync.memberId().equals(consumerId) &&
                         sync.generationId() == 1 &&
                         sync.groupAssignment().isEmpty();
@@ -476,13 +475,13 @@ public class ConsumerCoordinatorTest {
         final AtomicBoolean received = new AtomicBoolean(false);
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
+            public boolean matches(AbstractRequest body) {
                 received.set(true);
-                LeaveGroupRequest leaveRequest = new LeaveGroupRequest(request.request().body());
+                LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
                 return leaveRequest.memberId().equals(consumerId) &&
                         leaveRequest.groupId().equals(groupId);
             }
-        }, new LeaveGroupResponse(Errors.NONE.code()).toStruct());
+        }, new LeaveGroupResponse(Errors.NONE.code()));
         coordinator.close();
         assertTrue(received.get());
     }
@@ -503,13 +502,13 @@ public class ConsumerCoordinatorTest {
         final AtomicBoolean received = new AtomicBoolean(false);
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
+            public boolean matches(AbstractRequest body) {
                 received.set(true);
-                LeaveGroupRequest leaveRequest = new LeaveGroupRequest(request.request().body());
+                LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
                 return leaveRequest.memberId().equals(consumerId) &&
                         leaveRequest.groupId().equals(groupId);
             }
-        }, new LeaveGroupResponse(Errors.NONE.code()).toStruct());
+        }, new LeaveGroupResponse(Errors.NONE.code()));
         coordinator.maybeLeaveGroup();
         assertTrue(received.get());
 
@@ -548,8 +547,8 @@ public class ConsumerCoordinatorTest {
         // now we should see a new join with the empty UNKNOWN_MEMBER_ID
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                JoinGroupRequest joinRequest = new JoinGroupRequest(request.request().body());
+            public boolean matches(AbstractRequest body) {
+                JoinGroupRequest joinRequest = (JoinGroupRequest) body;
                 return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
             }
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
@@ -600,8 +599,8 @@ public class ConsumerCoordinatorTest {
         // then let the full join/sync finish successfully
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                JoinGroupRequest joinRequest = new JoinGroupRequest(request.request().body());
+            public boolean matches(AbstractRequest body) {
+                JoinGroupRequest joinRequest = (JoinGroupRequest) body;
                 return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
             }
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
@@ -670,8 +669,8 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+            public boolean matches(AbstractRequest body) {
+                SyncGroupRequest sync = (SyncGroupRequest) body;
                 if (sync.memberId().equals(consumerId) &&
                         sync.generationId() == 1 &&
                         sync.groupAssignment().containsKey(consumerId)) {
@@ -939,7 +938,7 @@ public class ConsumerCoordinatorTest {
         coordinator.joinGroupIfNeeded();
 
         // now switch to manual assignment
-        client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()).toStruct());
+        client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()));
         subscriptions.unsubscribe();
         coordinator.maybeLeaveGroup();
         subscriptions.assignFromUser(singleton(tp));
@@ -947,8 +946,8 @@ public class ConsumerCoordinatorTest {
         // the client should not reuse generation/memberId from auto-subscribed generation
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body());
+            public boolean matches(AbstractRequest body) {
+                OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
                 return commitRequest.memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
                         commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
             }
@@ -1245,48 +1244,44 @@ public class ConsumerCoordinatorTest {
                 excludeInternalTopics);
     }
 
-    private Struct groupCoordinatorResponse(Node node, short error) {
-        GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
-        return response.toStruct();
+    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, short error) {
+        return new GroupCoordinatorResponse(error, node);
     }
 
-    private Struct heartbeatResponse(short error) {
-        HeartbeatResponse response = new HeartbeatResponse(error);
-        return response.toStruct();
+    private HeartbeatResponse heartbeatResponse(short error) {
+        return new HeartbeatResponse(error);
     }
 
-    private Struct joinGroupLeaderResponse(int generationId,
-                                           String memberId,
-                                           Map<String, List<String>> subscriptions,
-                                           short error) {
+    private JoinGroupResponse joinGroupLeaderResponse(int generationId,
+                                                      String memberId,
+                                                      Map<String, List<String>> subscriptions,
+                                                      short error) {
         Map<String, ByteBuffer> metadata = new HashMap<>();
         for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) {
             PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(subscriptionEntry.getValue());
             ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription);
             metadata.put(subscriptionEntry.getKey(), buf);
         }
-        return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, memberId, metadata).toStruct();
+        return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, memberId, metadata);
     }
 
-    private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
+    private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
         return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, leaderId,
-                Collections.<String, ByteBuffer>emptyMap()).toStruct();
+                Collections.<String, ByteBuffer>emptyMap());
     }
 
-    private Struct syncGroupResponse(List<TopicPartition> partitions, short error) {
+    private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, short error) {
         ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
-        return new SyncGroupResponse(error, buf).toStruct();
+        return new SyncGroupResponse(error, buf);
     }
 
-    private Struct offsetCommitResponse(Map<TopicPartition, Short> responseData) {
-        OffsetCommitResponse response = new OffsetCommitResponse(responseData);
-        return response.toStruct();
+    private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Short> responseData) {
+        return new OffsetCommitResponse(responseData);
     }
 
-    private Struct offsetFetchResponse(TopicPartition tp, Short error, String metadata, long offset) {
+    private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Short error, String metadata, long offset) {
         OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, error);
-        OffsetFetchResponse response = new OffsetFetchResponse(Collections.singletonMap(tp, data));
-        return response.toStruct();
+        return new OffsetFetchResponse(Collections.singletonMap(tp, data));
     }
 
     private OffsetCommitCallback callback(final AtomicBoolean success) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index f8ad3ca..003d92d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.utils.MockTime;
@@ -60,7 +59,7 @@ public class ConsumerNetworkClientTest {
         assertTrue(future.succeeded());
 
         ClientResponse clientResponse = future.value();
-        HeartbeatResponse response = new HeartbeatResponse(clientResponse.responseBody());
+        HeartbeatResponse response = (HeartbeatResponse) clientResponse.responseBody();
         assertEquals(Errors.NONE.code(), response.errorCode());
     }
 
@@ -206,7 +205,7 @@ public class ConsumerNetworkClientTest {
         client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
         consumerClient.poll(future2);
         ClientResponse clientResponse = future2.value();
-        HeartbeatResponse response = new HeartbeatResponse(clientResponse.responseBody());
+        HeartbeatResponse response = (HeartbeatResponse) clientResponse.responseBody();
         assertEquals(Errors.NONE.code(), response.errorCode());
 
         // Disable ready flag to delay send and queue another send. Disconnection should remove pending send
@@ -226,9 +225,8 @@ public class ConsumerNetworkClientTest {
         return new HeartbeatRequest("group", 1, "memberId");
     }
 
-    private Struct heartbeatResponse(short error) {
-        HeartbeatResponse response = new HeartbeatResponse(error);
-        return response.toStruct();
+    private HeartbeatResponse heartbeatResponse(short error) {
+        return new HeartbeatResponse(error);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 5822646..140e041 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
@@ -37,12 +37,11 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Compressor;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.ListOffsetRequest;
@@ -166,8 +165,8 @@ public class FetcherTest {
     private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) {
         return new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                FetchRequest fetch = new FetchRequest(request.request().body());
+            public boolean matches(AbstractRequest body) {
+                FetchRequest fetch = (FetchRequest) body;
                 return fetch.fetchData().containsKey(tp) &&
                         fetch.fetchData().get(tp).offset == offset;
             }
@@ -542,7 +541,7 @@ public class FetcherTest {
     @Test
     public void testGetAllTopics() {
         // sending response before request, as getTopicMetadata is a blocking call
-        client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct());
+        client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));
 
         Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(5000L);
 
@@ -553,7 +552,7 @@ public class FetcherTest {
     public void testGetAllTopicsDisconnect() {
         // first try gets a disconnect, next succeeds
         client.prepareResponse(null, true);
-        client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct());
+        client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));
         Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(5000L);
         assertEquals(cluster.topics().size(), allTopics.size());
     }
@@ -566,7 +565,7 @@ public class FetcherTest {
 
     @Test
     public void testGetAllTopicsUnauthorized() {
-        client.prepareResponse(newMetadataResponse(topicName, Errors.TOPIC_AUTHORIZATION_FAILED).toStruct());
+        client.prepareResponse(newMetadataResponse(topicName, Errors.TOPIC_AUTHORIZATION_FAILED));
         try {
             fetcher.getAllTopicMetadata(10L);
             fail();
@@ -577,13 +576,13 @@ public class FetcherTest {
 
     @Test(expected = InvalidTopicException.class)
     public void testGetTopicMetadataInvalidTopic() {
-        client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION).toStruct());
+        client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION));
         fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L);
     }
 
     @Test
     public void testGetTopicMetadataUnknownTopic() {
-        client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION).toStruct());
+        client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION));
 
         Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L);
         assertNull(topicMetadata.get(topicName));
@@ -591,8 +590,8 @@ public class FetcherTest {
 
     @Test
     public void testGetTopicMetadataLeaderNotAvailable() {
-        client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE).toStruct());
-        client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct());
+        client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE));
+        client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));
 
         Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L);
         assertTrue(topicMetadata.containsKey(topicName));
@@ -705,28 +704,27 @@ public class FetcherTest {
         // matches any list offset request with the provided timestamp
         return new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                ListOffsetRequest req = new ListOffsetRequest(request.request().body());
+            public boolean matches(AbstractRequest body) {
+                ListOffsetRequest req = (ListOffsetRequest) body;
                 return timestamp == req.partitionTimestamps().get(tp);
             }
         };
     }
 
-    private Struct listOffsetResponse(Errors error, long timestamp, long offset) {
+    private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, long offset) {
         return listOffsetResponse(tp, error, timestamp, offset);
     }
 
-    private Struct listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
+    private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
         ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error.code(), timestamp, offset);
         Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
         allPartitionData.put(tp, partitionData);
-        ListOffsetResponse response = new ListOffsetResponse(allPartitionData, 1);
-        return response.toStruct();
+        return new ListOffsetResponse(allPartitionData, 1);
     }
 
-    private Struct fetchResponse(ByteBuffer buffer, short error, long hw, int throttleTime) {
-        FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)), throttleTime);
-        return response.toStruct();
+    private FetchResponse fetchResponse(ByteBuffer buffer, short error, long hw, int throttleTime) {
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        return new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, records)), throttleTime);
     }
 
     private MetadataResponse newMetadataResponse(String topic, Errors error) {