You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/04 06:40:45 UTC

[2/3] kafka git commit: kafka-1928; Move kafka.network over to using the network classes in org.apache.kafka.common.network; patched by Gwen Shapira; reviewed by Joel Koshy, Jay Kreps, Jiangjie Qin, Guozhang Wang and Jun Rao

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 57de058..effb1e6 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -17,17 +17,8 @@ import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.nio.channels.*;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.KafkaException;
@@ -40,20 +31,21 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A selector interface for doing non-blocking multi-connection network I/O.
+ * A nioSelector interface for doing non-blocking multi-connection network I/O.
  * <p>
  * This class works with {@link NetworkSend} and {@link NetworkReceive} to transmit size-delimited network requests and
  * responses.
  * <p>
- * A connection can be added to the selector associated with an integer id by doing
+ * A connection can be added to the nioSelector associated with an integer id by doing
  * 
  * <pre>
- * selector.connect(42, new InetSocketAddress(&quot;google.com&quot;, server.port), 64000, 64000);
+ * nioSelector.connect(42, new InetSocketAddress(&quot;google.com&quot;, server.port), 64000, 64000);
  * </pre>
  * 
  * The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating
@@ -64,10 +56,10 @@ import org.slf4j.LoggerFactory;
  * 
  * <pre>
  * List&lt;NetworkRequest&gt; requestsToSend = Arrays.asList(new NetworkRequest(0, myBytes), new NetworkRequest(1, myOtherBytes));
- * selector.poll(TIMEOUT_MS, requestsToSend);
+ * nioSelector.poll(TIMEOUT_MS, requestsToSend);
  * </pre>
  * 
- * The selector maintains several lists that are reset by each call to <code>poll()</code> which are available via
+ * The nioSelector maintains several lists that are reset by each call to <code>poll()</code> which are available via
  * various getters. These are reset by each call to <code>poll()</code>.
  * 
  * This class is not thread safe!
@@ -76,41 +68,59 @@ public class Selector implements Selectable {
 
     private static final Logger log = LoggerFactory.getLogger(Selector.class);
 
-    private final java.nio.channels.Selector selector;
-    private final Map<Integer, SelectionKey> keys;
-    private final List<NetworkSend> completedSends;
+    private final java.nio.channels.Selector nioSelector;
+    private final Map<String, SelectionKey> keys;
+    private final List<Send> completedSends;
     private final List<NetworkReceive> completedReceives;
-    private final List<Integer> disconnected;
-    private final List<Integer> connected;
-    private final List<Integer> failedSends;
+    private final List<String> disconnected;
+    private final List<String> connected;
+    private final List<String> failedSends;
     private final Time time;
     private final SelectorMetrics sensors;
     private final String metricGrpPrefix;
     private final Map<String, String> metricTags;
+    private final Map<String, Long> lruConnections;
+    private final long connectionsMaxIdleNanos;
+    private final int maxReceiveSize;
+    private final boolean metricsPerConnection;
+    private long currentTimeNanos;
+    private long nextIdleCloseCheckTime;
+
 
     /**
-     * Create a new selector
+     * Create a new nioSelector
      */
-    public Selector(Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
+    public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
         try {
-            this.selector = java.nio.channels.Selector.open();
+            this.nioSelector = java.nio.channels.Selector.open();
         } catch (IOException e) {
             throw new KafkaException(e);
         }
+        this.maxReceiveSize = maxReceiveSize;
+        this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000;
         this.time = time;
         this.metricGrpPrefix = metricGrpPrefix;
         this.metricTags = metricTags;
-        this.keys = new HashMap<Integer, SelectionKey>();
-        this.completedSends = new ArrayList<NetworkSend>();
+        this.keys = new HashMap<String, SelectionKey>();
+        this.completedSends = new ArrayList<Send>();
         this.completedReceives = new ArrayList<NetworkReceive>();
-        this.connected = new ArrayList<Integer>();
-        this.disconnected = new ArrayList<Integer>();
-        this.failedSends = new ArrayList<Integer>();
+        this.connected = new ArrayList<String>();
+        this.disconnected = new ArrayList<String>();
+        this.failedSends = new ArrayList<String>();
         this.sensors = new SelectorMetrics(metrics);
+        // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
+        this.lruConnections = new LinkedHashMap<String, Long>(16, .75F, true);
+        currentTimeNanos = new SystemTime().nanoseconds();
+        nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
+        this.metricsPerConnection = metricsPerConnection;
+    }
+
+    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
+        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true);
     }
 
     /**
-     * Begin connecting to the given address and add the connection to this selector associated with the given id
+     * Begin connecting to the given address and add the connection to this nioSelector associated with the given id
      * number.
      * <p>
      * Note that this call only initiates the connection, which will be completed on a future {@link #poll(long, List)}
@@ -123,7 +133,7 @@ public class Selector implements Selectable {
      * @throws IOException if DNS resolution fails on the hostname or if the broker is down
      */
     @Override
-    public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
+    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
         if (this.keys.containsKey(id))
             throw new IllegalStateException("There is already a connection for id " + id);
 
@@ -143,7 +153,18 @@ public class Selector implements Selectable {
             channel.close();
             throw e;
         }
-        SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT);
+        SelectionKey key = channel.register(this.nioSelector, SelectionKey.OP_CONNECT);
+        key.attach(new Transmissions(id));
+        this.keys.put(id, key);
+    }
+
+    /**
+     * Register the nioSelector with an existing channel
+     * Use this on server-side, when a connection is accepted by a different thread but processed by the Selector
+     * Note that we are not checking if the connection id is valid - since the connection already exists
+     */
+    public void register(String id, SocketChannel channel) throws ClosedChannelException {
+        SelectionKey key = channel.register(nioSelector, SelectionKey.OP_READ);
         key.attach(new Transmissions(id));
         this.keys.put(id, key);
     }
@@ -153,18 +174,18 @@ public class Selector implements Selectable {
      * processed until the next {@link #poll(long, List) poll()} call.
      */
     @Override
-    public void disconnect(int id) {
+    public void disconnect(String id) {
         SelectionKey key = this.keys.get(id);
         if (key != null)
             key.cancel();
     }
 
     /**
-     * Interrupt the selector if it is blocked waiting to do I/O.
+     * Interrupt the nioSelector if it is blocked waiting to do I/O.
      */
     @Override
     public void wakeup() {
-        this.selector.wakeup();
+        this.nioSelector.wakeup();
     }
 
     /**
@@ -172,12 +193,14 @@ public class Selector implements Selectable {
      */
     @Override
     public void close() {
-        for (SelectionKey key : this.selector.keys())
-            close(key);
+        List<String> connections = new LinkedList<String>(keys.keySet());
+        for (String id: connections)
+            close(id);
+
         try {
-            this.selector.close();
+            this.nioSelector.close();
         } catch (IOException e) {
-            log.error("Exception closing selector:", e);
+            log.error("Exception closing nioSelector:", e);
         }
     }
 
@@ -185,7 +208,7 @@ public class Selector implements Selectable {
      * Queue the given request for sending in the subsequent {@poll(long)} calls
      * @param send The request to send
      */
-    public void send(NetworkSend send) {
+    public void send(Send send) {
         SelectionKey key = keyForId(send.destination());
         Transmissions transmissions = transmissions(key);
         if (transmissions.hasSend())
@@ -194,7 +217,7 @@ public class Selector implements Selectable {
         try {
             key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
         } catch (CancelledKeyException e) {
-            close(key);
+            close(transmissions.id);
             this.failedSends.add(send.destination());
         }
     }
@@ -220,10 +243,11 @@ public class Selector implements Selectable {
         long startSelect = time.nanoseconds();
         int readyKeys = select(timeout);
         long endSelect = time.nanoseconds();
+        currentTimeNanos = endSelect;
         this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
 
         if (readyKeys > 0) {
-            Set<SelectionKey> keys = this.selector.selectedKeys();
+            Set<SelectionKey> keys = this.nioSelector.selectedKeys();
             Iterator<SelectionKey> iter = keys.iterator();
             while (iter.hasNext()) {
                 SelectionKey key = iter.next();
@@ -232,8 +256,9 @@ public class Selector implements Selectable {
                 Transmissions transmissions = transmissions(key);
                 SocketChannel channel = channel(key);
 
-                // register all per-broker metrics at once
-                sensors.maybeRegisterNodeMetrics(transmissions.id);
+                // register all per-connection metrics at once
+                sensors.maybeRegisterConnectionMetrics(transmissions.id);
+                lruConnections.put(transmissions.id, currentTimeNanos);
 
                 try {
                     /* complete any connections that have finished their handshake */
@@ -247,8 +272,14 @@ public class Selector implements Selectable {
                     /* read from any connections that have readable data */
                     if (key.isReadable()) {
                         if (!transmissions.hasReceive())
-                            transmissions.receive = new NetworkReceive(transmissions.id);
-                        transmissions.receive.readFrom(channel);
+                            transmissions.receive = new NetworkReceive(maxReceiveSize, transmissions.id);
+                        try {
+                            transmissions.receive.readFrom(channel);
+                        } catch (InvalidReceiveException e) {
+                            log.error("Invalid data received from " + transmissions.id + " closing connection", e);
+                            close(transmissions.id);
+                            throw e;
+                        }
                         if (transmissions.receive.complete()) {
                             transmissions.receive.payload().rewind();
                             this.completedReceives.add(transmissions.receive);
@@ -260,7 +291,7 @@ public class Selector implements Selectable {
                     /* write to any sockets that have space in their buffer and for which we have data */
                     if (key.isWritable()) {
                         transmissions.send.writeTo(channel);
-                        if (transmissions.send.remaining() <= 0) {
+                        if (transmissions.send.completed()) {
                             this.completedSends.add(transmissions.send);
                             this.sensors.recordBytesSent(transmissions.id, transmissions.send.size());
                             transmissions.clearSend();
@@ -270,7 +301,7 @@ public class Selector implements Selectable {
 
                     /* cancel any defunct sockets */
                     if (!key.isValid()) {
-                        close(key);
+                        close(transmissions.id);
                         this.disconnected.add(transmissions.id);
                     }
                 } catch (IOException e) {
@@ -279,15 +310,16 @@ public class Selector implements Selectable {
                         log.info("Connection {} disconnected", desc);
                     else
                         log.warn("Error in I/O with connection to {}", desc, e);
-                    close(key);
+                    close(transmissions.id);
                     this.disconnected.add(transmissions.id);
                 }
             }
         }
         long endIo = time.nanoseconds();
         this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
+        maybeCloseOldestConnection();
     }
-    
+
     private String socketDescription(SocketChannel channel) {
         Socket socket = channel.socket();
         if (socket == null)
@@ -299,7 +331,7 @@ public class Selector implements Selectable {
     }
 
     @Override
-    public List<NetworkSend> completedSends() {
+    public List<Send> completedSends() {
         return this.completedSends;
     }
 
@@ -309,17 +341,17 @@ public class Selector implements Selectable {
     }
 
     @Override
-    public List<Integer> disconnected() {
+    public List<String> disconnected() {
         return this.disconnected;
     }
 
     @Override
-    public List<Integer> connected() {
+    public List<String> connected() {
         return this.connected;
     }
 
     @Override
-    public void mute(int id) {
+    public void mute(String id) {
         mute(this.keyForId(id));
     }
 
@@ -328,7 +360,7 @@ public class Selector implements Selectable {
     }
 
     @Override
-    public void unmute(int id) {
+    public void unmute(String id) {
         unmute(this.keyForId(id));
     }
 
@@ -348,6 +380,25 @@ public class Selector implements Selectable {
             unmute(key);
     }
 
+    private void maybeCloseOldestConnection() {
+        if (currentTimeNanos > nextIdleCloseCheckTime) {
+            if (lruConnections.isEmpty()) {
+                nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
+            } else {
+                Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next();
+                Long connectionLastActiveTime = oldestConnectionEntry.getValue();
+                nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;
+                if (currentTimeNanos > nextIdleCloseCheckTime) {
+                    String connectionId = oldestConnectionEntry.getKey();
+                    if (log.isTraceEnabled())
+                        log.trace("About to close the idle connection from " + connectionId
+                                + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis");
+                    close(connectionId);
+                }
+            }
+        }
+    }
+
     /**
      * Clear the results from the prior poll
      */
@@ -369,17 +420,19 @@ public class Selector implements Selectable {
      */
     private int select(long ms) throws IOException {
         if (ms == 0L)
-            return this.selector.selectNow();
+            return this.nioSelector.selectNow();
         else if (ms < 0L)
-            return this.selector.select();
+            return this.nioSelector.select();
         else
-            return this.selector.select(ms);
+            return this.nioSelector.select(ms);
     }
 
     /**
      * Begin closing this connection
      */
-    private void close(SelectionKey key) {
+    public void close(String id) {
+        SelectionKey key = keyForId(id);
+        lruConnections.remove(id);
         SocketChannel channel = channel(key);
         Transmissions trans = transmissions(key);
         if (trans != null) {
@@ -401,10 +454,10 @@ public class Selector implements Selectable {
     /**
      * Get the selection key associated with this numeric id
      */
-    private SelectionKey keyForId(int id) {
+    private SelectionKey keyForId(String id) {
         SelectionKey key = this.keys.get(id);
         if (key == null)
-            throw new IllegalStateException("Attempt to write to socket for which there is no open connection.");
+            throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + keys.keySet().toString());
         return key;
     }
 
@@ -426,11 +479,11 @@ public class Selector implements Selectable {
      * The id and in-progress send and receive associated with a connection
      */
     private static class Transmissions {
-        public int id;
-        public NetworkSend send;
+        public String id;
+        public Send send;
         public NetworkReceive receive;
 
-        public Transmissions(int id) {
+        public Transmissions(String id) {
             this.id = id;
         }
 
@@ -464,20 +517,27 @@ public class Selector implements Selectable {
         public SelectorMetrics(Metrics metrics) {
             this.metrics = metrics;
             String metricGrpName = metricGrpPrefix + "-metrics";
+            StringBuilder tagsSuffix = new StringBuilder();
+
+            for (Map.Entry<String, String> tag: metricTags.entrySet()) {
+                tagsSuffix.append(tag.getKey());
+                tagsSuffix.append("-");
+                tagsSuffix.append(tag.getValue());
+            }
 
-            this.connectionClosed = this.metrics.sensor("connections-closed");
+            this.connectionClosed = this.metrics.sensor("connections-closed:" + tagsSuffix.toString());
             MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
             this.connectionClosed.add(metricName, new Rate());
 
-            this.connectionCreated = this.metrics.sensor("connections-created");
+            this.connectionCreated = this.metrics.sensor("connections-created:" + tagsSuffix.toString());
             metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
             this.connectionCreated.add(metricName, new Rate());
 
-            this.bytesTransferred = this.metrics.sensor("bytes-sent-received");
+            this.bytesTransferred = this.metrics.sensor("bytes-sent-received:" + tagsSuffix.toString());
             metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
             bytesTransferred.add(metricName, new Rate(new Count()));
 
-            this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred);
+            this.bytesSent = this.metrics.sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
             metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
             this.bytesSent.add(metricName, new Rate());
             metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
@@ -487,13 +547,13 @@ public class Selector implements Selectable {
             metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
             this.bytesSent.add(metricName, new Max());
 
-            this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred);
+            this.bytesReceived = this.metrics.sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
             metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
             this.bytesReceived.add(metricName, new Rate());
             metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
             this.bytesReceived.add(metricName, new Rate(new Count()));
 
-            this.selectTime = this.metrics.sensor("select-time");
+            this.selectTime = this.metrics.sensor("select-time:" + tagsSuffix.toString());
             metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
             this.selectTime.add(metricName, new Rate(new Count()));
             metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
@@ -501,7 +561,7 @@ public class Selector implements Selectable {
             metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
             this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
 
-            this.ioTime = this.metrics.sensor("io-time");
+            this.ioTime = this.metrics.sensor("io-time:" + tagsSuffix.toString());
             metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
             this.ioTime.add(metricName, new Avg());
             metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
@@ -515,17 +575,17 @@ public class Selector implements Selectable {
             });
         }
 
-        public void maybeRegisterNodeMetrics(int node) {
-            if (node >= 0) {
-                // if one sensor of the metrics has been registered for the node,
+        public void maybeRegisterConnectionMetrics(String connectionId) {
+            if (!connectionId.isEmpty() && metricsPerConnection) {
+                // if one sensor of the metrics has been registered for the connection,
                 // then all other sensors should have been registered; and vice versa
-                String nodeRequestName = "node-" + node + ".bytes-sent";
+                String nodeRequestName = "node-" + connectionId + ".bytes-sent";
                 Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
                 if (nodeRequest == null) {
                     String metricGrpName = metricGrpPrefix + "-node-metrics";
 
                     Map<String, String> tags = new LinkedHashMap<String, String>(metricTags);
-                    tags.put("node-id", "node-" + node);
+                    tags.put("node-id", "node-" + connectionId);
 
                     nodeRequest = this.metrics.sensor(nodeRequestName);
                     MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags);
@@ -537,14 +597,14 @@ public class Selector implements Selectable {
                     metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
                     nodeRequest.add(metricName, new Max());
 
-                    String nodeResponseName = "node-" + node + ".bytes-received";
+                    String nodeResponseName = "node-" + connectionId + ".bytes-received";
                     Sensor nodeResponse = this.metrics.sensor(nodeResponseName);
                     metricName = new MetricName("incoming-byte-rate", metricGrpName, tags);
                     nodeResponse.add(metricName, new Rate());
                     metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
                     nodeResponse.add(metricName, new Rate(new Count()));
 
-                    String nodeTimeName = "node-" + node + ".latency";
+                    String nodeTimeName = "node-" + connectionId + ".latency";
                     Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName);
                     metricName = new MetricName("request-latency-avg", metricGrpName, tags);
                     nodeRequestTime.add(metricName, new Avg());
@@ -554,22 +614,22 @@ public class Selector implements Selectable {
             }
         }
 
-        public void recordBytesSent(int node, int bytes) {
+        public void recordBytesSent(String connectionId, long bytes) {
             long now = time.milliseconds();
             this.bytesSent.record(bytes, now);
-            if (node >= 0) {
-                String nodeRequestName = "node-" + node + ".bytes-sent";
+            if (!connectionId.isEmpty()) {
+                String nodeRequestName = "node-" + connectionId + ".bytes-sent";
                 Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
                 if (nodeRequest != null)
                     nodeRequest.record(bytes, now);
             }
         }
 
-        public void recordBytesReceived(int node, int bytes) {
+        public void recordBytesReceived(String connection, int bytes) {
             long now = time.milliseconds();
             this.bytesReceived.record(bytes, now);
-            if (node >= 0) {
-                String nodeRequestName = "node-" + node + ".bytes-received";
+            if (!connection.isEmpty()) {
+                String nodeRequestName = "node-" + connection + ".bytes-received";
                 Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
                 if (nodeRequest != null)
                     nodeRequest.record(bytes, now);

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/Send.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Send.java b/clients/src/main/java/org/apache/kafka/common/network/Send.java
index 5d321a0..8f6daad 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Send.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Send.java
@@ -13,7 +13,6 @@
 package org.apache.kafka.common.network;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
 
 /**
@@ -24,12 +23,7 @@ public interface Send {
     /**
      * The numeric id for the destination of this send
      */
-    public int destination();
-
-    /**
-     * The number of bytes remaining to send
-     */
-    public int remaining();
+    public String destination();
 
     /**
      * Is this send complete?
@@ -37,11 +31,6 @@ public interface Send {
     public boolean completed();
 
     /**
-     * An optional method to turn this send into an array of ByteBuffers if possible (otherwise returns null)
-     */
-    public ByteBuffer[] reify();
-
-    /**
      * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
      * to be completely written
      * @param channel The channel to write to
@@ -50,4 +39,9 @@ public interface Send {
      */
     public long writeTo(GatheringByteChannel channel) throws IOException;
 
+    /**
+     * Size of the send
+     */
+    public long size();
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
index 27cbf39..3fec60b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
@@ -25,7 +25,7 @@ public class RequestSend extends NetworkSend {
     private final RequestHeader header;
     private final Struct body;
 
-    public RequestSend(int destination, RequestHeader header, Struct body) {
+    public RequestSend(String destination, RequestHeader header, Struct body) {
         super(destination, serialize(header, body));
         this.header = header;
         this.body = body;

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
new file mode 100644
index 0000000..12b06d1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class ResponseSend extends NetworkSend {
+
+    public ResponseSend(String destination, ResponseHeader header, Struct body) {
+        super(destination, serialize(header, body));
+    }
+
+    public ResponseSend(String destination, ResponseHeader header, AbstractRequestResponse response) {
+        this(destination, header, response.toStruct());
+    }
+
+    private static ByteBuffer serialize(ResponseHeader header, Struct body) {
+        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
+        header.writeTo(buffer);
+        body.writeTo(buffer);
+        buffer.rewind();
+        return buffer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 5e3fab1..d9c97e9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -78,7 +78,7 @@ public class MockClient implements KafkaClient {
         return false;
     }
 
-    public void disconnect(Integer node) {
+    public void disconnect(String node) {
         Iterator<ClientRequest> iter = requests.iterator();
         while (iter.hasNext()) {
             ClientRequest request = iter.next();
@@ -115,7 +115,7 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
-    public List<ClientResponse> completeAll(int node, long now) {
+    public List<ClientResponse> completeAll(String node, long now) {
         return completeAll(now);
     }
 
@@ -158,7 +158,7 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
-    public int inFlightRequestCount(int nodeId) {
+    public int inFlightRequestCount(String nodeId) {
         return requests.size();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 8b27889..43238ce 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -65,7 +65,7 @@ public class NetworkClientTest {
         client.poll(1, time.milliseconds());
         selector.clear();
         assertTrue("Now the client is ready", client.ready(node, time.milliseconds()));
-        selector.disconnect(node.id());
+        selector.disconnect(node.idString());
         client.poll(1, time.milliseconds());
         selector.clear();
         assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds()));
@@ -74,7 +74,7 @@ public class NetworkClientTest {
 
     @Test(expected = IllegalStateException.class)
     public void testSendToUnreadyNode() {
-        RequestSend send = new RequestSend(5,
+        RequestSend send = new RequestSend("5",
                                            client.nextRequestHeader(ApiKeys.METADATA),
                                            new MetadataRequest(Arrays.asList("test")).toStruct());
         ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null);
@@ -86,7 +86,7 @@ public class NetworkClientTest {
     public void testSimpleRequestResponse() {
         ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
         RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
-        RequestSend send = new RequestSend(node.id(), reqHeader, produceRequest.toStruct());
+        RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct());
         TestCallbackHandler handler = new TestCallbackHandler();
         ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler);
         awaitReady(client, node);
@@ -101,7 +101,7 @@ public class NetworkClientTest {
         respHeader.writeTo(buffer);
         resp.writeTo(buffer);
         buffer.flip();
-        selector.completeReceive(new NetworkReceive(node.id(), buffer));
+        selector.completeReceive(new NetworkReceive(node.idString(), buffer));
         List<ClientResponse> responses = client.poll(1, time.milliseconds());
         assertEquals(1, responses.size());
         assertTrue("The handler should have executed.", handler.executed);

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index d5b306b..d23b4b6 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -22,10 +22,7 @@ import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
+import java.util.*;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
@@ -49,7 +46,7 @@ public class SelectorTest {
     public void setup() throws Exception {
         this.server = new EchoServer();
         this.server.start();
-        this.selector = new Selector(new Metrics(), new MockTime() , "MetricGroup", new LinkedHashMap<String, String>());
+        this.selector = new Selector(5000, new Metrics(), new MockTime() , "MetricGroup", new LinkedHashMap<String, String>());
     }
 
     @After
@@ -63,7 +60,7 @@ public class SelectorTest {
      */
     @Test
     public void testServerDisconnect() throws Exception {
-        int node = 0;
+        String node = "0";
 
         // connect and do a simple request
         blockingConnect(node);
@@ -84,7 +81,7 @@ public class SelectorTest {
      */
     @Test
     public void testClientDisconnect() throws Exception {
-        int node = 0;
+        String node = "0";
         blockingConnect(node);
         selector.disconnect(node);
         selector.send(createSend(node, "hello1"));
@@ -101,7 +98,7 @@ public class SelectorTest {
      */
     @Test(expected = IllegalStateException.class)
     public void testCantSendWithInProgress() throws Exception {
-        int node = 0;
+        String node = "0";
         blockingConnect(node);
         selector.send(createSend(node, "test1"));
         selector.send(createSend(node, "test2"));
@@ -113,7 +110,7 @@ public class SelectorTest {
      */
     @Test(expected = IllegalStateException.class)
     public void testCantSendWithoutConnecting() throws Exception {
-        selector.send(createSend(0, "test"));
+        selector.send(createSend("0", "test"));
         selector.poll(1000L);
     }
 
@@ -122,7 +119,7 @@ public class SelectorTest {
      */
     @Test(expected = IOException.class)
     public void testNoRouteToHost() throws Exception {
-        selector.connect(0, new InetSocketAddress("asdf.asdf.dsc", server.port), BUFFER_SIZE, BUFFER_SIZE);
+        selector.connect("0", new InetSocketAddress("asdf.asdf.dsc", server.port), BUFFER_SIZE, BUFFER_SIZE);
     }
 
     /**
@@ -130,7 +127,7 @@ public class SelectorTest {
      */
     @Test
     public void testConnectionRefused() throws Exception {
-        int node = 0;
+        String node = "0";
         ServerSocket nonListeningSocket = new ServerSocket(0);
         int nonListeningPort = nonListeningSocket.getLocalPort();
         selector.connect(node, new InetSocketAddress("localhost", nonListeningPort), BUFFER_SIZE, BUFFER_SIZE);
@@ -151,14 +148,15 @@ public class SelectorTest {
         // create connections
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
         for (int i = 0; i < conns; i++)
-            selector.connect(i, addr, BUFFER_SIZE, BUFFER_SIZE);
-
+            selector.connect(Integer.toString(i), addr, BUFFER_SIZE, BUFFER_SIZE);
         // send echo requests and receive responses
-        int[] requests = new int[conns];
-        int[] responses = new int[conns];
+        Map<String, Integer> requests = new HashMap<String, Integer>();
+        Map<String, Integer> responses = new HashMap<String, Integer>();
         int responseCount = 0;
-        for (int i = 0; i < conns; i++)
-            selector.send(createSend(i, i + "-" + 0));
+        for (int i = 0; i < conns; i++) {
+            String node = Integer.toString(i);
+            selector.send(createSend(node, node + "-0"));
+        }
 
         // loop until we complete all requests
         while (responseCount < conns * reqs) {
@@ -171,19 +169,27 @@ public class SelectorTest {
             for (NetworkReceive receive : selector.completedReceives()) {
                 String[] pieces = asString(receive).split("-");
                 assertEquals("Should be in the form 'conn-counter'", 2, pieces.length);
-                assertEquals("Check the source", receive.source(), Integer.parseInt(pieces[0]));
+                assertEquals("Check the source", receive.source(), pieces[0]);
                 assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position());
-                assertEquals("Check the request counter", responses[receive.source()], Integer.parseInt(pieces[1]));
-                responses[receive.source()]++; // increment the expected counter
+                if (responses.containsKey(receive.source())) {
+                    assertEquals("Check the request counter", (int) responses.get(receive.source()), Integer.parseInt(pieces[1]));
+                    responses.put(receive.source(), responses.get(receive.source()) + 1);
+                } else {
+                    assertEquals("Check the request counter", 0, Integer.parseInt(pieces[1]));
+                    responses.put(receive.source(), 1);
+                }
                 responseCount++;
             }
 
             // prepare new sends for the next round
-            for (NetworkSend send : selector.completedSends()) {
-                int dest = send.destination();
-                requests[dest]++;
-                if (requests[dest] < reqs)
-                    selector.send(createSend(dest, dest + "-" + requests[dest]));
+            for (Send send : selector.completedSends()) {
+                String dest = send.destination();
+                if (requests.containsKey(dest))
+                    requests.put(dest, requests.get(dest) + 1);
+                else
+                    requests.put(dest, 1);
+                if (requests.get(dest) < reqs)
+                    selector.send(createSend(dest, dest + "-" + requests.get(dest)));
             }
         }
     }
@@ -193,7 +199,7 @@ public class SelectorTest {
      */
     @Test
     public void testSendLargeRequest() throws Exception {
-        int node = 0;
+        String node = "0";
         blockingConnect(node);
         String big = TestUtils.randomString(10 * BUFFER_SIZE);
         assertEquals(big, blockingRequest(node, big));
@@ -204,41 +210,41 @@ public class SelectorTest {
      */
     @Test
     public void testEmptyRequest() throws Exception {
-        int node = 0;
+        String node = "0";
         blockingConnect(node);
         assertEquals("", blockingRequest(node, ""));
     }
 
     @Test(expected = IllegalStateException.class)
     public void testExistingConnectionId() throws IOException {
-        blockingConnect(0);
-        blockingConnect(0);
+        blockingConnect("0");
+        blockingConnect("0");
     }
 
     @Test
     public void testMute() throws Exception {
-        blockingConnect(0);
-        blockingConnect(1);
+        blockingConnect("0");
+        blockingConnect("1");
 
-        selector.send(createSend(0, "hello"));
-        selector.send(createSend(1, "hi"));
+        selector.send(createSend("0", "hello"));
+        selector.send(createSend("1", "hi"));
 
-        selector.mute(1);
+        selector.mute("1");
 
         while (selector.completedReceives().isEmpty())
             selector.poll(5);
         assertEquals("We should have only one response", 1, selector.completedReceives().size());
-        assertEquals("The response should not be from the muted node", 0, selector.completedReceives().get(0).source());
+        assertEquals("The response should not be from the muted node", "0", selector.completedReceives().get(0).source());
 
-        selector.unmute(1);
+        selector.unmute("1");
         do {
             selector.poll(5);
         } while (selector.completedReceives().isEmpty());
         assertEquals("We should have only one response", 1, selector.completedReceives().size());
-        assertEquals("The response should be from the previously muted node", 1, selector.completedReceives().get(0).source());
+        assertEquals("The response should be from the previously muted node", "1", selector.completedReceives().get(0).source());
     }
 
-    private String blockingRequest(int node, String s) throws IOException {
+    private String blockingRequest(String node, String s) throws IOException {
         selector.send(createSend(node, s));
         selector.poll(1000L);
         while (true) {
@@ -250,13 +256,13 @@ public class SelectorTest {
     }
 
     /* connect and wait for the connection to complete */
-    private void blockingConnect(int node) throws IOException {
+    private void blockingConnect(String node) throws IOException {
         selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
         while (!selector.connected().contains(node))
             selector.poll(10000L);
     }
 
-    private NetworkSend createSend(int node, String s) {
+    private NetworkSend createSend(String node, String s) {
         return new NetworkSend(node, ByteBuffer.wrap(s.getBytes()));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index ea89b06..51eb9d1 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -20,6 +20,7 @@ import java.util.List;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.utils.Time;
 
 /**
@@ -28,23 +29,23 @@ import org.apache.kafka.common.utils.Time;
 public class MockSelector implements Selectable {
 
     private final Time time;
-    private final List<NetworkSend> initiatedSends = new ArrayList<NetworkSend>();
-    private final List<NetworkSend> completedSends = new ArrayList<NetworkSend>();
+    private final List<Send> initiatedSends = new ArrayList<Send>();
+    private final List<Send> completedSends = new ArrayList<Send>();
     private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();
-    private final List<Integer> disconnected = new ArrayList<Integer>();
-    private final List<Integer> connected = new ArrayList<Integer>();
+    private final List<String> disconnected = new ArrayList<String>();
+    private final List<String> connected = new ArrayList<String>();
 
     public MockSelector(Time time) {
         this.time = time;
     }
 
     @Override
-    public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
+    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
         this.connected.add(id);
     }
 
     @Override
-    public void disconnect(int id) {
+    public void disconnect(String id) {
         this.disconnected.add(id);
     }
 
@@ -64,7 +65,7 @@ public class MockSelector implements Selectable {
     }
 
     @Override
-    public void send(NetworkSend send) {
+    public void send(Send send) {
         this.initiatedSends.add(send);
     }
 
@@ -76,7 +77,7 @@ public class MockSelector implements Selectable {
     }
 
     @Override
-    public List<NetworkSend> completedSends() {
+    public List<Send> completedSends() {
         return completedSends;
     }
 
@@ -94,21 +95,21 @@ public class MockSelector implements Selectable {
     }
 
     @Override
-    public List<Integer> disconnected() {
+    public List<String> disconnected() {
         return disconnected;
     }
 
     @Override
-    public List<Integer> connected() {
+    public List<String> connected() {
         return connected;
     }
 
     @Override
-    public void mute(int id) {
+    public void mute(String id) {
     }
 
     @Override
-    public void unmute(int id) {
+    public void unmute(String id) {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 9efabaa..6af7b80 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -17,6 +17,8 @@
 
 package kafka
 
+import java.util.Properties
+
 import scala.collection.JavaConversions._
 import joptsimple.OptionParser
 import metrics.KafkaMetricsReporter
@@ -26,7 +28,7 @@ import org.apache.kafka.common.utils.Utils
 
 object Kafka extends Logging {
 
-  def getKafkaConfigFromArgs(args: Array[String]): KafkaConfig = {
+  def getPropsFromArgs(args: Array[String]): Properties = {
     val optionParser = new OptionParser
     val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
       .withRequiredArg()
@@ -47,14 +49,14 @@ object Kafka extends Logging {
 
       props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt)))
     }
-
-    KafkaConfig.fromProps(props)
+    props
   }
 
   def main(args: Array[String]): Unit = {
     try {
-      val serverConfig = getKafkaConfigFromArgs(args)
-      KafkaMetricsReporter.startReporters(new VerifiableProperties(serverConfig.toProps))
+      val serverProps = getPropsFromArgs(args)
+      val serverConfig = KafkaConfig.fromProps(serverProps)
+      KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
       val kafkaServerStartable = new KafkaServerStartable(serverConfig)
 
       // attach shutdown handler to catch control-c

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 6d1c6ab..f23120e 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -174,7 +174,7 @@ object ConsumerGroupCommand {
     val offsetMap = mutable.Map[TopicAndPartition, Long]()
     val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)
     channel.send(OffsetFetchRequest(group, topicPartitions))
-    val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
+    val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())
 
     offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
       if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
index a3b1b78..258d5fe 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
@@ -18,9 +18,10 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
-import kafka.network.RequestChannel.Response
+
 import kafka.common.ErrorMapping
+import kafka.network.{RequestOrResponseSend, RequestChannel}
+import kafka.network.RequestChannel.Response
 
 object ConsumerMetadataRequest {
   val CurrentVersion = 0.shortValue
@@ -64,7 +65,7 @@ case class ConsumerMetadataRequest(group: String,
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     // return ConsumerCoordinatorNotAvailable for all uncaught errors
     val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
   def describe(details: Boolean) = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index fe81635..8092007 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -18,10 +18,9 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.api.ApiUtils._
-import collection.mutable.ListBuffer
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
-import kafka.common.{TopicAndPartition, ErrorMapping}
+
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
 
@@ -63,7 +62,7 @@ case class ControlledShutdownRequest(versionId: Short,
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition])
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
   override def describe(details: Boolean = false): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index b038c15..5b38f85 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -149,7 +149,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
         (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty))
     }
     val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData)
-    requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
+    requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse)))
   }
 
   override def describe(details: Boolean): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 75aaf57..0b6b33a 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -22,8 +22,10 @@ import java.nio.channels.GatheringByteChannel
 
 import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.message.{MessageSet, ByteBufferMessageSet}
-import kafka.network.{MultiSend, Send}
 import kafka.api.ApiUtils._
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.network.MultiSend
 
 import scala.collection._
 
@@ -62,10 +64,12 @@ class PartitionDataSend(val partitionId: Int,
   buffer.putInt(partitionData.messages.sizeInBytes)
   buffer.rewind()
 
-  override def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
+  override def completed = !buffer.hasRemaining && messagesSentSize >= messageSize
 
-  override def writeTo(channel: GatheringByteChannel): Int = {
-    var written = 0
+  override def destination: String = ""
+
+  override def writeTo(channel: GatheringByteChannel): Long = {
+    var written = 0L
     if(buffer.hasRemaining)
       written += channel.write(buffer)
     if(!buffer.hasRemaining && messagesSentSize < messageSize) {
@@ -75,6 +79,8 @@ class PartitionDataSend(val partitionId: Int,
     }
     written
   }
+
+  override def size = buffer.capacity() + messageSize
 }
 
 object TopicData {
@@ -101,29 +107,32 @@ case class TopicData(topic: String, partitionData: Map[Int, FetchResponsePartiti
   val headerSize = TopicData.headerSize(topic)
 }
 
-class TopicDataSend(val topicData: TopicData) extends Send {
-  private val size = topicData.sizeInBytes
+class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
+
+  private var sent = 0L
 
-  private var sent = 0
+  override def completed: Boolean = sent >= size
 
-  override def complete = sent >= size
+  override def destination: String = dest
+
+  override def size = topicData.headerSize + sends.size()
 
   private val buffer = ByteBuffer.allocate(topicData.headerSize)
   writeShortString(buffer, topicData.topic)
   buffer.putInt(topicData.partitionData.size)
   buffer.rewind()
 
-  val sends = new MultiSend(topicData.partitionData.toList
-                                    .map(d => new PartitionDataSend(d._1, d._2))) {
-    val expectedBytesToWrite = topicData.sizeInBytes - topicData.headerSize
-  }
+  private val sends = new MultiSend(dest,
+                            JavaConversions.seqAsJavaList(topicData.partitionData.toList.map(d => new PartitionDataSend(d._1, d._2))))
 
-  def writeTo(channel: GatheringByteChannel): Int = {
-    expectIncomplete()
-    var written = 0
+  override def writeTo(channel: GatheringByteChannel): Long = {
+    if (completed)
+      throw new KafkaException("This operation cannot be completed on a complete request.")
+
+    var written = 0L
     if(buffer.hasRemaining)
       written += channel.write(buffer)
-    if(!buffer.hasRemaining && !sends.complete) {
+    if(!buffer.hasRemaining && !sends.completed) {
       written += sends.writeTo(channel)
     }
     sent += written
@@ -200,34 +209,36 @@ case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchR
 }
 
 
-class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
-  private val size = fetchResponse.sizeInBytes
+class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) extends Send {
+  private val payloadSize = fetchResponse.sizeInBytes
+
+  private var sent = 0L
 
-  private var sent = 0
+  override def size = 4 /* for size byte */ + payloadSize
 
-  private val sendSize = 4 /* for size */ + size
+  override def completed = sent >= size
 
-  override def complete = sent >= sendSize
+  override def destination = dest
 
   private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize)
-  buffer.putInt(size)
+  buffer.putInt(payloadSize)
   buffer.putInt(fetchResponse.correlationId)
   buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count
   buffer.rewind()
 
-  val sends = new MultiSend(fetchResponse.dataGroupedByTopic.toList.map {
-    case(topic, data) => new TopicDataSend(TopicData(topic,
+  private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map {
+    case(topic, data) => new TopicDataSend(dest, TopicData(topic,
                                                      data.map{case(topicAndPartition, message) => (topicAndPartition.partition, message)}))
-  }) {
-    val expectedBytesToWrite = fetchResponse.sizeInBytes - FetchResponse.headerSize
-  }
+    }))
+
+  override def writeTo(channel: GatheringByteChannel): Long = {
+    if (completed)
+      throw new KafkaException("This operation cannot be completed on a complete request.")
 
-  def writeTo(channel: GatheringByteChannel):Int = {
-    expectIncomplete()
-    var written = 0
+    var written = 0L
     if(buffer.hasRemaining)
       written += channel.write(buffer)
-    if(!buffer.hasRemaining && !sends.complete) {
+    if(!buffer.hasRemaining && !sends.completed) {
       written += sends.writeTo(channel)
     }
     sent += written

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 431190a..c2584e0 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -19,14 +19,16 @@
 package kafka.api
 
 import java.nio._
-import kafka.utils._
+
 import kafka.api.ApiUtils._
 import kafka.cluster.BrokerEndPoint
-import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
 import kafka.common.ErrorMapping
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
-import collection.Set
+import kafka.utils._
+
+import scala.collection.Set
 
 
 object LeaderAndIsr {
@@ -184,7 +186,7 @@ case class LeaderAndIsrRequest (versionId: Short,
       case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
     }
     val errorResponse = LeaderAndIsrResponse(correlationId, responseMap)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
   override def describe(details: Boolean): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 317daed..5b362ef 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -18,11 +18,13 @@
 package kafka.api
 
 import java.nio.ByteBuffer
+
 import kafka.api.ApiUtils._
-import kafka.utils.{SystemTime, Logging}
-import kafka.network.{RequestChannel, BoundedByteBufferSend}
-import kafka.common.{OffsetMetadata, OffsetAndMetadata, ErrorMapping, TopicAndPartition}
+import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}
+import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
+import kafka.utils.Logging
+
 import scala.collection._
 
 object OffsetCommitRequest extends Logging {
@@ -162,7 +164,7 @@ case class OffsetCommitRequest(groupId: String,
     val commitStatus = requestInfo.mapValues(_ => errorCode)
     val commitResponse = OffsetCommitResponse(commitStatus, correlationId)
 
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(commitResponse)))
+    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, commitResponse)))
   }
 
   override def describe(details: Boolean): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index fa8bd6a..a83e147 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -17,16 +17,13 @@
 
 package kafka.api
 
+import java.nio.ByteBuffer
+
 import kafka.api.ApiUtils._
-import kafka.utils.Logging
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
-import kafka.common._
-import kafka.common.TopicAndPartition
+import kafka.common.{TopicAndPartition, _}
+import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
-
-import scala.Some
-
-import java.nio.ByteBuffer
+import kafka.utils.Logging
 
 object OffsetFetchRequest extends Logging {
   val CurrentVersion: Short = 1
@@ -99,7 +96,7 @@ case class OffsetFetchRequest(groupId: String,
       ))
     }.toMap
     val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
   override def describe(details: Boolean): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 3d483bc..f418868 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -18,9 +18,10 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.common.{ErrorMapping, TopicAndPartition}
+
 import kafka.api.ApiUtils._
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 
 
@@ -117,7 +118,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
         (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null))
     }
     val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
   override def describe(details: Boolean): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 570b2da..c866180 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -18,11 +18,12 @@
 package kafka.api
 
 import java.nio._
-import kafka.message._
+
 import kafka.api.ApiUtils._
 import kafka.common._
+import kafka.message._
+import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
-import kafka.network.{RequestChannel, BoundedByteBufferSend}
 
 object ProducerRequest {
   val CurrentVersion = 0.shortValue
@@ -136,7 +137,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
           (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
       }
       val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
-      requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+      requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index ef7a86e..155cb65 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -20,6 +20,8 @@ package kafka.api
 import kafka.common.KafkaException
 import java.nio.ByteBuffer
 
+import kafka.network.InvalidRequestException
+
 object RequestKeys {
   val ProduceKey: Short = 0
   val FetchKey: Short = 1
@@ -59,7 +61,7 @@ object RequestKeys {
   def deserializerForKey(key: Short): (ByteBuffer) => RequestOrResponse = {
     keyToNameAndDeserializerMap.get(key) match {
       case Some(nameAndSerializer) => nameAndSerializer._2
-      case None => throw new KafkaException("Wrong request type %d".format(key))
+      case None => throw new InvalidRequestException("Wrong request type %d".format(key))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 5e14987..4441fc6 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -19,7 +19,7 @@ package kafka.api
 
 import java.nio._
 import kafka.api.ApiUtils._
-import kafka.network.{BoundedByteBufferSend, RequestChannel, InvalidRequestException}
+import kafka.network.{RequestOrResponseSend, RequestChannel, InvalidRequestException}
 import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
@@ -106,7 +106,7 @@ case class StopReplicaRequest(versionId: Short,
       case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
     }.toMap
     val errorResponse = StopReplicaResponse(correlationId, responseMap)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
   override def describe(details: Boolean): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 363bae0..401c583 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -18,13 +18,15 @@
 package kafka.api
 
 import java.nio.ByteBuffer
+
 import kafka.api.ApiUtils._
-import collection.mutable.ListBuffer
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
 import kafka.common.ErrorMapping
+import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
 
+import scala.collection.mutable.ListBuffer
+
 object TopicMetadataRequest extends Logging {
   val CurrentVersion = 0.shortValue
   val DefaultClientId = ""
@@ -80,7 +82,7 @@ case class TopicMetadataRequest(versionId: Short,
       topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
     }
     val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
   override def describe(details: Boolean): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index 69f0397..d59de82 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -21,8 +21,8 @@ import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import kafka.cluster.{Broker, BrokerEndPoint}
 import kafka.common.{ErrorMapping, KafkaException, TopicAndPartition}
+import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
 import org.apache.kafka.common.protocol.SecurityProtocol
 
 import scala.collection.Set
@@ -128,7 +128,7 @@ case class UpdateMetadataRequest (versionId: Short,
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]))
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
   override def describe(details: Boolean): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 62394c0..68c7e7f 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -153,7 +153,7 @@ object ClientUtils extends Logging{
            debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
            queryChannel.send(ConsumerMetadataRequest(group))
            val response = queryChannel.receive()
-           val consumerMetadataResponse =  ConsumerMetadataResponse.readFrom(response.buffer)
+           val consumerMetadataResponse =  ConsumerMetadataResponse.readFrom(response.payload())
            debug("Consumer metadata response: " + consumerMetadataResponse.toString)
            if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
              coordinatorOpt = consumerMetadataResponse.coordinatorOpt

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 31a2639..c16f7ed 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -24,6 +24,7 @@ import kafka.api._
 import kafka.network._
 import kafka.utils._
 import kafka.common.{ErrorMapping, TopicAndPartition}
+import org.apache.kafka.common.network.{NetworkReceive, Receive}
 import org.apache.kafka.common.utils.Utils._
 
 /**
@@ -65,9 +66,9 @@ class SimpleConsumer(val host: String,
     }
   }
   
-  private def sendRequest(request: RequestOrResponse): Receive = {
+  private def sendRequest(request: RequestOrResponse): NetworkReceive = {
     lock synchronized {
-      var response: Receive = null
+      var response: NetworkReceive = null
       try {
         getOrMakeConnection()
         blockingChannel.send(request)
@@ -94,12 +95,12 @@ class SimpleConsumer(val host: String,
 
   def send(request: TopicMetadataRequest): TopicMetadataResponse = {
     val response = sendRequest(request)
-    TopicMetadataResponse.readFrom(response.buffer)
+    TopicMetadataResponse.readFrom(response.payload())
   }
 
   def send(request: ConsumerMetadataRequest): ConsumerMetadataResponse = {
     val response = sendRequest(request)
-    ConsumerMetadataResponse.readFrom(response.buffer)
+    ConsumerMetadataResponse.readFrom(response.payload())
   }
 
   /**
@@ -109,7 +110,7 @@ class SimpleConsumer(val host: String,
    *  @return a set of fetched messages
    */
   def fetch(request: FetchRequest): FetchResponse = {
-    var response: Receive = null
+    var response: NetworkReceive = null
     val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestTimer
     val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer
     aggregateTimer.time {
@@ -117,7 +118,7 @@ class SimpleConsumer(val host: String,
         response = sendRequest(request)
       }
     }
-    val fetchResponse = FetchResponse.readFrom(response.buffer)
+    val fetchResponse = FetchResponse.readFrom(response.payload())
     val fetchedSize = fetchResponse.sizeInBytes
     fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize)
     fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize)
@@ -129,7 +130,7 @@ class SimpleConsumer(val host: String,
    *  @param request a [[kafka.api.OffsetRequest]] object.
    *  @return a [[kafka.api.OffsetResponse]] object.
    */
-  def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)
+  def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).payload())
 
   /**
    * Commit offsets for a topic
@@ -140,7 +141,7 @@ class SimpleConsumer(val host: String,
   def commitOffsets(request: OffsetCommitRequest) = {
     // TODO: With KAFKA-1012, we have to first issue a ConsumerMetadataRequest and connect to the coordinator before
     // we can commit offsets.
-    OffsetCommitResponse.readFrom(sendRequest(request).buffer)
+    OffsetCommitResponse.readFrom(sendRequest(request).payload())
   }
 
   /**
@@ -149,7 +150,7 @@ class SimpleConsumer(val host: String,
    * @param request a [[kafka.api.OffsetFetchRequest]] object.
    * @return a [[kafka.api.OffsetFetchResponse]] object.
    */
-  def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).buffer)
+  def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).payload())
 
   private def getOrMakeConnection() {
     if(!isClosed && !blockingChannel.isConnected) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index a7f2acc..e42d104 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -334,7 +334,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             try {
               kafkaCommitMeter.mark(offsetsToCommit.size)
               offsetsChannel.send(offsetCommitRequest)
-              val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().buffer)
+              val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().payload())
               trace("Offset commit response: %s.".format(offsetCommitResponse))
 
               val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
@@ -421,7 +421,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           ensureOffsetManagerConnected()
           try {
             offsetsChannel.send(offsetFetchRequest)
-            val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().buffer)
+            val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().payload())
             trace("Offset fetch response: %s.".format(offsetFetchResponse))
 
             val (leaderChanged, loadInProgress) =

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 6cf13f0..9f521fa 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -16,8 +16,9 @@
 */
 package kafka.controller
 
-import kafka.network.{Receive, BlockingChannel}
+import kafka.network.BlockingChannel
 import kafka.utils.{CoreUtils, Logging, ShutdownableThread}
+import org.apache.kafka.common.network.NetworkReceive
 import collection.mutable.HashMap
 import kafka.cluster.Broker
 import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
@@ -120,7 +121,7 @@ class RequestSendThread(val controllerId: Int,
     val queueItem = queue.take()
     val request = queueItem._1
     val callback = queueItem._2
-    var receive: Receive = null
+    var receive: NetworkReceive = null
     try {
       lock synchronized {
         var isSendSuccessful = false
@@ -147,11 +148,11 @@ class RequestSendThread(val controllerId: Int,
           var response: RequestOrResponse = null
           request.requestId.get match {
             case RequestKeys.LeaderAndIsrKey =>
-              response = LeaderAndIsrResponse.readFrom(receive.buffer)
+              response = LeaderAndIsrResponse.readFrom(receive.payload())
             case RequestKeys.StopReplicaKey =>
-              response = StopReplicaResponse.readFrom(receive.buffer)
+              response = StopReplicaResponse.readFrom(receive.payload())
             case RequestKeys.UpdateMetadataKey =>
-              response = UpdateMetadataResponse.readFrom(receive.buffer)
+              response = UpdateMetadataResponse.readFrom(receive.payload())
           }
           stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
             .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index b0b7be1..568d0ac 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -16,12 +16,11 @@
  */
 package kafka.javaapi
 
-import kafka.api._
 import java.nio.ByteBuffer
+
+import kafka.api._
+
 import scala.collection.mutable
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
-import kafka.common.ErrorMapping
-import kafka.network.RequestChannel.Response
 
 class TopicMetadataRequest(val versionId: Short,
                            val correlationId: Int,

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index 6e2a38e..1197259 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -19,8 +19,10 @@ package kafka.network
 
 import java.net.InetSocketAddress
 import java.nio.channels._
-import kafka.utils.{nonthreadsafe, Logging}
+
 import kafka.api.RequestOrResponse
+import kafka.utils.{Logging, nonthreadsafe}
+import org.apache.kafka.common.network.NetworkReceive
 
 
 object BlockingChannel{
@@ -43,6 +45,7 @@ class BlockingChannel( val host: String,
   private var writeChannel: GatheringByteChannel = null
   private val lock = new Object()
   private val connectTimeoutMs = readTimeoutMs
+  private var connectionId: String = ""
 
   def connect() = lock synchronized  {
     if(!connected) {
@@ -59,8 +62,15 @@ class BlockingChannel( val host: String,
         channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
 
         writeChannel = channel
+        // Need to create a new ReadableByteChannel from input stream because SocketChannel doesn't implement read with timeout
+        // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
         readChannel = Channels.newChannel(channel.socket().getInputStream)
         connected = true
+        val localHost = channel.socket.getLocalAddress.getHostAddress
+        val localPort = channel.socket.getLocalPort
+        val remoteHost = channel.socket.getInetAddress.getHostAddress
+        val remotePort = channel.socket.getPort
+        connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
         // settings may not match what we requested above
         val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d."
         debug(msg.format(channel.socket.getSoTimeout,
@@ -95,20 +105,21 @@ class BlockingChannel( val host: String,
 
   def isConnected = connected
 
-  def send(request: RequestOrResponse):Int = {
+  def send(request: RequestOrResponse): Long = {
     if(!connected)
       throw new ClosedChannelException()
 
-    val send = new BoundedByteBufferSend(request)
+    val send = new RequestOrResponseSend(connectionId, request)
     send.writeCompletely(writeChannel)
   }
   
-  def receive(): Receive = {
+  def receive(): NetworkReceive = {
     if(!connected)
       throw new ClosedChannelException()
 
-    val response = new BoundedByteBufferReceive()
+    val response = new NetworkReceive()
     response.readCompletely(readChannel)
+    response.payload().rewind()
 
     response
   }