You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2014/10/16 23:19:14 UTC

[6/8] git commit: [SSHD-360] Rework CloseableUtils code

[SSHD-360] Rework CloseableUtils code

Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/d8386644
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/d8386644
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/d8386644

Branch: refs/heads/master
Commit: d8386644d897daccace5808338b24c5ee9b1bef8
Parents: b98694d
Author: Guillaume Nodet <gn...@apache.org>
Authored: Thu Oct 16 17:27:04 2014 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Thu Oct 16 22:57:11 2014 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/sshd/SshClient.java    |  36 +-
 .../main/java/org/apache/sshd/SshServer.java    |  43 +-
 .../sshd/agent/common/AgentForwardSupport.java  |  11 +-
 .../sshd/agent/local/AgentForwardedChannel.java |   5 -
 .../sshd/agent/unix/AgentForwardedChannel.java  |   5 -
 .../client/channel/AbstractClientChannel.java   |  74 +---
 .../sshd/client/channel/ChannelDirectTcpip.java |   6 +-
 .../sshd/client/channel/ChannelSession.java     |   8 +-
 .../client/session/ClientUserAuthService.java   |   2 +-
 .../sshd/common/AbstractFactoryManager.java     |   4 +-
 .../sshd/common/channel/AbstractChannel.java    |  89 ++--
 .../common/channel/ChannelAsyncInputStream.java |  20 +-
 .../channel/ChannelAsyncOutputStream.java       |  15 +-
 .../common/forward/DefaultTcpipForwarder.java   |   8 +-
 .../sshd/common/forward/TcpipClientChannel.java |   2 +-
 .../sshd/common/forward/TcpipServerChannel.java |   2 +-
 .../apache/sshd/common/io/nio2/Nio2Service.java |   7 +-
 .../apache/sshd/common/io/nio2/Nio2Session.java |  12 +-
 .../session/AbstractConnectionService.java      |  36 +-
 .../sshd/common/session/AbstractSession.java    |  21 +-
 .../apache/sshd/common/util/CloseableUtils.java | 439 ++++++++-----------
 .../sshd/server/channel/ChannelSession.java     |  91 ++--
 .../sshd/server/session/ServerSession.java      |   2 +-
 .../sshd/server/x11/X11ForwardSupport.java      |  20 +-
 .../test/java/org/apache/sshd/CipherTest.java   |   2 +-
 .../test/java/org/apache/sshd/ClientTest.java   |  32 +-
 .../java/org/apache/sshd/CompressionTest.java   |   2 +-
 .../src/test/java/org/apache/sshd/KexTest.java  |   2 +-
 .../java/org/apache/sshd/KeyReExchangeTest.java |   2 +-
 .../src/test/java/org/apache/sshd/LoadTest.java |  60 +--
 .../src/test/java/org/apache/sshd/MacTest.java  |  35 +-
 .../org/apache/sshd/PortForwardingLoadTest.java |   4 +-
 .../org/apache/sshd/PortForwardingTest.java     |   4 +-
 .../src/test/java/org/apache/sshd/ScpTest.java  |   2 +-
 .../src/test/java/org/apache/sshd/SftpTest.java |   2 +-
 .../file/virtualfs/VirtualFileSystemTest.java   |   4 -
 36 files changed, 482 insertions(+), 627 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/SshClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/SshClient.java b/sshd-core/src/main/java/org/apache/sshd/SshClient.java
index 9eb47b8..0949574 100644
--- a/sshd-core/src/main/java/org/apache/sshd/SshClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/SshClient.java
@@ -62,7 +62,6 @@ import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.KeyPairProvider;
 import org.apache.sshd.common.NamedFactory;
-import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.DefaultIoServiceFactoryFactory;
 import org.apache.sshd.common.io.IoConnectFuture;
@@ -249,26 +248,21 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
         start();
     }
 
-    public CloseFuture close(boolean immediately) {
-        CloseFuture future;
-        if (connector != null) {
-            future = CloseableUtils.sequential(connector, ioServiceFactory).close(immediately);
-        } else if (ioServiceFactory != null) {
-            future = ioServiceFactory.close(immediately);
-        } else {
-            future = CloseableUtils.closed();
-        }
-        future.addListener(new SshFutureListener<CloseFuture>() {
-            public void operationComplete(CloseFuture future) {
-                connector = null;
-                ioServiceFactory = null;
-                if (shutdownExecutor && executor != null) {
-                    executor.shutdown();
-                    executor = null;
-                }
-            }
-        });
-        return future;
+    @Override
+    protected Closeable getInnerCloseable() {
+        return builder()
+                .sequential(connector, ioServiceFactory)
+                .run(new Runnable() {
+                    public void run() {
+                        connector = null;
+                        ioServiceFactory = null;
+                        if (shutdownExecutor && executor != null) {
+                            executor.shutdownNow();
+                            executor = null;
+                        }
+                    }
+                })
+                .build();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/SshServer.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/SshServer.java b/sshd-core/src/main/java/org/apache/sshd/SshServer.java
index 9d7d0c0..8d5d583 100644
--- a/sshd-core/src/main/java/org/apache/sshd/SshServer.java
+++ b/sshd-core/src/main/java/org/apache/sshd/SshServer.java
@@ -36,7 +36,6 @@ import org.apache.sshd.common.ForwardingFilter;
 import org.apache.sshd.common.NamedFactory;
 import org.apache.sshd.common.Session;
 import org.apache.sshd.common.SshdSocketAddress;
-import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.io.DefaultIoServiceFactoryFactory;
 import org.apache.sshd.common.io.IoAcceptor;
 import org.apache.sshd.common.io.IoServiceFactory;
@@ -324,29 +323,25 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
     }
 
     @Override
-    protected CloseFuture doCloseGracefully() {
-        stopSessionTimeoutListener();
-        CloseFuture future;
-        if (acceptor != null) {
-            future = CloseableUtils.sequential(acceptor, ioServiceFactory).close(false);
-        } else if (ioServiceFactory != null) {
-            future = ioServiceFactory.close(false);
-        } else {
-            future = CloseableUtils.closed();
-        }
-        return future;
-    }
-
-    @Override
-    protected void doCloseImmediately() {
-        CloseableUtils.sequential(acceptor, ioServiceFactory).close(true);
-        acceptor = null;
-        ioServiceFactory = null;
-        if (shutdownExecutor && executor != null) {
-            executor.shutdown();
-            executor = null;
-        }
-        super.doCloseImmediately();
+    protected Closeable getInnerCloseable() {
+        return builder()
+                .run(new Runnable() {
+                    public void run() {
+                        stopSessionTimeoutListener();
+                    }
+                })
+                .sequential(acceptor, ioServiceFactory)
+                .run(new Runnable() {
+                    public void run() {
+                        acceptor = null;
+                        ioServiceFactory = null;
+                        if (shutdownExecutor && executor != null) {
+                            executor.shutdownNow();
+                            executor = null;
+                        }
+                    }
+                })
+                .build();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentForwardSupport.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentForwardSupport.java b/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentForwardSupport.java
index 4f6a464..c1a775f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentForwardSupport.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentForwardSupport.java
@@ -21,22 +21,15 @@ package org.apache.sshd.agent.common;
 import java.io.IOException;
 
 import org.apache.sshd.agent.SshAgentServer;
-import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.SshException;
-import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.util.CloseableUtils;
-import org.apache.sshd.server.session.ServerSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The server side fake agent, acting as an agent, but actually forwarding the requests to the auth channel on the client side.
  */
 public class AgentForwardSupport extends CloseableUtils.AbstractCloseable {
 
-    private static final Logger log = LoggerFactory.getLogger(AgentForwardSupport.class);
-
     private final ConnectionService service;
     private String agentId;
     private SshAgentServer agentServer;
@@ -73,4 +66,8 @@ public class AgentForwardSupport extends CloseableUtils.AbstractCloseable {
         super.doCloseImmediately();
     }
 
+    public String toString() {
+        return getClass().getSimpleName() + "[" + service.getSession() + "]";
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
index e632e2c..7abf714 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ArrayBlockingQueue;
 import org.apache.sshd.agent.SshAgent;
 import org.apache.sshd.agent.common.AbstractAgentProxy;
 import org.apache.sshd.client.channel.AbstractClientChannel;
-import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.util.Buffer;
@@ -69,10 +68,6 @@ public class AgentForwardedChannel extends AbstractClientChannel {
         }
     }
 
-    public OpenFuture open() throws IOException {
-        return internalOpen();
-    }
-
     @Override
     protected void doOpen() throws IOException {
         if (streaming == Streaming.Async) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
index 327bf66..486c837 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
@@ -21,7 +21,6 @@ package org.apache.sshd.agent.unix;
 import java.io.IOException;
 
 import org.apache.sshd.client.channel.AbstractClientChannel;
-import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.tomcat.jni.Socket;
@@ -56,10 +55,6 @@ public class AgentForwardedChannel extends AbstractClientChannel implements Runn
         }
     }
 
-    public synchronized OpenFuture open() throws IOException {
-        return internalOpen();
-    }
-
     @Override
     protected synchronized void doOpen() throws IOException {
         if (streaming == Streaming.Async) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 793e8b8..964ef3a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -35,10 +35,7 @@ import org.apache.sshd.common.channel.ChannelAsyncInputStream;
 import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
 import org.apache.sshd.common.io.IoInputStream;
 import org.apache.sshd.common.io.IoOutputStream;
-import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.util.Buffer;
-import org.apache.sshd.common.util.CloseableUtils;
 import org.apache.sshd.common.util.IoUtils;
 
 /**
@@ -142,56 +139,31 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
     }
 
     @Override
-    public CloseFuture close(final boolean immediately) {
-        if (!closeFuture.isDone()) {
-            if (opened) {
-                super.close(immediately);
-            } else if (openFuture != null) {
-                if (immediately) {
-                    openFuture.setException(new SshException("Channel closed"));
-                    super.close(immediately);
-                } else {
-                    openFuture.addListener(new SshFutureListener<OpenFuture>() {
-                        public void operationComplete(OpenFuture future) {
-                            if (future.isOpened()) {
-                                close(immediately);
-                            } else {
-                                close(true);
-                            }
-                        }
-                    });
-                }
-            } else {
-                closeFuture.setClosed();
-                notifyStateChanged();
-            }
-        }
-        return closeFuture;
-    }
-
-    @Override
     protected Closeable getInnerCloseable() {
         return builder()
+                .when(openFuture)
+                .run(new Runnable() {
+                    public void run() {
+                        // If the channel has not been opened yet,
+                        // skip the SSH_MSG_CHANNEL_CLOSE exchange
+                        if (openFuture == null) {
+                            gracefulFuture.setClosed();
+                        }
+                        // Close inverted streams after
+                        // If the inverted stream is closed before, there's a small time window
+                        // in which we have:
+                        //    ChannePipedInputStream#closed = true
+                        //    ChannePipedInputStream#writerClosed = false
+                        // which leads to an IOException("Pipe closed") when reading.
+                        IoUtils.closeQuietly(in, out, err);
+                        IoUtils.closeQuietly(invertedIn, invertedOut, invertedErr);
+                    }
+                })
                 .parallel(asyncIn, asyncOut, asyncErr)
-                .close(super.getInnerCloseable())
+                .close(new GracefulChannelCloseable())
                 .build();
     }
 
-    @Override
-    protected void doCloseImmediately() {
-        // Close inverted streams after
-        // If the inverted stream is closed before, there's a small time window
-        // in which we have:
-        //    ChannePipedInputStream#closed = true
-        //    ChannePipedInputStream#writerClosed = false
-        // which leads to an IOException("Pipe closed") when reading.
-        IoUtils.closeQuietly(in, out, err);
-        IoUtils.closeQuietly(invertedIn, invertedOut, invertedErr);
-        // TODO: graceful close ?
-        CloseableUtils.parallel(asyncIn, asyncOut, asyncErr).close(true);
-        super.doCloseImmediately();
-    }
-
     public int waitFor(int mask, long timeout) {
         long t = 0;
         synchronized (lock) {
@@ -242,8 +214,8 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
         }
     }
 
-    protected OpenFuture internalOpen() throws IOException {
-        if (closeFuture.isClosed()) {
+    public synchronized OpenFuture open() throws IOException {
+        if (isClosing()) {
             throw new SshException("Session has been closed");
         }
         openFuture = new DefaultOpenFuture(lock);
@@ -292,7 +264,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
 
     protected void doWriteData(byte[] data, int off, int len) throws IOException {
         // If we're already closing, ignore incoming data
-        if (state.get() != CloseableUtils.AbstractCloseable.OPENED) {
+        if (isClosing()) {
             return;
         }
         if (asyncOut != null) {
@@ -308,7 +280,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
 
     protected void doWriteExtendedData(byte[] data, int off, int len) throws IOException {
         // If we're already closing, ignore incoming data
-        if (state.get() != CloseableUtils.AbstractCloseable.OPENED) {
+        if (isClosing()) {
             return;
         }
         if (asyncErr != null) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
index 36ffd28..fdc1c99 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
@@ -62,7 +62,7 @@ public class ChannelDirectTcpip extends AbstractClientChannel {
     }
 
     @Override
-    protected OpenFuture internalOpen() throws IOException {
+    public OpenFuture open() throws IOException {
         if (closeFuture.isClosed()) {
             throw new SshException("Session has been closed");
         }
@@ -94,10 +94,6 @@ public class ChannelDirectTcpip extends AbstractClientChannel {
         }
     }
 
-    public OpenFuture open() throws IOException {
-        return internalOpen();
-    }
-
     @Override
     protected void doWriteData(byte[] data, int off, int len) throws IOException {
         pipe.write(data, off, len);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index e02160b..92555df 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -21,7 +21,6 @@ package org.apache.sshd.client.channel;
 import java.io.IOException;
 import java.io.InputStream;
 
-import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.channel.ChannelPipedInputStream;
@@ -29,7 +28,6 @@ import org.apache.sshd.common.channel.ChannelPipedOutputStream;
 import org.apache.sshd.common.channel.ChannelAsyncInputStream;
 import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.util.CloseableUtils;
 
 /**
  * TODO Add javadoc
@@ -44,10 +42,6 @@ public class ChannelSession extends AbstractClientChannel {
         super("session");
     }
 
-    public OpenFuture open() throws IOException {
-        return internalOpen();
-    }
-
     @Override
     protected void doOpen() throws IOException {
         if (streaming == Streaming.Async) {
@@ -117,7 +111,7 @@ public class ChannelSession extends AbstractClientChannel {
                 }
             }
         } catch (Exception e) {
-            if (state.get() == CloseableUtils.AbstractCloseable.OPENED) {
+            if (!isClosing()) {
                 log.info("Caught exception", e);
                 close(false);
             }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
index a454874..3dfcced 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
@@ -92,7 +92,7 @@ public class ClientUserAuthService extends CloseableUtils.AbstractInnerCloseable
         } else if (delegateOld != null) {
             return delegateOld;
         } else {
-            return new CloseableUtils.AbstractCloseable() { };
+            return builder().build();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
index e10f20c..515bc9e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
@@ -31,15 +31,13 @@ import org.apache.sshd.common.io.IoServiceFactory;
 import org.apache.sshd.common.io.IoServiceFactoryFactory;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.util.CloseableUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * TODO Add javadoc
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public abstract class AbstractFactoryManager extends CloseableUtils.AbstractCloseable implements FactoryManager {
+public abstract class AbstractFactoryManager extends CloseableUtils.AbstractInnerCloseable implements FactoryManager {
 
     protected Map<String,String> properties = new HashMap<String,String>();
     protected IoServiceFactoryFactory ioServiceFactoryFactory;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 4bfb4e6..41a0b4a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -21,7 +21,7 @@ package org.apache.sshd.common.channel;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sshd.common.Channel;
 import org.apache.sshd.common.Closeable;
@@ -48,8 +48,9 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea
     public static final int DEFAULT_WINDOW_SIZE = 0x200000;
     public static final int DEFAULT_PACKET_SIZE = 0x8000;
 
-    protected static final int CLOSE_SENT = 0x01;
-    protected static final int CLOSE_RECV = 0x02;
+    protected static enum GracefulState {
+        Opened, CloseSent, CloseReceived, Closed
+    }
 
     protected final Window localWindow = new Window(this, null, getClass().getName().contains(".client."), true);
     protected final Window remoteWindow = new Window(this, null, getClass().getName().contains(".client."), false);
@@ -58,7 +59,7 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea
     protected int id;
     protected int recipient;
     protected volatile boolean eof;
-    protected AtomicInteger gracefulState = new AtomicInteger();
+    protected AtomicReference<GracefulState> gracefulState = new AtomicReference<GracefulState>(GracefulState.Opened);
     protected final DefaultCloseFuture gracefulFuture = new DefaultCloseFuture(lock);
     protected final List<RequestHandler<Channel>> handlers = new ArrayList<RequestHandler<Channel>>();
 
@@ -140,61 +141,67 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea
 
     public void handleClose() throws IOException {
         log.debug("Received SSH_MSG_CHANNEL_CLOSE on channel {}", this);
-        if (gracefulState.compareAndSet(0, CLOSE_RECV)) {
+        if (gracefulState.compareAndSet(GracefulState.Opened, GracefulState.CloseReceived)) {
             close(false);
-        } else if (gracefulState.compareAndSet(CLOSE_SENT, CLOSE_SENT | CLOSE_RECV)) {
+        } else if (gracefulState.compareAndSet(GracefulState.CloseSent, GracefulState.Closed)) {
             gracefulFuture.setClosed();
         }
     }
 
     protected Closeable getInnerCloseable() {
-        return new Closeable() {
-            public boolean isClosed() {
-                return gracefulFuture.isClosed();
-            }
-            public boolean isClosing() {
-                return true;
-            }
-            public CloseFuture close(boolean immediately) {
-                if (!immediately) {
-                    log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", AbstractChannel.this);
-                    Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_CLOSE);
-                    buffer.putInt(recipient);
-                    try {
-                        session.writePacket(buffer).addListener(new SshFutureListener<IoWriteFuture>() {
-                            public void operationComplete(IoWriteFuture future) {
-                                if (future.isWritten()) {
-                                    log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", AbstractChannel.this);
-                                    if (gracefulState.compareAndSet(0, CLOSE_SENT)) {
-                                        // Waiting for CLOSE message to come back from the remote side
-                                    } else if (gracefulState.compareAndSet(CLOSE_RECV, CLOSE_SENT | CLOSE_RECV)) {
-                                        gracefulFuture.setValue(true);
-                                    }
-                                } else {
-                                    close(true);
+        return new GracefulChannelCloseable();
+    }
+
+    public class GracefulChannelCloseable implements Closeable {
+
+        protected volatile boolean closing;
+
+        public boolean isClosing() {
+            return closing;
+        }
+        public boolean isClosed() {
+            return gracefulFuture.isClosed();
+        }
+        public CloseFuture close(boolean immediately) {
+            closing = true;
+            if (immediately) {
+                gracefulFuture.setClosed();
+            } else if (!gracefulFuture.isClosed()) {
+                log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", AbstractChannel.this);
+                Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_CLOSE);
+                buffer.putInt(recipient);
+                try {
+                    session.writePacket(buffer).addListener(new SshFutureListener<IoWriteFuture>() {
+                        public void operationComplete(IoWriteFuture future) {
+                            if (future.isWritten()) {
+                                log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", AbstractChannel.this);
+                                if (gracefulState.compareAndSet(GracefulState.Opened, GracefulState.CloseSent)) {
+                                    // Waiting for CLOSE message to come back from the remote side
+                                } else if (gracefulState.compareAndSet(GracefulState.CloseReceived, GracefulState.Closed)) {
+                                    gracefulFuture.setClosed();
                                 }
+                            } else {
+                                AbstractChannel.this.close(true);
                             }
-                        });
-                    } catch (IOException e) {
-                        log.debug("Exception caught while writing SSH_MSG_CHANNEL_CLOSE packet on channel " + AbstractChannel.this, e);
-                        close(true);
-                    }
-                } else {
-                    gracefulFuture.setClosed();
+                        }
+                    });
+                } catch (IOException e) {
+                    log.debug("Exception caught while writing SSH_MSG_CHANNEL_CLOSE packet on channel " + AbstractChannel.this, e);
+                    AbstractChannel.this.close(true);
                 }
-                return gracefulFuture;
             }
-        };
+            return gracefulFuture;
+        }
     }
 
     @Override
     protected void doCloseImmediately() {
-        super.doCloseImmediately();
         service.unregisterChannel(AbstractChannel.this);
+        super.doCloseImmediately();
     }
 
     protected void writePacket(Buffer buffer) throws IOException {
-        if (state.get() == OPENED) {
+        if (!isClosing()) {
             session.writePacket(buffer);
         } else {
             log.debug("Discarding output packet because channel is being closed");

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
index 71155f1..b7cbbb5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
@@ -21,24 +21,18 @@ package org.apache.sshd.common.channel;
 import java.io.IOException;
 
 import org.apache.sshd.common.Channel;
-import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.RuntimeSshException;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.DefaultCloseFuture;
 import org.apache.sshd.common.future.DefaultSshFuture;
-import org.apache.sshd.common.future.SshFuture;
-import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoInputStream;
 import org.apache.sshd.common.io.IoReadFuture;
 import org.apache.sshd.common.io.ReadPendingException;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.common.util.CloseableUtils;
 import org.apache.sshd.common.util.Readable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class ChannelAsyncInputStream extends CloseableUtils.AbstractInnerCloseable implements IoInputStream {
+public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable implements IoInputStream {
 
     private final Channel channel;
     private final Buffer buffer = new Buffer();
@@ -72,16 +66,20 @@ public class ChannelAsyncInputStream extends CloseableUtils.AbstractInnerCloseab
     }
 
     @Override
-    protected Closeable getInnerCloseable() {
+    protected void preClose() {
         synchronized (buffer) {
             if (buffer.available() == 0) {
                 if (pending != null) {
                     pending.setValue(new SshException("Closed"));
                 }
             }
-            return builder()
-                    .when(pending)
-                    .build();
+        }
+    }
+
+    @Override
+    protected CloseFuture doCloseGracefully() {
+        synchronized (buffer) {
+            return builder().when(pending).build().close(false);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index 3455c15..6aabdeb 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -19,24 +19,21 @@
 package org.apache.sshd.common.channel;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sshd.common.Channel;
-import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.DefaultSshFuture;
+import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.io.WritePendingException;
-import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.common.util.CloseableUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class ChannelAsyncOutputStream extends CloseableUtils.AbstractInnerCloseable implements IoOutputStream {
+public class ChannelAsyncOutputStream extends CloseableUtils.AbstractCloseable implements IoOutputStream {
 
     private final Channel channel;
     private final byte cmd;
@@ -65,10 +62,8 @@ public class ChannelAsyncOutputStream extends CloseableUtils.AbstractInnerClosea
     }
 
     @Override
-    protected Closeable getInnerCloseable() {
-        return builder()
-                .when(pendingWrite.get())
-                .build();
+    protected CloseFuture doCloseGracefully() {
+        return builder().when(pendingWrite.get()).build().close(false);
     }
 
     protected synchronized void doWriteIfPossible(boolean resume) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
index 6d664d7..d3256ad 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
@@ -35,8 +35,6 @@ import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.SshdSocketAddress;
 import org.apache.sshd.common.TcpipForwarder;
-import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.DefaultCloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoAcceptor;
 import org.apache.sshd.common.io.IoHandler;
@@ -45,8 +43,6 @@ import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.common.util.CloseableUtils;
 import org.apache.sshd.common.util.Readable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * TODO Add javadoc
@@ -244,4 +240,8 @@ public class DefaultTcpipForwarder extends CloseableUtils.AbstractInnerCloseable
         }
     }
 
+    public String toString() {
+        return getClass().getSimpleName() + "[" + session + "]";
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
index e449c8d..0d95acf 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -104,7 +104,7 @@ public class TcpipClientChannel extends AbstractClientChannel {
 
     @Override
     protected Closeable getInnerCloseable() {
-        return CloseableUtils.sequential(serverSession, super.getInnerCloseable());
+        return builder().sequential(serverSession, super.getInnerCloseable()).build();
     }
 
     protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
index 9bbdea3..8ed89d4 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
@@ -112,7 +112,7 @@ public class TcpipServerChannel extends AbstractServerChannel {
         out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
         IoHandler handler = new IoHandler() {
             public void messageReceived(IoSession session, Readable message) throws Exception {
-                if (state.get() != OPENED) {
+                if (isClosing()) {
                     log.debug("Ignoring write to channel {} in CLOSING state", id);
                 } else {
                     Buffer buffer = new Buffer();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
index 44019ec..3c5a2b7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
@@ -19,17 +19,13 @@
 package org.apache.sshd.common.io.nio2;
 
 import java.nio.channels.AsynchronousChannelGroup;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.FactoryManager;
-import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoHandler;
 import org.apache.sshd.common.io.IoService;
 import org.apache.sshd.common.io.IoSession;
@@ -66,8 +62,7 @@ public abstract class Nio2Service extends CloseableUtils.AbstractInnerCloseable
 
     @Override
     protected Closeable getInnerCloseable() {
-        List<IoSession> s = new ArrayList<IoSession>(sessions.values());
-        return CloseableUtils.parallel(s);
+        return builder().parallel(sessions.values()).build();
     }
 
     public Map<Long, IoSession> getManagedSessions() {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index e8a8c4f..8fb1f3c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.DefaultSshFuture;
-import org.apache.sshd.common.future.SshFuture;
 import org.apache.sshd.common.io.IoHandler;
 import org.apache.sshd.common.io.IoService;
 import org.apache.sshd.common.io.IoSession;
@@ -105,7 +104,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
         log.debug("Writing {} bytes", buffer.available());
         ByteBuffer buf = ByteBuffer.wrap(buffer.array(), buffer.rpos(), buffer.available());
         final DefaultIoWriteFuture future = new DefaultIoWriteFuture(null, buf);
-        if (state.get() != OPENED) {
+        if (isClosing()) {
             Throwable exc = new ClosedChannelException();
             future.setException(exc);
             exceptionCaught(exc);
@@ -134,9 +133,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
 
     @Override
     protected CloseFuture doCloseGracefully() {
-        synchronized (writes) {
-            return CloseableUtils.parallel(writes.toArray(new SshFuture[writes.size()]));
-        }
+        return builder().when(writes).build().close(false);
     }
 
     @Override
@@ -231,10 +228,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
                             finishWrite();
                         }
                         private void finishWrite() {
-                            synchronized (writes) {
-                                writes.remove(future);
-                                writes.notifyAll();
-                            }
+                            writes.remove(future);
                             currentWrite.compareAndSet(future, null);
                             startWriting();
                         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
index 72c8c86..6bd467d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
@@ -19,7 +19,6 @@
 package org.apache.sshd.common.session;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -30,23 +29,31 @@ import org.apache.sshd.client.channel.AbstractClientChannel;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.Channel;
 import org.apache.sshd.common.Closeable;
-import org.apache.sshd.common.RequestHandler;
 import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.common.RequestHandler;
 import org.apache.sshd.common.Session;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.TcpipForwarder;
-import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.DefaultCloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.common.util.CloseableUtils;
 import org.apache.sshd.server.channel.OpenChannelException;
 import org.apache.sshd.server.x11.X11ForwardSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import static org.apache.sshd.common.SshConstants.*;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_CLOSE;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_DATA;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_EOF;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_FAILURE;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_OPEN;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_OPEN_CONFIRMATION;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_OPEN_FAILURE;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_REQUEST;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_WINDOW_ADJUST;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_GLOBAL_REQUEST;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_REQUEST_FAILURE;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_REQUEST_SUCCESS;
 
 /**
  * Base implementation of ConnectionService.
@@ -88,10 +95,10 @@ public abstract class AbstractConnectionService extends CloseableUtils.AbstractI
 
     @Override
     protected Closeable getInnerCloseable() {
-        return CloseableUtils.sequential(
-                tcpipForwarder, agentForward, x11Forward,
-                CloseableUtils.parallel(channels.values())
-        );
+        return builder()
+                .sequential(tcpipForwarder, agentForward, x11Forward)
+                .parallel(channels.values())
+                .build();
     }
 
     protected int getNextChannelId() {
@@ -106,7 +113,7 @@ public abstract class AbstractConnectionService extends CloseableUtils.AbstractI
      * @throws IOException
      */
     public int registerChannel(Channel channel) throws IOException {
-        if (state.get() != OPENED) {
+        if (isClosing()) {
             throw new IllegalStateException("Session is being closed");
         }
         int channelId = getNextChannelId();
@@ -298,7 +305,7 @@ public abstract class AbstractConnectionService extends CloseableUtils.AbstractI
 
         log.debug("Received SSH_MSG_CHANNEL_OPEN {}", type);
 
-        if (state.get() != OPENED) {
+        if (isClosing()) {
             Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN_FAILURE);
             buf.putInt(id);
             buf.putInt(SshConstants.SSH_OPEN_CONNECT_FAILED);
@@ -416,6 +423,7 @@ public abstract class AbstractConnectionService extends CloseableUtils.AbstractI
     }
 
     public String toString() {
-        return getClass().getSimpleName();
+        return getClass().getSimpleName() + "[" + session + "]";
     }
+
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index ac859d7..d301e0a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -20,7 +20,6 @@ package org.apache.sshd.common.session;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -54,7 +53,14 @@ import org.apache.sshd.common.util.BufferUtils;
 import org.apache.sshd.common.util.CloseableUtils;
 import org.apache.sshd.common.util.Readable;
 
-import static org.apache.sshd.common.SshConstants.*;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_DEBUG;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_DISCONNECT;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_IGNORE;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_KEXINIT;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_NEWKEYS;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_SERVICE_ACCEPT;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_SERVICE_REQUEST;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_UNIMPLEMENTED;
 
 /**
  * The AbstractSession handles all the basic SSH protocol such as key exchange, authentication,
@@ -417,7 +423,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
     public void exceptionCaught(Throwable t) {
         // Ignore exceptions that happen while closing
         synchronized (lock) {
-            if (state.get() != OPENED) {
+            if (isClosing()) {
                 return;
             }
         }
@@ -438,16 +444,17 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
 
     @Override
     protected Closeable getInnerCloseable() {
-        return CloseableUtils.sequential(lock,
-                CloseableUtils.parallel(lock, getServices()), ioSession);
+        return builder()
+                .parallel(getServices())
+                .close(ioSession)
+                .build();
     }
 
     @Override
     protected void doCloseImmediately() {
         super.doCloseImmediately();
         // Fire 'close' event
-        final ArrayList<SessionListener> l = new ArrayList<SessionListener>(listeners);
-        for (SessionListener sl : l) {
+        for (SessionListener sl : listeners) {
             sl.sessionClosed(this);
         }
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
index 5d1d97f..94acdf5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
@@ -18,11 +18,14 @@
  */
 package org.apache.sshd.common.util;
 
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.SshException;
@@ -47,346 +50,273 @@ public class CloseableUtils {
         return future;
     }
 
-    public static Closeable parallel(final Collection<? extends Closeable> closeables) {
-        return parallel(null, closeables);
-    }
+    public static class Builder {
 
-    public static Closeable parallel(final Object lock, final Collection<? extends Closeable> closeables) {
-        return parallel(lock, closeables.toArray(new Closeable[closeables.size()]));
-    }
+        private final Object lock;
+        private final List<Closeable> closeables = new ArrayList<Closeable>();
 
-    public static Closeable parallel(final Closeable... closeables) {
-        return parallel(null, closeables);
-    }
+        private Builder(Object lock) {
+            this.lock = lock;
+        }
 
-    public static Closeable parallel(final Object lock, final Closeable... closeables) {
-        int nbNonNulls = 0;
-        for (Closeable closeable : closeables) {
-            if (closeable != null) {
-                nbNonNulls++;
+        public Builder run(final Runnable r) {
+            return close(new SimpleCloseable(lock) {
+                @Override
+                protected void doClose(boolean immediately) {
+                    try {
+                        r.run();
+                    } finally {
+                        super.doClose(immediately);
+                    }
+                }
+            });
+        }
+
+        public <T extends SshFuture> Builder when(SshFuture<T> future) {
+            if (future != null) {
+                when(Collections.singleton(future));
             }
+            return this;
         }
-        if (nbNonNulls == 0) {
-            return new Closeable() {
-                final CloseFuture future = new DefaultCloseFuture(lock);
-                public boolean isClosed() {
-                    return future.isClosed();
-                }
-                public boolean isClosing() {
-                    return isClosed();
-                }
-                public CloseFuture close(boolean immediately) {
-                    future.setClosed();
-                    return future;
-                }
-            };
-        } else if (nbNonNulls == 1) {
+
+        public <T extends SshFuture> Builder when(SshFuture<T>... futures) {
+            return when(Arrays.asList(futures));
+        }
+
+        public <T extends SshFuture> Builder when(final Iterable<? extends SshFuture<T>> futures) {
+            return close(new FuturesCloseable<T>(lock, futures));
+        }
+
+        public Builder sequential(Closeable... closeables) {
             for (Closeable closeable : closeables) {
-                if (closeable != null) {
-                    return closeable;
-                }
+                close(closeable);
             }
-            throw new IllegalStateException();
-        } else {
-            return new Closeable() {
-                final CloseFuture future = new DefaultCloseFuture(lock);
-                final AtomicBoolean closing = new AtomicBoolean();
-                public boolean isClosed() {
-                    return future.isClosed();
-                }
-                public boolean isClosing() {
-                    return closing.get();
-                }
-                public CloseFuture close(boolean immediately) {
-                    final AtomicInteger count = new AtomicInteger(closeables.length);
-                    if (closing.compareAndSet(false, true)) {
-                        SshFutureListener<CloseFuture> listener = new SshFutureListener<CloseFuture>() {
-                            public void operationComplete(CloseFuture f) {
-                                if (count.decrementAndGet() == 0) {
-                                    future.setClosed();
-                                }
-                            }
-                        };
-                        for (Closeable c : closeables) {
-                            if (c != null) {
-                                c.close(immediately).addListener(listener);
-                            } else {
-                                listener.operationComplete(null);
-                            }
-                        }
-                    }
-                    return future;
-                }
-            };
+            return this;
         }
-    }
 
-    public static Closeable sequential(final Collection<? extends Closeable> closeables) {
-        return sequential(null, closeables);
-    }
+        public Builder sequential(Iterable<Closeable> closeables) {
+            return close(new SequentialCloseable(lock, closeables));
+        }
 
-    public static Closeable sequential(final Object lock, final Collection<? extends Closeable> closeables) {
-        return sequential(lock, closeables.toArray(new Closeable[closeables.size()]));
-    }
+        public Builder parallel(Closeable... closeables) {
+            if (closeables.length == 1) {
+                close(closeables[0]);
+            } else if (closeables.length > 0) {
+                parallel(Arrays.asList(closeables));
+            }
+            return this;
+        }
 
-    public static Closeable sequential(final Closeable... closeables) {
-        return sequential(null, closeables);
-    }
+        public Builder parallel(Iterable<? extends Closeable> closeables) {
+            return close(new ParallelCloseable(lock, closeables));
+        }
 
-    public static Closeable sequential(final Object lock, final Closeable... closeables) {
-        int nbNonNulls = 0;
-        for (Closeable closeable : closeables) {
-            if (closeable != null) {
-                nbNonNulls++;
+        public Builder close(Closeable c) {
+            if (c != null) {
+                closeables.add(c);
             }
+            return this;
         }
-        if (nbNonNulls == 0) {
-            return new Closeable() {
-                final CloseFuture future = new DefaultCloseFuture(lock);
-                public boolean isClosed() {
-                    return future.isClosed();
-                }
-                public boolean isClosing() {
-                    return isClosed();
-                }
-                public CloseFuture close(boolean immediately) {
-                    future.setClosed();
-                    return future;
-                }
-            };
-        } else if (nbNonNulls == 1) {
-            for (Closeable closeable : closeables) {
-                if (closeable != null) {
-                    return closeable;
-                }
+
+        public Closeable build() {
+            if (closeables.isEmpty()) {
+                return new SimpleCloseable(lock);
+            } else if (closeables.size() == 1) {
+                return closeables.get(0);
+            } else {
+                return new SequentialCloseable(lock, closeables);
             }
-            throw new IllegalStateException();
-        } else {
-            return new Closeable() {
-                final DefaultCloseFuture future = new DefaultCloseFuture(lock);
-                final AtomicBoolean closing = new AtomicBoolean();
-                public boolean isClosed() {
-                    return future.isClosed();
-                }
-                public boolean isClosing() {
-                    return closing.get();
-                }
-                public CloseFuture close(final boolean immediately) {
-                    if (closing.compareAndSet(false, true)) {
-                        final Iterator<Closeable> iterator = Arrays.asList(closeables).iterator();
-                        SshFutureListener<CloseFuture> listener = new SshFutureListener<CloseFuture>() {
-                            public void operationComplete(CloseFuture previousFuture) {
-                                while (iterator.hasNext()) {
-                                    Closeable c = iterator.next();
-                                    if (c != null) {
-                                        CloseFuture nextFuture = c.close(immediately);
-                                        nextFuture.addListener(this);
-                                        return;
-                                    }
-                                }
-                                if (!iterator.hasNext()) {
-                                    future.setClosed();
-                                }
-                            }
-                        };
-                        listener.operationComplete(null);
-                    }
-                    return future;
-                }
-            };
         }
+
     }
 
-    public static <T extends SshFuture> CloseFuture parallel(final SshFuture<T>... futures) {
-        return parallel(null, futures);
+    private static class SimpleCloseable implements Closeable {
+
+        protected final DefaultCloseFuture future;
+        protected final AtomicBoolean closing;
+
+        public SimpleCloseable(Object lock) {
+            future = new DefaultCloseFuture(lock);
+            closing = new AtomicBoolean();
+        }
+
+        public boolean isClosed() {
+            return future.isClosed();
+        }
+        public boolean isClosing() {
+            return closing.get();
+        }
+        public CloseFuture close(boolean immediately) {
+            if (closing.compareAndSet(false, true)) {
+                doClose(immediately);
+            }
+            return future;
+        }
+
+        protected void doClose(boolean immediately) {
+            future.setClosed();
+        }
     }
 
-    public static <T extends SshFuture> CloseFuture parallel(Object lock, final SshFuture<T>... futures) {
-        final CloseFuture future = new DefaultCloseFuture(lock);
-        if (futures.length > 0) {
-            final AtomicInteger count = new AtomicInteger(futures.length);
-            SshFutureListener<T> listener = new SshFutureListener<T>() {
-                public void operationComplete(T f) {
+    private static class ParallelCloseable extends SimpleCloseable {
+
+        final Iterable<? extends Closeable> closeables;
+
+        private ParallelCloseable(Object lock, Iterable<? extends Closeable> closeables) {
+            super(lock);
+            this.closeables = closeables;
+        }
+
+        protected void doClose(final boolean immediately) {
+            final AtomicInteger count = new AtomicInteger(1);
+            SshFutureListener<CloseFuture> listener = new SshFutureListener<CloseFuture>() {
+                public void operationComplete(CloseFuture f) {
                     if (count.decrementAndGet() == 0) {
                         future.setClosed();
                     }
                 }
             };
-            for (SshFuture<T> f : futures) {
-                if (f != null) {
-                    f.addListener(listener);
-                } else {
-                    listener.operationComplete(null);
+            for (Closeable c : closeables) {
+                if (c != null) {
+                    count.incrementAndGet();
+                    c.close(immediately).addListener(listener);
                 }
             }
-        } else {
-            future.setClosed();
+            listener.operationComplete(null);
         }
-        return future;
     }
 
-    public static Builder builder() {
-        return new Builder();
-    }
+    private static class SequentialCloseable extends SimpleCloseable {
 
-    public static Builder builder(Logger logger, Object lock) {
-        return new Builder();
-    }
+        private final Iterable<? extends Closeable> closeables;
 
-    public static class Builder {
-        private final Object lock;
-        private Closeable closeable = null;
-        public Builder() {
-            this(null);
+        public SequentialCloseable(Object lock, Iterable<? extends Closeable> closeables) {
+            super(lock);
+            this.closeables = closeables;
         }
-        public Builder(Object lock) {
-            this.lock = lock;
-        }
-        public <T extends SshFuture> Builder when(final SshFuture<T>... futures) {
-            return close(new Closeable() {
-                private volatile boolean closing;
-                private volatile boolean closed;
-                public CloseFuture close(boolean immediately) {
-                    closing = true;
-                    if (immediately) {
-                        for (SshFuture<?> future : futures) {
-                            if (future instanceof DefaultSshFuture) {
-                                ((DefaultSshFuture<?>) future).setValue(new SshException("Closed"));
-                            }
+
+        protected void doClose(final boolean immediately) {
+            final Iterator<? extends Closeable> iterator = closeables.iterator();
+            SshFutureListener<CloseFuture> listener = new SshFutureListener<CloseFuture>() {
+                public void operationComplete(CloseFuture previousFuture) {
+                    while (iterator.hasNext()) {
+                        Closeable c = iterator.next();
+                        if (c != null) {
+                            CloseFuture nextFuture = c.close(immediately);
+                            nextFuture.addListener(this);
+                            return;
                         }
-                        closed = true;
-                        return closed();
-                    } else {
-                        return CloseableUtils.parallel(lock, futures).addListener(new SshFutureListener<CloseFuture>() {
-                            public void operationComplete(CloseFuture future) {
-                                closed = true;
-                            }
-                        });
+                    }
+                    if (!iterator.hasNext()) {
+                        future.setClosed();
                     }
                 }
+            };
+            listener.operationComplete(null);
+        }
+    }
 
-                public boolean isClosed() {
-                    return closed;
-                }
+    private static class FuturesCloseable<T extends SshFuture> extends CloseableUtils.SimpleCloseable {
 
-                public boolean isClosing() {
-                    return closing || closed;
-                }
-            });
-        }
-        public <T extends SshFuture> Builder when(Collection<? extends SshFuture<T>> futures) {
-            return when(futures.toArray(new SshFuture[futures.size()]));
-        }
-        public Builder sequential(Closeable... closeables) {
-            return close(CloseableUtils.sequential(lock, closeables));
-        }
-        public Builder sequential(Collection<Closeable> closeables) {
-            return close(CloseableUtils.sequential(lock, closeables));
-        }
-        public Builder parallel(Closeable... closeables) {
-            return close(CloseableUtils.parallel(lock, closeables));
-        }
-        public Builder parallel(Collection<? extends Closeable> closeables) {
-            return close(CloseableUtils.parallel(lock, closeables));
-        }
-        public Builder close(Closeable c) {
-            if (closeable == null) {
-                closeable = c;
-            } else {
-                closeable = CloseableUtils.sequential(lock, closeable, c);
-            }
-            return this;
+        private final Iterable<? extends SshFuture<T>> futures;
+
+        public FuturesCloseable(Object lock, Iterable<? extends SshFuture<T>> futures) {
+            super(lock);
+            this.futures = futures;
         }
-        public Closeable build() {
-            if (closeable == null) {
-                closeable = new Closeable() {
-                    private volatile boolean closed;
-                    public CloseFuture close(boolean immediately) {
-                        closed = true;
-                        return closed();
-                    }
-                    public boolean isClosed() {
-                        return closed;
+
+        protected void doClose(boolean immediately) {
+            if (immediately) {
+                for (SshFuture<?> f : futures) {
+                    if (f instanceof DefaultSshFuture) {
+                        ((DefaultSshFuture<?>) f).setValue(new SshException("Closed"));
                     }
-                    public boolean isClosing() {
-                        return closed;
+                }
+                future.setClosed();
+            } else {
+                final AtomicInteger count = new AtomicInteger(1);
+                SshFutureListener<T> listener = new SshFutureListener<T>() {
+                    public void operationComplete(T f) {
+                        if (count.decrementAndGet() == 0) {
+                            future.setClosed();
+                        }
                     }
                 };
+                for (SshFuture<T> f : futures) {
+                    if (f != null) {
+                        count.incrementAndGet();
+                        f.addListener(listener);
+                    }
+                }
+                listener.operationComplete(null);
             }
-            return closeable;
         }
     }
 
     public static abstract class AbstractCloseable implements Closeable {
 
-        protected static final int OPENED = 0;
-        protected static final int GRACEFUL = 1;
-        protected static final int IMMEDIATE = 2;
-        protected static final int CLOSED = 3;
-
+        protected enum State {
+            Opened, Graceful, Immediate, Closed
+        }
         /** Our logger */
         protected final Logger log = LoggerFactory.getLogger(getClass());
         /** Lock object for this session state */
-        protected final Object lock;
+        protected final Object lock = new Object();
         /** State of this object */
-        protected final AtomicInteger state = new AtomicInteger(OPENED);
+        protected final AtomicReference<State> state = new AtomicReference<State>(State.Opened);
         /** A future that will be set 'closed' when the object is actually closed */
-        protected final CloseFuture closeFuture;
-
-        protected AbstractCloseable() {
-            this(new Object());
-        }
-
-        protected AbstractCloseable(Object lock) {
-            this.lock = lock;
-            this.closeFuture = new DefaultCloseFuture(lock);
-        }
+        protected final CloseFuture closeFuture = new DefaultCloseFuture(lock);
 
         public CloseFuture close(boolean immediately) {
             if (immediately) {
-                if (state.compareAndSet(OPENED, IMMEDIATE) || state.compareAndSet(GRACEFUL, IMMEDIATE)) {
+                if (state.compareAndSet(State.Opened, State.Immediate)
+                        || state.compareAndSet(State.Graceful, State.Immediate)) {
                     log.debug("Closing {} immediately", this);
                     preClose();
                     doCloseImmediately();
                     log.debug("{} closed", this);
                 } else {
-                    log.debug("{} is already {}", this, state.get() == CLOSED ? "closed" : "closing");
+                    log.debug("{} is already {}", this, state.get() == State.Closed ? "closed" : "closing");
                 }
             } else {
-                if (state.compareAndSet(OPENED, GRACEFUL)) {
+                if (state.compareAndSet(State.Opened, State.Graceful)) {
                     log.debug("Closing {} gracefully", this);
                     preClose();
                     SshFuture<CloseFuture> grace = doCloseGracefully();
                     if (grace != null) {
                         grace.addListener(new SshFutureListener<CloseFuture>() {
                             public void operationComplete(CloseFuture future) {
-                                if (state.compareAndSet(GRACEFUL, IMMEDIATE)) {
+                                if (state.compareAndSet(State.Graceful, State.Immediate)) {
                                     doCloseImmediately();
                                     log.debug("{} closed", AbstractCloseable.this);
                                 }
                             }
                         });
                     } else {
-                        if (state.compareAndSet(GRACEFUL, IMMEDIATE)) {
+                        if (state.compareAndSet(State.Graceful, State.Immediate)) {
                             doCloseImmediately();
                             log.debug("{} closed", this);
                         }
                     }
                 } else {
-                    log.debug("{} is already {}", this, state.get() == CLOSED ? "closed" : "closing");
+                    log.debug("{} is already {}", this, state.get() == State.Closed ? "closed" : "closing");
                 }
             }
             return closeFuture;
         }
 
         public boolean isClosed() {
-            return state.get() == CLOSED;
+            return state.get() == State.Closed;
         }
 
         public boolean isClosing() {
-            return state.get() != OPENED;
+            return state.get() != State.Opened;
         }
 
+        /**
+         * preClose is guaranteed to be called before doCloseGracefully or doCloseImmediately.
+         * When preClose() is called, isClosing() == true
+         */
         protected void preClose() {
         }
 
@@ -394,9 +324,16 @@ public class CloseableUtils {
             return null;
         }
 
+        /**
+         * doCloseImmediately is called once and only once
+         * with state == Immediate
+         *
+         * Overriding methods should always call the base implementation.
+         * It may be called concurrently while preClose() or doCloseGracefully is executing
+         */
         protected void doCloseImmediately() {
             closeFuture.setClosed();
-            state.set(CLOSED);
+            state.set(State.Closed);
         }
 
         protected Builder builder() {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index fa8abba..b2ad04a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -45,7 +45,6 @@ import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.DefaultCloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.util.Buffer;
-import org.apache.sshd.common.util.CloseableUtils;
 import org.apache.sshd.common.util.IoUtils;
 import org.apache.sshd.common.util.LoggingFilterOutputStream;
 import org.apache.sshd.server.AsyncCommand;
@@ -192,49 +191,51 @@ public class ChannelSession extends AbstractServerChannel {
 
     @Override
     protected Closeable getInnerCloseable() {
-        return CloseableUtils.sequential(getCommandCloseable(), super.getInnerCloseable());
+        return builder()
+                .sequential(new CommandCloseable(), new GracefulChannelCloseable())
+                .parallel(asyncOut, asyncErr)
+                .build();
     }
 
-    protected Closeable getCommandCloseable() {
-        return new Closeable() {
-            public boolean isClosed() {
-                return commandExitFuture.isClosed();
-            }
-            public boolean isClosing() {
-                return isClosed();
-            }
-            public CloseFuture close(boolean immediately) {
-                if (immediately) {
-                    commandExitFuture.setClosed();
-                } else if (!commandExitFuture.isClosed()) {
-                    IoUtils.closeQuietly(receiver);
-                    final TimerTask task = new TimerTask() {
-                        @Override
-                        public void run() {
-                            commandExitFuture.setClosed();
-                        }
-                    };
-                    long timeout = DEFAULT_COMMAND_EXIT_TIMEOUT;
-                    String val = getSession().getFactoryManager().getProperties().get(ServerFactoryManager.COMMAND_EXIT_TIMEOUT);
-                    if (val != null) {
-                        try {
-                            timeout = Long.parseLong(val);
-                        } catch (NumberFormatException e) {
-                            // Ignore
-                        }
+    public class CommandCloseable implements Closeable {
+        public boolean isClosed() {
+            return commandExitFuture.isClosed();
+        }
+        public boolean isClosing() {
+            return isClosed();
+        }
+        public CloseFuture close(boolean immediately) {
+            if (immediately || command == null) {
+                commandExitFuture.setClosed();
+            } else if (!commandExitFuture.isClosed()) {
+                IoUtils.closeQuietly(receiver);
+                final TimerTask task = new TimerTask() {
+                    @Override
+                    public void run() {
+                        commandExitFuture.setClosed();
+                    }
+                };
+                long timeout = DEFAULT_COMMAND_EXIT_TIMEOUT;
+                String val = getSession().getFactoryManager().getProperties().get(ServerFactoryManager.COMMAND_EXIT_TIMEOUT);
+                if (val != null) {
+                    try {
+                        timeout = Long.parseLong(val);
+                    } catch (NumberFormatException e) {
+                        // Ignore
                     }
-                    log.debug("Wait {} ms for shell to exit cleanly", timeout);
-                    getSession().getFactoryManager().getScheduledExecutorService().schedule(task, timeout, TimeUnit.MILLISECONDS);
-                    commandExitFuture.addListener(new SshFutureListener<CloseFuture>() {
-                        public void operationComplete(CloseFuture future) {
-                            task.cancel();
-                        }
-                    });
                 }
-                return commandExitFuture;
+                log.debug("Wait {} ms for shell to exit cleanly", timeout);
+                getSession().getFactoryManager().getScheduledExecutorService().schedule(task, timeout, TimeUnit.MILLISECONDS);
+                commandExitFuture.addListener(new SshFutureListener<CloseFuture>() {
+                    public void operationComplete(CloseFuture future) {
+                        task.cancel();
+                    }
+                });
             }
-        };
+            return commandExitFuture;
+        }
     }
+
     @Override
     protected void doCloseImmediately() {
         if (command != null) {
@@ -243,8 +244,6 @@ public class ChannelSession extends AbstractServerChannel {
         }
         remoteWindow.notifyClosed();
         IoUtils.closeQuietly(out, err, receiver);
-        // TODO: graceful close ?
-        CloseableUtils.parallel(asyncOut, asyncErr).close(true);
         super.doCloseImmediately();
     }
 
@@ -256,7 +255,7 @@ public class ChannelSession extends AbstractServerChannel {
 
     protected void doWriteData(byte[] data, int off, int len) throws IOException {
         // If we're already closing, ignore incoming data
-        if (state.get() != OPENED) {
+        if (isClosing()) {
             return;
         }
         if (receiver != null) {
@@ -393,6 +392,10 @@ public class ChannelSession extends AbstractServerChannel {
     }
 
     protected boolean handleShell(Buffer buffer) throws IOException {
+        // If we're already closing, ignore incoming data
+        if (isClosing()) {
+            return false;
+        }
         if (((ServerSession) session).getFactoryManager().getShellFactory() == null) {
             return false;
         }
@@ -403,6 +406,10 @@ public class ChannelSession extends AbstractServerChannel {
     }
 
     protected boolean handleExec(Buffer buffer) throws IOException {
+        // If we're already closing, ignore incoming data
+        if (isClosing()) {
+            return false;
+        }
         String commandLine = buffer.getString();
         if (((ServerSession) session).getFactoryManager().getCommandFactory() == null) {
             log.warn("Unsupported command: {}", commandLine);
@@ -561,7 +568,7 @@ public class ChannelSession extends AbstractServerChannel {
     }
 
     protected void closeShell(int exitValue) throws IOException {
-        if (state.get() == OPENED) {
+        if (!isClosing()) {
             sendEof();
             sendExitStatus(exitValue);
             commandExitFuture.setClosed();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
index 169e434..39a64a7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
@@ -88,7 +88,7 @@ public class ServerSession extends AbstractSession {
      * @throws IOException
      */
     protected void checkForTimeouts() throws IOException {
-        if (state.get() == OPENED) {
+        if (!isClosing()) {
             long now = System.currentTimeMillis();
             if (!authed && now > authTimeoutTimestamp) {
                 disconnect(SshConstants.SSH2_DISCONNECT_PROTOCOL_ERROR, "Session has timed out waiting for authentication after " + authTimeoutMs + " ms.");

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
index 2b878b8..ba8eb36 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
@@ -29,7 +29,6 @@ import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.channel.ChannelOutputStream;
-import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.io.IoAcceptor;
 import org.apache.sshd.common.io.IoHandler;
 import org.apache.sshd.common.io.IoSession;
@@ -37,8 +36,6 @@ import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.common.util.CloseableUtils;
 import org.apache.sshd.common.util.Readable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
@@ -69,20 +66,7 @@ public class X11ForwardSupport extends CloseableUtils.AbstractInnerCloseable imp
 
     @Override
     protected Closeable getInnerCloseable() {
-        return acceptor != null ? acceptor : new CloseableUtils.AbstractCloseable() { };
-    }
-
-    public CloseFuture close(boolean immediately) {
-        IoAcceptor a;
-        synchronized (this) {
-            a = acceptor;
-            acceptor = null;
-        }
-        if (a != null) {
-            return a.close(immediately);
-        } else {
-            return CloseableUtils.closed();
-        }
+        return builder().close(acceptor).build();
     }
 
     public synchronized String createDisplay(boolean singleConnection,
@@ -212,7 +196,7 @@ public class X11ForwardSupport extends CloseableUtils.AbstractInnerCloseable imp
 
         @Override
         protected Closeable getInnerCloseable() {
-            return CloseableUtils.sequential(serverSession, super.getInnerCloseable());
+            return builder().sequential(serverSession, super.getInnerCloseable()).build();
         }
 
         protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/test/java/org/apache/sshd/CipherTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/CipherTest.java b/sshd-core/src/test/java/org/apache/sshd/CipherTest.java
index 28e3b68..7d2334f 100644
--- a/sshd-core/src/test/java/org/apache/sshd/CipherTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/CipherTest.java
@@ -130,7 +130,7 @@ public class CipherTest extends BaseTest {
     @After
     public void tearDown() throws Exception {
         if (sshd != null) {
-            sshd.stop();
+            sshd.stop(true);
         }
     }