You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/11/19 09:34:45 UTC

kafka git commit: KAFKA-3703; Graceful close for consumers and producer with acks=0

Repository: kafka
Updated Branches:
  refs/heads/trunk edfa28067 -> e53babab9


KAFKA-3703; Graceful close for consumers and producer with acks=0

Process requests received from channels before they were closed. For consumers, wait for coordinator requests to complete before returning from close.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #1836 from rajinisivaram/KAFKA-3703


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e53babab
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e53babab
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e53babab

Branch: refs/heads/trunk
Commit: e53babab9cada20cc54a18c0fd63aa5ab84fd012
Parents: edfa280
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Sat Nov 19 09:33:48 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Nov 19 09:33:48 2016 +0000

----------------------------------------------------------------------
 .../consumer/internals/ConsumerCoordinator.java |  14 ++
 .../kafka/common/network/KafkaChannel.java      |  21 ++-
 .../apache/kafka/common/network/Selector.java   | 133 ++++++++++++++-----
 .../kafka/common/network/SslTransportLayer.java |   8 +-
 .../clients/consumer/KafkaConsumerTest.java     |  81 +++++++++++
 .../kafka/common/network/NioEchoServer.java     |  46 +++++--
 .../common/network/SslTransportLayerTest.java   |  48 ++++++-
 .../main/scala/kafka/network/SocketServer.scala |  13 +-
 .../unit/kafka/network/SocketServerTest.scala   |  15 +++
 9 files changed, 321 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 4889872..56f6951 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -63,6 +63,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class);
 
+    private static final long CLOSE_TIMEOUT_MS = 5000;
+
     private final List<PartitionAssignor> assignors;
     private final Metadata metadata;
     private final ConsumerCoordinatorMetrics sensors;
@@ -405,6 +407,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         client.disableWakeups();
         try {
             maybeAutoCommitOffsetsSync();
+
+            Node coordinator;
+            long endTimeMs = time.milliseconds() + CLOSE_TIMEOUT_MS;
+            while ((coordinator = coordinator()) != null && client.pendingRequestCount(coordinator) > 0) {
+                long remainingTimeMs = endTimeMs - time.milliseconds();
+                if (remainingTimeMs > 0)
+                    client.poll(remainingTimeMs);
+                else {
+                    log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.", client.pendingRequestCount(coordinator), groupId);
+                    break;
+                }
+            }
         } finally {
             super.close();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 16002eb..9b05aeb 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -35,15 +35,22 @@ public class KafkaChannel {
     private final int maxReceiveSize;
     private NetworkReceive receive;
     private Send send;
+    // Track connection and mute state of channels to enable outstanding requests on channels to be
+    // processed after the channel is disconnected.
+    private boolean disconnected;
+    private boolean muted;
 
     public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize) throws IOException {
         this.id = id;
         this.transportLayer = transportLayer;
         this.authenticator = authenticator;
         this.maxReceiveSize = maxReceiveSize;
+        this.disconnected = false;
+        this.muted = false;
     }
 
     public void close() throws IOException {
+        this.disconnected = true;
         Utils.closeAll(transportLayer, authenticator);
     }
 
@@ -65,6 +72,7 @@ public class KafkaChannel {
     }
 
     public void disconnect() {
+        disconnected = true;
         transportLayer.disconnect();
     }
 
@@ -82,15 +90,22 @@ public class KafkaChannel {
     }
 
     public void mute() {
-        transportLayer.removeInterestOps(SelectionKey.OP_READ);
+        if (!disconnected)
+            transportLayer.removeInterestOps(SelectionKey.OP_READ);
+        muted = true;
     }
 
     public void unmute() {
-        transportLayer.addInterestOps(SelectionKey.OP_READ);
+        if (!disconnected)
+            transportLayer.addInterestOps(SelectionKey.OP_READ);
+        muted = false;
     }
 
+    /**
+     * Returns true if this channel has been explicitly muted using {@link KafkaChannel#mute()}
+     */
     public boolean isMute() {
-        return transportLayer.isMute();
+        return muted;
     }
 
     public boolean ready() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 5244710..df35266 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -86,6 +86,7 @@ public class Selector implements Selectable {
     private final List<NetworkReceive> completedReceives;
     private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
     private final Set<SelectionKey> immediatelyConnectedKeys;
+    private final Map<String, KafkaChannel> closingChannels;
     private final List<String> disconnected;
     private final List<String> connected;
     private final List<String> failedSends;
@@ -132,6 +133,7 @@ public class Selector implements Selectable {
         this.completedReceives = new ArrayList<>();
         this.stagedReceives = new HashMap<>();
         this.immediatelyConnectedKeys = new HashSet<>();
+        this.closingChannels = new HashMap<>();
         this.connected = new ArrayList<>();
         this.disconnected = new ArrayList<>();
         this.failedSends = new ArrayList<>();
@@ -237,12 +239,17 @@ public class Selector implements Selectable {
      * @param send The request to send
      */
     public void send(Send send) {
-        KafkaChannel channel = channelOrFail(send.destination());
-        try {
-            channel.setSend(send);
-        } catch (CancelledKeyException e) {
-            this.failedSends.add(send.destination());
-            close(channel);
+        String connectionId = send.destination();
+        if (closingChannels.containsKey(connectionId))
+            this.failedSends.add(connectionId);
+        else {
+            KafkaChannel channel = channelOrFail(connectionId, false);
+            try {
+                channel.setSend(send);
+            } catch (CancelledKeyException e) {
+                this.failedSends.add(connectionId);
+                close(channel, false);
+            }
         }
     }
 
@@ -266,6 +273,11 @@ public class Selector implements Selectable {
      * the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
      * and pop response and add to the completedReceives.
      *
+     * At most one entry is added to "completedReceives" for a channel in each poll. This is necessary to guarantee that
+     * requests from a channel are processed on the broker in the order they are sent. Since outstanding requests added
+     * by SocketServer to the request queue may be processed by different request handler threads, requests on each
+     * channel must be processed one-at-a-time to guarantee ordering.
+     *
      * @param timeout The amount of time to wait, in milliseconds, which must be non-negative
      * @throws IllegalArgumentException If `timeout` is negative
      * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
@@ -354,10 +366,8 @@ public class Selector implements Selectable {
                 }
 
                 /* cancel any defunct sockets */
-                if (!key.isValid()) {
-                    close(channel);
-                    this.disconnected.add(channel.id());
-                }
+                if (!key.isValid())
+                    close(channel, true);
 
             } catch (Exception e) {
                 String desc = channel.socketDescription();
@@ -365,8 +375,7 @@ public class Selector implements Selectable {
                     log.debug("Connection with {} disconnected", desc, e);
                 else
                     log.warn("Unexpected error from {}; closing connection", desc, e);
-                close(channel);
-                this.disconnected.add(channel.id());
+                close(channel, true);
             }
         }
     }
@@ -393,7 +402,7 @@ public class Selector implements Selectable {
 
     @Override
     public void mute(String id) {
-        KafkaChannel channel = channelOrFail(id);
+        KafkaChannel channel = channelOrFail(id, true);
         mute(channel);
     }
 
@@ -403,7 +412,7 @@ public class Selector implements Selectable {
 
     @Override
     public void unmute(String id) {
-        KafkaChannel channel = channelOrFail(id);
+        KafkaChannel channel = channelOrFail(id, true);
         unmute(channel);
     }
 
@@ -430,13 +439,13 @@ public class Selector implements Selectable {
         Map.Entry<String, Long> expiredConnection = idleExpiryManager.pollExpiredConnection(currentTimeNanos);
         if (expiredConnection != null) {
             String connectionId = expiredConnection.getKey();
-
-            if (log.isTraceEnabled())
-                log.trace("About to close the idle connection from {} due to being idle for {} millis",
-                        connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000);
-
-            disconnected.add(connectionId);
-            close(connectionId);
+            KafkaChannel channel = this.channels.get(connectionId);
+            if (channel != null) {
+                if (log.isTraceEnabled())
+                    log.trace("About to close the idle connection from {} due to being idle for {} millis",
+                            connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000);
+                close(channel, true);
+            }
         }
     }
 
@@ -448,6 +457,16 @@ public class Selector implements Selectable {
         this.completedReceives.clear();
         this.connected.clear();
         this.disconnected.clear();
+        // Remove closed channels after all their staged receives have been processed or if a send was requested
+        for (Iterator<Map.Entry<String, KafkaChannel>> it = closingChannels.entrySet().iterator(); it.hasNext(); ) {
+            KafkaChannel channel = it.next().getValue();
+            Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
+            boolean sendFailed = failedSends.remove(channel.id());
+            if (deque == null || deque.isEmpty() || sendFailed) {
+                doClose(channel, true);
+                it.remove();
+            }
+        }
         this.disconnected.addAll(this.failedSends);
         this.failedSends.clear();
     }
@@ -476,27 +495,58 @@ public class Selector implements Selectable {
     public void close(String id) {
         KafkaChannel channel = this.channels.get(id);
         if (channel != null)
-            close(channel);
+            close(channel, false);
     }
 
     /**
-     * Begin closing this connection
+     * Begin closing this connection.
+     *
+     * If 'processOutstanding' is true, the channel is disconnected here, but staged receives are
+     * processed. The channel is closed when there are no outstanding receives or if a send
+     * is requested. The channel will be added to the disconnected list when it is actually closed.
+     *
+     * If 'processOutstanding' is false, outstanding receives are discarded and the channel is
+     * closed immediately. The channel will not be added to the disconnected list, and it is the
+     * responsibility of the caller to handle disconnect notifications.
      */
-    private void close(KafkaChannel channel) {
+    private void close(KafkaChannel channel, boolean processOutstanding) {
+
+        channel.disconnect();
+
+        // Keep track of closed channels with pending receives so that all received records
+        // may be processed. For example, when producer with acks=0 sends some records and
+        // closes its connections, a single poll() in the broker may receive records and
+        // handle close(). When the remote end closes its connection, the channel is retained until
+        // a send fails or all outstanding receives are processed. The mute state of disconnected channels
+        // is tracked to ensure that requests are processed one-by-one by the broker to preserve ordering.
+        Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
+        if (processOutstanding && deque != null && !deque.isEmpty()) {
+            if (!channel.isMute()) {
+                addToCompletedReceives(channel, deque);
+                if (deque.isEmpty())
+                    this.stagedReceives.remove(channel);
+            }
+            closingChannels.put(channel.id(), channel);
+        } else
+            doClose(channel, processOutstanding);
+        this.channels.remove(channel.id());
+
+        if (idleExpiryManager != null)
+            idleExpiryManager.remove(channel.id());
+    }
+
+    private void doClose(KafkaChannel channel, boolean notifyDisconnect) {
         try {
             channel.close();
         } catch (IOException e) {
             log.error("Exception closing connection to node {}:", channel.id(), e);
         }
-        this.stagedReceives.remove(channel);
-        this.channels.remove(channel.id());
         this.sensors.connectionClosed.record();
-
-        if (idleExpiryManager != null)
-            idleExpiryManager.remove(channel.id());
+        this.stagedReceives.remove(channel);
+        if (notifyDisconnect)
+            this.disconnected.add(channel.id());
     }
 
-
     /**
      * check if channel is ready
      */
@@ -506,10 +556,12 @@ public class Selector implements Selectable {
         return channel != null && channel.ready();
     }
 
-    private KafkaChannel channelOrFail(String id) {
+    private KafkaChannel channelOrFail(String id, boolean maybeClosing) {
         KafkaChannel channel = this.channels.get(id);
+        if (channel == null && maybeClosing)
+            channel = this.closingChannels.get(id);
         if (channel == null)
-            throw new IllegalStateException("Attempt to retrieve channel for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet());
+            throw new IllegalStateException("Attempt to retrieve channel for which there is no connection. Connection id " + id + " existing connections " + channels.keySet());
         return channel;
     }
 
@@ -529,6 +581,14 @@ public class Selector implements Selectable {
     }
 
     /**
+     * Return the channel with the specified id if it was disconnected, but not yet closed
+     * since there are outstanding messages to be processed.
+     */
+    public KafkaChannel closingChannel(String id) {
+        return closingChannels.get(id);
+    }
+
+    /**
      * Get the channel associated with selectionKey
      */
     private KafkaChannel channel(SelectionKey key) {
@@ -576,9 +636,7 @@ public class Selector implements Selectable {
                 KafkaChannel channel = entry.getKey();
                 if (!channel.isMute()) {
                     Deque<NetworkReceive> deque = entry.getValue();
-                    NetworkReceive networkReceive = deque.poll();
-                    this.completedReceives.add(networkReceive);
-                    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
+                    addToCompletedReceives(channel, deque);
                     if (deque.isEmpty())
                         iter.remove();
                 }
@@ -586,6 +644,11 @@ public class Selector implements Selectable {
         }
     }
 
+    private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
+        NetworkReceive networkReceive = stagedDeque.poll();
+        this.completedReceives.add(networkReceive);
+        this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
+    }
 
     private class SelectorMetrics {
         private final Metrics metrics;

http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index ffa980b..9ccf33d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -447,7 +447,7 @@ public class SslTransportLayer implements TransportLayer {
             netReadBuffer = Utils.ensureCapacity(netReadBuffer, netReadBufferSize());
             if (netReadBuffer.remaining() > 0) {
                 int netread = socketChannel.read(netReadBuffer);
-                if (netread == 0 && netReadBuffer.position() == 0) return netread;
+                if (netread == 0 && netReadBuffer.position() == 0) return read;
                 else if (netread < 0) throw new EOFException("EOF during read");
             }
             do {
@@ -488,7 +488,11 @@ public class SslTransportLayer implements TransportLayer {
                     }
                     break;
                 } else if (unwrapResult.getStatus() == Status.CLOSED) {
-                    throw new EOFException();
+                    // If data has been read and unwrapped, return the data. Close will be handled on the next poll.
+                    if (appReadBuffer.position() == 0 && read == 0)
+                        throw new EOFException();
+                    else
+                        break;
                 }
             } while (netReadBuffer.position() != 0);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index fa30a4b..e34f438 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -72,6 +72,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
@@ -831,6 +836,7 @@ public class KafkaConsumerTest {
         assertTrue(consumer.subscription().isEmpty());
         assertTrue(consumer.assignment().isEmpty());
 
+        client.requests().clear();
         consumer.close();
     }
 
@@ -905,6 +911,7 @@ public class KafkaConsumerTest {
         for (ClientRequest req: client.requests())
             assertTrue(req.header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
 
+        client.requests().clear();
         consumer.close();
     }
 
@@ -969,6 +976,7 @@ public class KafkaConsumerTest {
         // verify that the offset commits occurred as expected
         assertTrue(commitReceived.get());
 
+        client.requests().clear();
         consumer.close();
     }
 
@@ -1032,6 +1040,7 @@ public class KafkaConsumerTest {
         for (ClientRequest req : client.requests())
             assertTrue(req.header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
 
+        client.requests().clear();
         consumer.close();
     }
 
@@ -1067,6 +1076,78 @@ public class KafkaConsumerTest {
         }
     }
 
+    @Test
+    public void testGracefulClose() throws Exception {
+        consumerCloseTest(true);
+    }
+
+    @Test
+    public void testCloseTimeout() throws Exception {
+        consumerCloseTest(false);
+    }
+
+    private void consumerCloseTest(boolean graceful) throws Exception {
+        int rebalanceTimeoutMs = 60000;
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 5000;
+
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+
+        MockClient client = new MockClient(time);
+        client.setNode(node);
+        PartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, 1000);
+
+        consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
+        Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
+
+        // Poll with responses
+        client.prepareResponseFrom(fetchResponse(tp0, 0, 1), node);
+        client.prepareResponseFrom(fetchResponse(tp0, 1, 0), node);
+        consumer.poll(0);
+
+        // Initiate close() after a commit request on another thread.
+        // Kafka consumer is single-threaded, but the implementation allows calls on a
+        // different thread as long as the calls are not executed concurrently. So this is safe.
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            Future<?> future = executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    consumer.commitAsync();
+                    consumer.close();
+                }
+            });
+
+            // Close task should not complete until commit succeeds or close times out
+            try {
+                future.get(100, TimeUnit.MILLISECONDS);
+                fail("Close completed without waiting for commit response");
+            } catch (TimeoutException e) {
+                // Expected exception
+            }
+
+            // In graceful mode, commit response results in close() completing immediately without a timeout
+            // In non-graceful mode, close() times out without an exception even though commit response is pending
+            if (graceful) {
+                Map<TopicPartition, Short> response = new HashMap<>();
+                response.put(tp0, Errors.NONE.code());
+                client.respondFrom(offsetCommitResponse(response), coordinator);
+            } else
+                time.sleep(5000);
+            future.get(500, TimeUnit.MILLISECONDS); // Should succeed without TimeoutException or ExecutionException
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
     private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<String, String> consumer) {
         return new ConsumerRebalanceListener() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index e99a399..cc4befa 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -14,15 +14,16 @@ package org.apache.kafka.common.network;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
@@ -40,7 +41,7 @@ public class NioEchoServer extends Thread {
     private final List<SocketChannel> socketChannels;
     private final AcceptorThread acceptorThread;
     private final Selector selector;
-    private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new ConcurrentLinkedQueue<NetworkSend>();
+    private volatile WritableByteChannel outputChannel;
 
     public NioEchoServer(SecurityProtocol securityProtocol, Map<String, ?> configs, String serverHost) throws Exception {
         serverSocketChannel = ServerSocketChannel.open();
@@ -72,22 +73,23 @@ public class NioEchoServer extends Thread {
                     socketChannels.add(socketChannel);
                 }
                 newChannels.clear();
-                while (true) {
-                    NetworkSend send = inflightSends.peek();
-                    if (send != null && !selector.channel(send.destination()).hasSend()) {
-                        send = inflightSends.poll();
-                        selector.send(send);
-                    } else
-                        break;
-                }
+
                 List<NetworkReceive> completedReceives = selector.completedReceives();
                 for (NetworkReceive rcv : completedReceives) {
+                    KafkaChannel channel = channel(rcv.source());
+                    channel.mute();
                     NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
-                    if (!selector.channel(send.destination()).hasSend())
+                    if (outputChannel == null)
                         selector.send(send);
-                    else
-                        inflightSends.add(send);
+                    else {
+                        for (ByteBuffer buffer : send.buffers)
+                            outputChannel.write(buffer);
+                        channel.unmute();
+                    }
                 }
+                for (Send send : selector.completedSends())
+                    selector.unmute(send.destination());
+
             }
         } catch (IOException e) {
             // ignore
@@ -99,6 +101,24 @@ public class NioEchoServer extends Thread {
                 channel.socket().getInetAddress().getHostAddress() + ":" + channel.socket().getPort();
     }
 
+    private KafkaChannel channel(String id) {
+        KafkaChannel channel = selector.channel(id);
+        return channel == null ? selector.closingChannel(id) : channel;
+    }
+
+    /**
+     * Sets the output channel to which messages received on this server are echoed.
+     * This is useful in tests where the clients sending the messages don't receive
+     * the responses (e.g. testing graceful close).
+     */
+    public void outputChannel(WritableByteChannel channel) {
+        this.outputChannel = channel;
+    }
+
+    public Selector selector() {
+        return selector;
+    }
+
     public void closeConnections() throws IOException {
         for (SocketChannel channel : socketChannels)
             channel.close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index a044dc9..01d8a25 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -16,10 +16,12 @@ import static org.junit.Assert.fail;
 
 import java.util.Arrays;
 import java.util.Map;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 
@@ -32,6 +34,8 @@ import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.common.config.types.Password;
 import org.junit.After;
 import org.junit.Before;
@@ -390,7 +394,49 @@ public class SslTransportLayerTest {
 
         NetworkTestUtils.checkClientConnection(selector, node, 64000, 10);
     }
-    
+
+    @Test
+    public void testCloseSsl() throws Exception { // runs the shared testClose scenario with an SSL server and client
+        testClose(SecurityProtocol.SSL, new SslChannelBuilder(Mode.CLIENT));
+    }
+
+    @Test
+    public void testClosePlaintext() throws Exception { // runs the shared testClose scenario over PLAINTEXT
+        testClose(SecurityProtocol.PLAINTEXT, new PlaintextChannelBuilder());
+    }
+
+    private void testClose(SecurityProtocol securityProtocol, ChannelBuilder clientChannelBuilder) throws Exception {
+        // Graceful close: data fully sent before the client closes must still be processed by the server.
+        String node = "0";
+        server = NetworkTestUtils.createEchoServer(securityProtocol, sslServerConfigs);
+        clientChannelBuilder.configure(sslClientConfigs);
+        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", clientChannelBuilder);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+        NetworkTestUtils.waitForChannelReady(selector, node);
+        // Capture what the server echoes in memory; the server selector stays muted while the client sends.
+        final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        server.outputChannel(Channels.newChannel(bytesOut));
+        server.selector().muteAll();
+        byte[] message = TestUtils.randomString(100).getBytes();
+        int count = 20;
+        final int totalSendSize = count * (message.length + 4); // +4 per send: presumably NetworkSend's size prefix
+        for (int i = 0; i < count; i++) {
+            selector.send(new NetworkSend(node, ByteBuffer.wrap(message)));
+            do { // poll until this send is reported complete, so only one send is in flight at a time
+                selector.poll(0L);
+            } while (selector.completedSends().isEmpty());
+        }
+        server.selector().unmuteAll();
+        selector.close(node);
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return bytesOut.size() == totalSendSize; // size() reads the count without copying the buffer
+            }
+        }, 5000, "All requests sent were not processed");
+    }
+
     private void createSelector(Map<String, Object> sslClientConfigs) {
         createSelector(sslClientConfigs, null, null, null);
     }      

http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 90a0fee..e98445f 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -443,7 +443,9 @@ private[kafka] class Processor(val id: Int,
             // that are sitting in the server's socket buffer
             curr.request.updateRequestMetrics
             trace("Socket server received empty response to send, registering for read: " + curr)
-            selector.unmute(curr.request.connectionId)
+            val channelId = curr.request.connectionId
+            if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
+                selector.unmute(channelId)
           case RequestChannel.SendAction =>
             sendResponse(curr)
           case RequestChannel.CloseConnectionAction =>
@@ -486,9 +488,12 @@ private[kafka] class Processor(val id: Int,
   private def processCompletedReceives() {
     selector.completedReceives.asScala.foreach { receive =>
       try {
-        val channel = selector.channel(receive.source)
-        val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
-          channel.socketAddress)
+        val openChannel = selector.channel(receive.source)
+        val session = {
+          // Only methods that are safe to call on a disconnected channel should be invoked on 'channel'.
+          val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
+          RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
+        }
         val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
         requestChannel.sendRequest(req)
         selector.mute(receive.source)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e53babab/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index f7b5da5..317f3d6 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -163,6 +163,21 @@ class SocketServerTest extends JUnitSuite {
   }
 
   @Test
+  def testGracefulClose() {
+    // Send a batch of requests, close the socket without reading, then expect every request on the channel.
+    val requestCount = 10
+    val client = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val payload = producerRequestBytes
+    (0 until requestCount).foreach(_ => sendRequest(client, payload))
+    client.close()
+    (0 until requestCount).foreach { _ =>
+      val received = server.requestChannel.receiveRequest(2000)
+      assertNotNull("receiveRequest timed out", received)
+      server.requestChannel.noOperation(received.processor, received)
+    }
+  }
+
+  @Test
   def testSocketsCloseOnShutdown() {
     // open a connection
     val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)