You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2014/06/05 18:07:55 UTC

[2/2] git commit: [SSHD-312] Provide fully asynchronous interfaces for ClientChannel and Command

[SSHD-312] Provide fully asynchronous interfaces for ClientChannel and Command

Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/58c7a835
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/58c7a835
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/58c7a835

Branch: refs/heads/master
Commit: 58c7a835d653b45befb801175a2cb33a99889368
Parents: f138166
Author: Guillaume Nodet <gn...@apache.org>
Authored: Thu Jun 5 18:07:44 2014 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Thu Jun 5 18:07:44 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/sshd/ClientChannel.java     |  17 ++
 .../java/org/apache/sshd/ClientSession.java     |   1 -
 .../main/java/org/apache/sshd/SshServer.java    |   2 +-
 .../sshd/agent/local/AgentForwardedChannel.java |   3 +
 .../sshd/agent/unix/AgentForwardedChannel.java  |   3 +
 .../client/channel/AbstractClientChannel.java   |  67 +++++++-
 .../sshd/client/channel/ChannelDirectTcpip.java |  15 +-
 .../sshd/client/channel/ChannelSession.java     |  63 ++++---
 .../sshd/common/channel/AbstractChannel.java    |   9 +-
 .../common/channel/BufferedIoOutputStream.java  |  92 ++++++++++
 .../common/channel/ChannelAsyncInputStream.java | 171 +++++++++++++++++++
 .../channel/ChannelAsyncOutputStream.java       | 161 +++++++++++++++++
 .../common/forward/DefaultTcpipForwarder.java   |   2 +-
 .../sshd/common/forward/TcpipClientChannel.java |   3 +
 .../sshd/common/forward/TcpipServerChannel.java |  15 +-
 .../apache/sshd/common/io/IoInputStream.java    |  31 ++++
 .../apache/sshd/common/io/IoOutputStream.java   |  31 ++++
 .../org/apache/sshd/common/io/IoReadFuture.java |  40 +++++
 .../org/apache/sshd/common/io/IoSession.java    |   1 +
 .../apache/sshd/common/io/IoWriteFuture.java    |  22 +--
 .../sshd/common/io/ReadPendingException.java    |  23 +++
 .../sshd/common/io/WritePendingException.java   |  23 +++
 .../apache/sshd/common/io/mina/MinaSession.java |  15 +-
 .../apache/sshd/common/io/nio2/Nio2Session.java |  15 +-
 .../sshd/common/session/AbstractSession.java    |  14 +-
 .../org/apache/sshd/common/util/Buffer.java     |  11 +-
 .../apache/sshd/common/util/CloseableUtils.java | 122 ++++++++++++-
 .../org/apache/sshd/server/AsyncCommand.java    |  53 ++++++
 .../sshd/server/channel/AsyncDataReceiver.java  |  48 ++++++
 .../sshd/server/channel/ChannelSession.java     |  48 ++++--
 .../sshd/server/x11/X11ForwardSupport.java      |   7 +-
 .../test/java/org/apache/sshd/ClientTest.java   |  95 ++++++++++-
 .../apache/sshd/util/AsyncEchoShellFactory.java | 140 +++++++++++++++
 33 files changed, 1279 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/ClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/ClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/ClientChannel.java
index 4b01b0a..653f490 100644
--- a/sshd-core/src/main/java/org/apache/sshd/ClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/ClientChannel.java
@@ -23,6 +23,8 @@ import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
 
 /**
@@ -47,6 +49,21 @@ public interface ClientChannel {
     int EXIT_SIGNAL = 0x0040;
     int OPENED =      0x0080;
 
+    enum Streaming {
+        Async,
+        Sync
+    }
+
+    Streaming getStreaming();
+
+    void setStreaming(Streaming streaming);
+
+    IoOutputStream getAsyncIn();
+
+    IoInputStream getAsyncOut();
+
+    IoInputStream getAsyncErr();
+
     /**
      * Access to an output stream to send data directly to the remote channel.
      * This can be used instead of using {@link #setIn(java.io.InputStream)} method

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/ClientSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/ClientSession.java b/sshd-core/src/main/java/org/apache/sshd/ClientSession.java
index 15b0005..bc4971d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/ClientSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/ClientSession.java
@@ -31,7 +31,6 @@ import org.apache.sshd.client.channel.ChannelShell;
 import org.apache.sshd.client.channel.ChannelSubsystem;
 import org.apache.sshd.client.future.AuthFuture;
 import org.apache.sshd.common.Session;
-import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.SshdSocketAddress;
 import org.apache.sshd.common.future.CloseFuture;
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/SshServer.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/SshServer.java b/sshd-core/src/main/java/org/apache/sshd/SshServer.java
index 7dd4403..69a5033 100644
--- a/sshd-core/src/main/java/org/apache/sshd/SshServer.java
+++ b/sshd-core/src/main/java/org/apache/sshd/SshServer.java
@@ -370,7 +370,7 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
     }
 
     @Override
-    protected SshFuture<CloseFuture> doCloseGracefully() {
+    protected CloseFuture doCloseGracefully() {
         stopSessionTimeoutListener();
         CloseFuture future;
         if (acceptor != null) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
index 84d51eb..e632e2c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
@@ -75,6 +75,9 @@ public class AgentForwardedChannel extends AbstractClientChannel {
 
     @Override
     protected void doOpen() throws IOException {
+        if (streaming == Streaming.Async) {
+            throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel");
+        }
         invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
index 4f097e4..ca7172a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
@@ -62,6 +62,9 @@ public class AgentForwardedChannel extends AbstractClientChannel implements Runn
 
     @Override
     protected synchronized void doOpen() throws IOException {
+        if (streaming == Streaming.Async) {
+            throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel");
+        }
         invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 1e89c84..793e8b8 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -26,10 +26,15 @@ import org.apache.sshd.ClientChannel;
 import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.Channel;
+import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.RequestHandler;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.channel.AbstractChannel;
+import org.apache.sshd.common.channel.ChannelAsyncInputStream;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.util.Buffer;
@@ -45,6 +50,13 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
 
     protected volatile boolean opened;
     protected final String type;
+
+    protected Streaming streaming;
+
+    protected ChannelAsyncOutputStream asyncIn;
+    protected ChannelAsyncInputStream asyncOut;
+    protected ChannelAsyncInputStream asyncErr;
+
     protected InputStream in;
     protected OutputStream invertedIn;
     protected OutputStream out;
@@ -59,10 +71,31 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
 
     protected AbstractClientChannel(String type) {
         this.type = type;
+        this.streaming = Streaming.Sync;
         addRequestHandler(new ExitStatusChannelRequestHandler());
         addRequestHandler(new ExitSignalChannelRequestHandler());
     }
 
+    public Streaming getStreaming() {
+        return streaming;
+    }
+
+    public void setStreaming(Streaming streaming) {
+        this.streaming = streaming;
+    }
+
+    public IoOutputStream getAsyncIn() {
+        return asyncIn;
+    }
+
+    public IoInputStream getAsyncOut() {
+        return asyncOut;
+    }
+
+    public IoInputStream getAsyncErr() {
+        return asyncErr;
+    }
+
     public OutputStream getInvertedIn() {
         return invertedIn;
     }
@@ -137,6 +170,14 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
     }
 
     @Override
+    protected Closeable getInnerCloseable() {
+        return builder()
+                .parallel(asyncIn, asyncOut, asyncErr)
+                .close(super.getInnerCloseable())
+                .build();
+    }
+
+    @Override
     protected void doCloseImmediately() {
         // Close inverted streams after
         // If the inverted stream is closed before, there's a small time window
@@ -146,6 +187,8 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
         // which leads to an IOException("Pipe closed") when reading.
         IoUtils.closeQuietly(in, out, err);
         IoUtils.closeQuietly(invertedIn, invertedOut, invertedErr);
+        // TODO: graceful close ?
+        CloseableUtils.parallel(asyncIn, asyncOut, asyncErr).close(true);
         super.doCloseImmediately();
     }
 
@@ -252,23 +295,39 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
         if (state.get() != CloseableUtils.AbstractCloseable.OPENED) {
             return;
         }
-        if (out != null) {
+        if (asyncOut != null) {
+            asyncOut.write(new Buffer(data, off, len));
+        } else if (out != null) {
             out.write(data, off, len);
             out.flush();
+            localWindow.consumeAndCheck(len);
         } else {
             throw new IllegalStateException("No output stream for channel");
         }
-        localWindow.consumeAndCheck(len);
     }
 
     protected void doWriteExtendedData(byte[] data, int off, int len) throws IOException {
-        if (err != null) {
+        // If we're already closing, ignore incoming data
+        if (state.get() != CloseableUtils.AbstractCloseable.OPENED) {
+            return;
+        }
+        if (asyncErr != null) {
+            asyncErr.write(new Buffer(data, off, len));
+        } else if (err != null) {
             err.write(data, off, len);
             err.flush();
+            localWindow.consumeAndCheck(len);
         } else {
             throw new IllegalStateException("No error stream for channel");
         }
-        localWindow.consumeAndCheck(len);
+    }
+
+    @Override
+    public void handleWindowAdjust(Buffer buffer) throws IOException {
+        super.handleWindowAdjust(buffer);
+        if (asyncIn != null) {
+            asyncIn.onWindowExpanded();
+        }
     }
 
     public Integer getExitStatus() {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
index 66d3456..36ffd28 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
@@ -27,6 +27,8 @@ import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.SshdSocketAddress;
+import org.apache.sshd.common.channel.ChannelAsyncInputStream;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.channel.ChannelPipedInputStream;
 import org.apache.sshd.common.channel.ChannelPipedOutputStream;
@@ -81,10 +83,15 @@ public class ChannelDirectTcpip extends AbstractClientChannel {
 
     @Override
     protected void doOpen() throws IOException {
-        invertedIn = out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
-        ChannelPipedInputStream pis = new ChannelPipedInputStream(localWindow);
-        pipe = new ChannelPipedOutputStream(pis);
-        invertedOut = in = pis;
+        if (streaming == Streaming.Async) {
+            asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA);
+            asyncOut = new ChannelAsyncInputStream(this);
+        } else {
+            invertedIn = out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
+            ChannelPipedInputStream pis = new ChannelPipedInputStream(localWindow);
+            pipe = new ChannelPipedOutputStream(pis);
+            invertedOut = in = pis;
+        }
     }
 
     public OpenFuture open() throws IOException {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index ac2b713..e02160b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -26,8 +26,9 @@ import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.channel.ChannelPipedInputStream;
 import org.apache.sshd.common.channel.ChannelPipedOutputStream;
+import org.apache.sshd.common.channel.ChannelAsyncInputStream;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.common.util.CloseableUtils;
 
 /**
@@ -49,31 +50,47 @@ public class ChannelSession extends AbstractClientChannel {
 
     @Override
     protected void doOpen() throws IOException {
-        invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
-        if (out == null) {
-            ChannelPipedInputStream pis = new ChannelPipedInputStream(localWindow);
-            ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis);
-            out = pos;
-            invertedOut = pis;
-        }
-        if (err == null) {
-            ChannelPipedInputStream pis = new ChannelPipedInputStream(localWindow);
-            ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis);
-            err = pos;
-            invertedErr = pis;
-        }
-        if (in != null) {
-            streamPumper = new Thread("ClientInputStreamPump") {
+        if (streaming == Streaming.Async) {
+            asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
                 @Override
-                public void run() {
-                    pumpInputStream();
+                protected CloseFuture doCloseGracefully() {
+                    try {
+                        sendEof();
+                    } catch (IOException e) {
+                        session.exceptionCaught(e);
+                    }
+                    return super.doCloseGracefully();
                 }
             };
-            // Interrupt does not really work and the thread will only exit when
-            // the call to read() will return.  So ensure this thread is a daemon
-            // to avoid blocking the whole app
-            streamPumper.setDaemon(true);
-            streamPumper.start();
+            asyncOut = new ChannelAsyncInputStream(this);
+            asyncErr = new ChannelAsyncInputStream(this);
+        } else {
+            invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
+            if (out == null) {
+                ChannelPipedInputStream pis = new ChannelPipedInputStream(localWindow);
+                ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis);
+                out = pos;
+                invertedOut = pis;
+            }
+            if (err == null) {
+                ChannelPipedInputStream pis = new ChannelPipedInputStream(localWindow);
+                ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis);
+                err = pos;
+                invertedErr = pis;
+            }
+            if (in != null) {
+                streamPumper = new Thread("ClientInputStreamPump") {
+                    @Override
+                    public void run() {
+                        pumpInputStream();
+                    }
+                };
+                // Interrupt does not really work and the thread will only exit when
+                // the call to read() will return.  So ensure this thread is a daemon
+                // to avoid blocking the whole app
+                streamPumper.setDaemon(true);
+                streamPumper.start();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 2e333e2..4bfb4e6 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -139,7 +139,7 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea
     }
 
     public void handleClose() throws IOException {
-            log.debug("Received SSH_MSG_CHANNEL_CLOSE on channel {}", this);
+        log.debug("Received SSH_MSG_CHANNEL_CLOSE on channel {}", this);
         if (gracefulState.compareAndSet(0, CLOSE_RECV)) {
             close(false);
         } else if (gracefulState.compareAndSet(CLOSE_SENT, CLOSE_SENT | CLOSE_RECV)) {
@@ -147,7 +147,7 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea
         }
     }
 
-    protected Closeable getGracefulCloseable() {
+    protected Closeable getInnerCloseable() {
         return new Closeable() {
             public boolean isClosed() {
                 return gracefulFuture.isClosed();
@@ -188,11 +188,6 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea
     }
 
     @Override
-    protected Closeable getInnerCloseable() {
-        return getGracefulCloseable();
-    }
-
-    @Override
     protected void doCloseImmediately() {
         super.doCloseImmediately();
         service.unregisterChannel(AbstractChannel.this);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
new file mode 100644
index 0000000..3c342c3
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.Buffer;
+import org.apache.sshd.common.util.CloseableUtils;
+
+/**
+ * An IoOutputStream capable of queuing write requests
+ */
+public class BufferedIoOutputStream extends CloseableUtils.AbstractInnerCloseable implements IoOutputStream {
+
+    private final IoOutputStream out;
+    private final Queue<ChannelAsyncOutputStream.IoWriteFutureImpl> writes = new ConcurrentLinkedQueue<ChannelAsyncOutputStream.IoWriteFutureImpl>();
+    private final AtomicReference<ChannelAsyncOutputStream.IoWriteFutureImpl> currentWrite = new AtomicReference<ChannelAsyncOutputStream.IoWriteFutureImpl>();
+
+    public BufferedIoOutputStream(IoOutputStream out) {
+        this.out = out;
+    }
+
+    public IoWriteFuture write(Buffer buffer) {
+        final ChannelAsyncOutputStream.IoWriteFutureImpl future = new ChannelAsyncOutputStream.IoWriteFutureImpl(buffer);
+        if (isClosing()) {
+            future.setValue(new IOException("Closed"));
+        } else {
+            writes.add(future);
+            startWriting();
+        }
+        return future;
+    }
+
+    private void startWriting() {
+        final ChannelAsyncOutputStream.IoWriteFutureImpl future = writes.peek();
+        if (future != null) {
+            if (currentWrite.compareAndSet(null, future)) {
+                out.write(future.getBuffer()).addListener(new SshFutureListener<IoWriteFuture>() {
+                    public void operationComplete(IoWriteFuture f) {
+                        if (f.isWritten()) {
+                            future.setValue(true);
+                        } else {
+                            future.setValue(f.getException());
+                        }
+                        finishWrite();
+                    }
+                    private void finishWrite() {
+                        writes.remove(future);
+                        currentWrite.compareAndSet(future, null);
+                        startWriting();
+                    }
+                });
+            }
+        }
+    }
+
+    @Override
+    protected Closeable getInnerCloseable() {
+        return builder()
+                .when(writes)
+                .close(out)
+                .build();
+    }
+
+    @Override
+    public String toString() {
+        return "BufferedIoOutputStream[" + out + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
new file mode 100644
index 0000000..71155f1
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+import java.io.IOException;
+
+import org.apache.sshd.common.Channel;
+import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.RuntimeSshException;
+import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.DefaultCloseFuture;
+import org.apache.sshd.common.future.DefaultSshFuture;
+import org.apache.sshd.common.future.SshFuture;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoReadFuture;
+import org.apache.sshd.common.io.ReadPendingException;
+import org.apache.sshd.common.util.Buffer;
+import org.apache.sshd.common.util.CloseableUtils;
+import org.apache.sshd.common.util.Readable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ChannelAsyncInputStream extends CloseableUtils.AbstractInnerCloseable implements IoInputStream {
+
+    private final Channel channel;
+    private final Buffer buffer = new Buffer();
+    private IoReadFutureImpl pending;
+
+    public ChannelAsyncInputStream(Channel channel) {
+        this.channel = channel;
+    }
+
+    public void write(Readable src) throws IOException {
+        synchronized (buffer) {
+            buffer.putBuffer(src);
+        }
+        doRead(true);
+    }
+
+    public IoReadFuture read(Buffer buf) {
+        IoReadFutureImpl future = new IoReadFutureImpl(buf);
+        if (isClosing()) {
+            future.setValue(new IOException("Closed"));
+        } else {
+            synchronized (buffer) {
+                if (pending != null) {
+                    throw new ReadPendingException();
+                }
+                pending = future;
+            }
+            doRead(false);
+        }
+        return future;
+    }
+
+    @Override
+    protected Closeable getInnerCloseable() {
+        synchronized (buffer) {
+            if (buffer.available() == 0) {
+                if (pending != null) {
+                    pending.setValue(new SshException("Closed"));
+                }
+            }
+            return builder()
+                    .when(pending)
+                    .build();
+        }
+    }
+
+    private void doRead(boolean resume) {
+        IoReadFutureImpl future = null;
+        int nbRead = 0;
+        synchronized (buffer) {
+            if (buffer.available() > 0) {
+                if (resume) {
+//                    LOGGER.debug("Resuming read due to incoming data");
+                }
+                future = pending;
+                pending = null;
+                if (future != null) {
+                    nbRead = future.buffer.putBuffer(buffer, false);
+                    buffer.compact();
+                }
+            } else {
+                if (!resume) {
+//                    LOGGER.debug("Delaying read until data is available");
+                }
+            }
+        }
+        if (nbRead > 0) {
+            try {
+                channel.getLocalWindow().consumeAndCheck(nbRead);
+            } catch (IOException e) {
+                channel.getSession().exceptionCaught(e);
+            }
+            future.setValue(nbRead);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ChannelAsyncInputStream[" + channel + "]";
+    }
+
+    public static class IoReadFutureImpl extends DefaultSshFuture<IoReadFuture> implements IoReadFuture {
+
+        final Buffer buffer;
+
+        public IoReadFutureImpl(Buffer buffer) {
+            super(null);
+            this.buffer = buffer;
+        }
+
+        public Buffer getBuffer() {
+            return buffer;
+        }
+
+        public void verify() throws SshException {
+            try {
+                await();
+            }
+            catch (InterruptedException e) {
+                throw new SshException("Interrupted", e);
+            }
+            if (getValue() instanceof Throwable) {
+                throw new SshException("Write failed", getException());
+            }
+        }
+        public int getRead() {
+            Object v = getValue();
+            if (v instanceof RuntimeException) {
+                throw (RuntimeException) v;
+            } else if (v instanceof Error) {
+                throw (Error) v;
+            } else if (v instanceof Throwable) {
+                throw (RuntimeSshException) new RuntimeSshException("Error reading from channel.").initCause((Throwable) v);
+            } else if (v instanceof Integer) {
+                return (Integer) v;
+            } else {
+                throw new IllegalStateException();
+            }
+        }
+
+        public Throwable getException() {
+            Object v = getValue();
+            if (v instanceof Throwable) {
+                return (Throwable) v;
+            } else {
+                return null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
new file mode 100644
index 0000000..3455c15
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.sshd.common.Channel;
+import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.future.DefaultSshFuture;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.WritePendingException;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.util.Buffer;
+import org.apache.sshd.common.util.CloseableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ChannelAsyncOutputStream extends CloseableUtils.AbstractInnerCloseable implements IoOutputStream {
+
+    private final Channel channel;
+    private final byte cmd;
+    private final AtomicReference<IoWriteFutureImpl> pendingWrite = new AtomicReference<IoWriteFutureImpl>();
+
+    public ChannelAsyncOutputStream(Channel channel, byte cmd) {
+        this.channel = channel;
+        this.cmd = cmd;
+    }
+
+    public void onWindowExpanded() throws IOException {
+        doWriteIfPossible(true);
+    }
+
+    public synchronized IoWriteFuture write(final Buffer buffer) {
+        final IoWriteFutureImpl future = new IoWriteFutureImpl(buffer);
+        if (isClosing()) {
+            future.setValue(new IOException("Closed"));
+        } else {
+            if (!pendingWrite.compareAndSet(null, future)) {
+                throw new WritePendingException();
+            }
+            doWriteIfPossible(false);
+        }
+        return future;
+    }
+
+    @Override
+    protected Closeable getInnerCloseable() {
+        return builder()
+                .when(pendingWrite.get())
+                .build();
+    }
+
+    protected synchronized void doWriteIfPossible(boolean resume) {
+        final IoWriteFutureImpl future = pendingWrite.get();
+        if (future != null) {
+            final Buffer buffer = future.buffer;
+            final int total = buffer.available();
+            if (total > 0) {
+                final int length = Math.min(Math.min(channel.getRemoteWindow().getSize(), total), channel.getRemoteWindow().getPacketSize());
+                if (length > 0) {
+                    if (resume) {
+                        log.debug("Resuming write due to more space available in the remote window");
+                    }
+                    Buffer buf = channel.getSession().createBuffer(cmd, length + 12);
+                    buf.putInt(channel.getRecipient());
+                    if (cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) {
+                        buf.putInt(1);
+                    }
+                    buf.putInt(length);
+                    buf.putRawBytes(buffer.array(), buffer.rpos(), length);
+                    buffer.rpos(buffer.rpos() + length);
+                    channel.getRemoteWindow().consume(length);
+                    try {
+                        channel.getSession().writePacket(buf).addListener(new SshFutureListener<org.apache.sshd.common.io.IoWriteFuture>() {
+                            public void operationComplete(org.apache.sshd.common.io.IoWriteFuture f) {
+                                if (total > length) {
+                                    doWriteIfPossible(false);
+                                } else {
+                                    pendingWrite.compareAndSet(future, null);
+                                    future.setValue(true);
+                                }
+                            }
+                        });
+                    } catch (IOException e) {
+                        future.setValue(e);
+                    }
+                } else if (!resume) {
+                    log.debug("Delaying write until space is available in the remote window");
+                }
+            } else {
+                pendingWrite.compareAndSet(future, null);
+                future.setValue(true);
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ChannelAsyncOutputStream[" + channel + "]";
+    }
+
+    public static class IoWriteFutureImpl extends DefaultSshFuture<IoWriteFuture> implements IoWriteFuture {
+
+        final Buffer buffer;
+
+        public IoWriteFutureImpl(Buffer buffer) {
+            super(null);
+            this.buffer = buffer;
+        }
+
+        public Buffer getBuffer() {
+            return buffer;
+        }
+
+        public void verify() throws SshException {
+            try {
+                await();
+            }
+            catch (InterruptedException e) {
+                throw new SshException("Interrupted", e);
+            }
+            if (!isWritten()) {
+                throw new SshException("Write failed", getException());
+            }
+        }
+
+        public boolean isWritten() {
+            return getValue() instanceof Boolean;
+        }
+
+        public Throwable getException() {
+            Object v = getValue();
+            if (v instanceof Throwable) {
+                return (Throwable) v;
+            } else {
+                return null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
index 57e73f0..6d664d7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
@@ -164,7 +164,7 @@ public class DefaultTcpipForwarder extends CloseableUtils.AbstractInnerCloseable
 
     @Override
     protected synchronized Closeable getInnerCloseable() {
-        return acceptor != null ? acceptor : new CloseableUtils.AbstractCloseable() { };
+        return builder().close(acceptor).build();
     }
 
     //

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
index 479fb24..e449c8d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -96,6 +96,9 @@ public class TcpipClientChannel extends AbstractClientChannel {
 
     @Override
     protected synchronized void doOpen() throws IOException {
+        if (streaming == Streaming.Async) {
+            throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel");
+        }
         invertedIn = out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
index 667876a..9bbdea3 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
@@ -36,6 +36,7 @@ import org.apache.sshd.common.io.IoConnectFuture;
 import org.apache.sshd.common.io.IoConnector;
 import org.apache.sshd.common.io.IoHandler;
 import org.apache.sshd.common.io.IoSession;
+import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.common.util.Readable;
 import org.apache.sshd.server.channel.AbstractServerChannel;
@@ -107,6 +108,7 @@ public class TcpipServerChannel extends AbstractServerChannel {
             return f;
         }
 
+        // TODO: revisit for better threading. Use async io ?
         out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
         IoHandler handler = new IoHandler() {
             public void messageReceived(IoSession session, Readable message) throws Exception {
@@ -182,12 +184,19 @@ public class TcpipServerChannel extends AbstractServerChannel {
         });
     }
 
-    protected void doWriteData(byte[] data, int off, int len) throws IOException {
-        localWindow.consumeAndCheck(len);
+    protected void doWriteData(byte[] data, int off, final int len) throws IOException {
         // Make sure we copy the data as the incoming buffer may be reused
         Buffer buf = new Buffer(data, off, len);
         buf = new Buffer(buf.getCompactData());
-        ioSession.write(buf);
+        ioSession.write(buf).addListener(new SshFutureListener<IoWriteFuture>() {
+            public void operationComplete(IoWriteFuture future) {
+                try {
+                    localWindow.consumeAndCheck(len);
+                } catch (IOException e) {
+                    session.exceptionCaught(e);
+                }
+            }
+        });
     }
 
     protected void doWriteExtendedData(byte[] data, int off, int len) throws IOException {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/IoInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoInputStream.java
new file mode 100644
index 0000000..c585b67
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoInputStream.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.util.Buffer;
+
+public interface IoInputStream extends Closeable {
+
+    /**
+     * NOTE: the buffer must not be touched until the returned read future is completed.
+     */
+    IoReadFuture read(Buffer buffer);
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java
new file mode 100644
index 0000000..f2b8994
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.util.Buffer;
+
+public interface IoOutputStream extends Closeable {
+
+    /**
+     * NOTE: the buffer must not be touched until the returned write future is completed.
+     */
+    IoWriteFuture write(Buffer buffer);
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java
new file mode 100644
index 0000000..d4f07a9
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.future.SshFuture;
+import org.apache.sshd.common.util.Buffer;
+
+public interface IoReadFuture extends SshFuture<IoReadFuture> {
+
+    /**
+     * Wait and verify that the read succeeded.
+     *
+     * @throws org.apache.sshd.common.SshException if the write failed for any reason
+     */
+    void verify() throws SshException;
+
+    Buffer getBuffer();
+
+    int getRead();
+
+    Throwable getException();
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java
index 9e78fba..caccfca 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java
@@ -62,6 +62,7 @@ public interface IoSession extends Closeable {
 
     /**
      * Write a packet on the socket.
+     * Multiple writes can be issued concurrently and will be queued.
      */
     IoWriteFuture write(Buffer buffer);
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/IoWriteFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoWriteFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoWriteFuture.java
index 79d1ae2..121d135 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/IoWriteFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoWriteFuture.java
@@ -18,11 +18,19 @@
  */
 package org.apache.sshd.common.io;
 
+import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.future.SshFuture;
 
 public interface IoWriteFuture extends SshFuture<IoWriteFuture> {
 
     /**
+     * Wait and verify that the write succeeded.
+     *
+     * @throws SshException if the write failed for any reason
+     */
+    void verify() throws SshException;
+
+    /**
      * Returns <tt>true</tt> if the write operation is finished successfully.
      */
     boolean isWritten();
@@ -34,18 +42,4 @@ public interface IoWriteFuture extends SshFuture<IoWriteFuture> {
      */
     Throwable getException();
 
-    /**
-     * Sets the message is written, and notifies all threads waiting for
-     * this future.  This method is invoked by MINA internally.  Please do
-     * not call this method directly.
-     */
-    void setWritten();
-
-    /**
-     * Sets the cause of the write failure, and notifies all threads waiting
-     * for this future.  This method is invoked by MINA internally.  Please
-     * do not call this method directly.
-     */
-    void setException(Throwable cause);
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/ReadPendingException.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/ReadPendingException.java b/sshd-core/src/main/java/org/apache/sshd/common/io/ReadPendingException.java
new file mode 100644
index 0000000..261acac
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/ReadPendingException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+public class ReadPendingException extends IllegalStateException {
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/WritePendingException.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/WritePendingException.java b/sshd-core/src/main/java/org/apache/sshd/common/io/WritePendingException.java
new file mode 100644
index 0000000..5fde663
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/WritePendingException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+public class WritePendingException extends IllegalStateException {
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
index d366fb6..6f07207 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
@@ -25,6 +25,7 @@ import org.apache.mina.core.future.IoFuture;
 import org.apache.mina.core.future.IoFutureListener;
 import org.apache.mina.core.future.WriteFuture;
 import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.future.DefaultCloseFuture;
 import org.apache.sshd.common.future.DefaultSshFuture;
 import org.apache.sshd.common.io.IoService;
@@ -106,6 +107,18 @@ public class MinaSession extends CloseableUtils.AbstractInnerCloseable implement
                 super(lock);
             }
 
+            public void verify() throws SshException {
+                try {
+                    await();
+                }
+                catch (InterruptedException e) {
+                    throw new SshException("Interrupted", e);
+                }
+                if (!isWritten()) {
+                    throw new SshException("Write failed", getException());
+                }
+            }
+
             public boolean isWritten() {
                 return getValue() instanceof Boolean;
             }
@@ -126,7 +139,7 @@ public class MinaSession extends CloseableUtils.AbstractInnerCloseable implement
                 setValue(exception);
             }
         }
-        final IoWriteFuture future = new Future(null);
+        final Future future = new Future(null);
         session.write(MinaSupport.asIoBuffer(buffer)).addListener(new IoFutureListener<WriteFuture>() {
             public void operationComplete(WriteFuture cf) {
                 if (cf.getException() != null) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index c044815..03b17e6 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -31,6 +31,7 @@ import java.util.concurrent.LinkedTransferQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.DefaultSshFuture;
 import org.apache.sshd.common.future.SshFuture;
@@ -118,7 +119,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
 
     private void exceptionCaught(Throwable exc) {
         if (!closeFuture.isClosed()) {
-            if (state.get() != OPENED || !socket.isOpen()) {
+            if (isClosing() || !socket.isOpen()) {
                 close(true);
             } else {
                 try {
@@ -253,6 +254,18 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
             super(lock);
             this.buffer = buffer;
         }
+        public void verify() throws SshException {
+            try {
+                await();
+            }
+            catch (InterruptedException e) {
+                throw new SshException("Interrupted", e);
+            }
+            if (!isWritten()) {
+                throw new SshException("Write failed", getException());
+            }
+        }
+
         public boolean isWritten() {
             return getValue() instanceof Boolean;
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index f2cddf9..ac859d7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -1266,6 +1266,18 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
             return buffer;
         }
 
+        public void verify() throws SshException {
+            try {
+                await();
+            }
+            catch (InterruptedException e) {
+                throw new SshException("Interrupted", e);
+            }
+            if (!isWritten()) {
+                throw new SshException("Write failed", getException());
+            }
+        }
+
         public boolean isWritten() {
             return getValue() instanceof Boolean;
         }
@@ -1290,7 +1302,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
             if (future.isWritten()) {
                 setWritten();
             } else {
-                future.setException(future.getException());
+                setException(future.getException());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/util/Buffer.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/Buffer.java b/sshd-core/src/main/java/org/apache/sshd/common/util/Buffer.java
index 42c0d50..824144b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/Buffer.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/Buffer.java
@@ -116,6 +116,10 @@ public final class Buffer implements Readable {
         return wpos - rpos;
     }
 
+    public int capacity() {
+        return data.length - wpos;
+    }
+
     public byte[] array() {
         return data;
     }
@@ -372,10 +376,15 @@ public final class Buffer implements Readable {
     }
 
     public void putBuffer(Readable buffer) {
-        int r = buffer.available();
+        putBuffer(buffer, true);
+    }
+
+    public int putBuffer(Readable buffer, boolean expand) {
+        int r = expand ? buffer.available() : Math.min(buffer.available(), capacity());
         ensureCapacity(r);
         buffer.getRawBytes(data, wpos, r);
         wpos += r;
+        return r;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
index 2ae71c9..5d1d97f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.DefaultCloseFuture;
 import org.apache.sshd.common.future.DefaultSshFuture;
@@ -197,7 +198,11 @@ public class CloseableUtils {
     }
 
     public static <T extends SshFuture> CloseFuture parallel(final SshFuture<T>... futures) {
-        final CloseFuture future = new DefaultCloseFuture(null);
+        return parallel(null, futures);
+    }
+
+    public static <T extends SshFuture> CloseFuture parallel(Object lock, final SshFuture<T>... futures) {
+        final CloseFuture future = new DefaultCloseFuture(lock);
         if (futures.length > 0) {
             final AtomicInteger count = new AtomicInteger(futures.length);
             SshFutureListener<T> listener = new SshFutureListener<T>() {
@@ -220,6 +225,98 @@ public class CloseableUtils {
         return future;
     }
 
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static Builder builder(Logger logger, Object lock) {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private final Object lock;
+        private Closeable closeable = null;
+        public Builder() {
+            this(null);
+        }
+        public Builder(Object lock) {
+            this.lock = lock;
+        }
+        public <T extends SshFuture> Builder when(final SshFuture<T>... futures) {
+            return close(new Closeable() {
+                private volatile boolean closing;
+                private volatile boolean closed;
+                public CloseFuture close(boolean immediately) {
+                    closing = true;
+                    if (immediately) {
+                        for (SshFuture<?> future : futures) {
+                            if (future instanceof DefaultSshFuture) {
+                                ((DefaultSshFuture<?>) future).setValue(new SshException("Closed"));
+                            }
+                        }
+                        closed = true;
+                        return closed();
+                    } else {
+                        return CloseableUtils.parallel(lock, futures).addListener(new SshFutureListener<CloseFuture>() {
+                            public void operationComplete(CloseFuture future) {
+                                closed = true;
+                            }
+                        });
+                    }
+                }
+
+                public boolean isClosed() {
+                    return closed;
+                }
+
+                public boolean isClosing() {
+                    return closing || closed;
+                }
+            });
+        }
+        public <T extends SshFuture> Builder when(Collection<? extends SshFuture<T>> futures) {
+            return when(futures.toArray(new SshFuture[futures.size()]));
+        }
+        public Builder sequential(Closeable... closeables) {
+            return close(CloseableUtils.sequential(lock, closeables));
+        }
+        public Builder sequential(Collection<Closeable> closeables) {
+            return close(CloseableUtils.sequential(lock, closeables));
+        }
+        public Builder parallel(Closeable... closeables) {
+            return close(CloseableUtils.parallel(lock, closeables));
+        }
+        public Builder parallel(Collection<? extends Closeable> closeables) {
+            return close(CloseableUtils.parallel(lock, closeables));
+        }
+        public Builder close(Closeable c) {
+            if (closeable == null) {
+                closeable = c;
+            } else {
+                closeable = CloseableUtils.sequential(lock, closeable, c);
+            }
+            return this;
+        }
+        public Closeable build() {
+            if (closeable == null) {
+                closeable = new Closeable() {
+                    private volatile boolean closed;
+                    public CloseFuture close(boolean immediately) {
+                        closed = true;
+                        return closed();
+                    }
+                    public boolean isClosed() {
+                        return closed;
+                    }
+                    public boolean isClosing() {
+                        return closed;
+                    }
+                };
+            }
+            return closeable;
+        }
+    }
+
     public static abstract class AbstractCloseable implements Closeable {
 
         protected static final int OPENED = 0;
@@ -230,11 +327,20 @@ public class CloseableUtils {
         /** Our logger */
         protected final Logger log = LoggerFactory.getLogger(getClass());
         /** Lock object for this session state */
-        protected final Object lock = new Object();
+        protected final Object lock;
         /** State of this object */
         protected final AtomicInteger state = new AtomicInteger(OPENED);
         /** A future that will be set 'closed' when the object is actually closed */
-        protected final CloseFuture closeFuture = new DefaultCloseFuture(lock);
+        protected final CloseFuture closeFuture;
+
+        protected AbstractCloseable() {
+            this(new Object());
+        }
+
+        protected AbstractCloseable(Object lock) {
+            this.lock = lock;
+            this.closeFuture = new DefaultCloseFuture(lock);
+        }
 
         public CloseFuture close(boolean immediately) {
             if (immediately) {
@@ -256,7 +362,7 @@ public class CloseableUtils {
                             public void operationComplete(CloseFuture future) {
                                 if (state.compareAndSet(GRACEFUL, IMMEDIATE)) {
                                     doCloseImmediately();
-                                    log.debug("{} closed", this);
+                                    log.debug("{} closed", AbstractCloseable.this);
                                 }
                             }
                         });
@@ -284,7 +390,7 @@ public class CloseableUtils {
         protected void preClose() {
         }
 
-        protected SshFuture<CloseFuture> doCloseGracefully() {
+        protected CloseFuture doCloseGracefully() {
             return null;
         }
 
@@ -293,6 +399,10 @@ public class CloseableUtils {
             state.set(CLOSED);
         }
 
+        protected Builder builder() {
+            return new Builder(lock);
+        }
+
     }
 
     public static abstract class AbstractInnerCloseable extends AbstractCloseable {
@@ -300,7 +410,7 @@ public class CloseableUtils {
         protected abstract Closeable getInnerCloseable();
 
         @Override
-        protected SshFuture<CloseFuture> doCloseGracefully() {
+        protected CloseFuture doCloseGracefully() {
             return getInnerCloseable().close(false);
         }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/server/AsyncCommand.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/AsyncCommand.java b/sshd-core/src/main/java/org/apache/sshd/server/AsyncCommand.java
new file mode 100644
index 0000000..8654dce
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/server/AsyncCommand.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.server;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
+
+/**
+ * Represents a command capable of doing non-blocking io.
+ * If this interface is implemented by a command, the usual
+ * blocking input / output / error streams won't be set.
+ */
+public interface AsyncCommand extends Command {
+
+    /**
+     * Set the input stream that can be used by the shell to read input.
+     * @param in
+     */
+    void setIoInputStream(IoInputStream in);
+
+    /**
+     * Set the output stream that can be used by the shell to write its output.
+     * @param out
+     */
+    void setIoOutputStream(IoOutputStream out);
+
+    /**
+     * Set the error stream that can be used by the shell to write its errors.
+     * @param err
+     */
+    void setIoErrorStream(IoOutputStream err);
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/server/channel/AsyncDataReceiver.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/AsyncDataReceiver.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/AsyncDataReceiver.java
new file mode 100644
index 0000000..24d19b3
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/AsyncDataReceiver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.server.channel;
+
+import java.io.IOException;
+
+import org.apache.sshd.common.Channel;
+import org.apache.sshd.common.channel.ChannelAsyncInputStream;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.util.Buffer;
+
+public class AsyncDataReceiver implements ChannelDataReceiver {
+
+    private ChannelAsyncInputStream in;
+
+    public AsyncDataReceiver(Channel channel) {
+        in = new ChannelAsyncInputStream(channel);
+    }
+
+    public IoInputStream getIn() {
+        return in;
+    }
+
+    public int data(ChannelSession channel, byte[] buf, int start, int len) throws IOException {
+        in.write(new Buffer(buf, start, len));
+        return 0;
+    }
+
+    public void close() throws IOException {
+        in.close(false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index 5866a59..b797346 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -39,6 +39,8 @@ import org.apache.sshd.common.NamedFactory;
 import org.apache.sshd.common.PtyMode;
 import org.apache.sshd.common.RequestHandler;
 import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.channel.ChannelAsyncInputStream;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.DefaultCloseFuture;
@@ -47,6 +49,7 @@ import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.common.util.CloseableUtils;
 import org.apache.sshd.common.util.IoUtils;
 import org.apache.sshd.common.util.LoggingFilterOutputStream;
+import org.apache.sshd.server.AsyncCommand;
 import org.apache.sshd.server.ChannelSessionAware;
 import org.apache.sshd.server.Command;
 import org.apache.sshd.server.Environment;
@@ -173,6 +176,8 @@ public class ChannelSession extends AbstractServerChannel {
     }
 
     protected String type;
+    protected ChannelAsyncOutputStream asyncOut;
+    protected ChannelAsyncOutputStream asyncErr;
     protected OutputStream out;
     protected OutputStream err;
     protected Command command;
@@ -238,6 +243,8 @@ public class ChannelSession extends AbstractServerChannel {
         }
         remoteWindow.notifyClosed();
         IoUtils.closeQuietly(out, err, receiver);
+        // TODO: graceful close ?
+        CloseableUtils.parallel(asyncOut, asyncErr).close(true);
         super.doCloseImmediately();
     }
 
@@ -436,7 +443,8 @@ public class ChannelSession extends AbstractServerChannel {
 
     /**
      * For {@link Command} to install {@link ChannelDataReceiver}.
-     * When you do this, {@link Command#setInputStream(InputStream)}
+     * When you do this, {@link Command#setInputStream(InputStream)} or
+     * {@link org.apache.sshd.server.AsyncCommand#setIoInputStream(org.apache.sshd.common.io.IoInputStream)}
      * will no longer be invoked. If you call this method from {@link Command#start(Environment)},
      * the input stream you received in {@link Command#setInputStream(InputStream)} will
      * not read any data.
@@ -447,7 +455,7 @@ public class ChannelSession extends AbstractServerChannel {
 
     protected void prepareCommand() throws IOException {
         // Add the user
-        addEnvVariable(Environment.ENV_USER, ((ServerSession) session).getUsername());
+        addEnvVariable(Environment.ENV_USER, session.getUsername());
         // If the shell wants to be aware of the session, let's do that
         if (command instanceof SessionAware) {
             ((SessionAware) command).setSession((ServerSession) session);
@@ -460,21 +468,35 @@ public class ChannelSession extends AbstractServerChannel {
             FileSystemFactory factory = ((ServerSession) session).getFactoryManager().getFileSystemFactory();
             ((FileSystemAware) command).setFileSystemView(factory.createFileSystemView(session));
         }
-        out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
-        err = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA);
-        if (log != null && log.isTraceEnabled()) {
-            // Wrap in logging filters
-            out = new LoggingFilterOutputStream(out, "OUT:", log);
-            err = new LoggingFilterOutputStream(err, "ERR:", log);
+        // If the shell wants to use non-blocking io
+        if (command instanceof AsyncCommand) {
+            asyncOut = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA);
+            asyncErr = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA);
+            ((AsyncCommand) command).setIoOutputStream(asyncOut);
+            ((AsyncCommand) command).setIoErrorStream(asyncErr);
+        } else {
+            out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
+            err = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA);
+            if (log.isTraceEnabled()) {
+                // Wrap in logging filters
+                out = new LoggingFilterOutputStream(out, "OUT:", log);
+                err = new LoggingFilterOutputStream(err, "ERR:", log);
+            }
+            command.setOutputStream(out);
+            command.setErrorStream(err);
         }
-        command.setOutputStream(out);
-        command.setErrorStream(err);
         if (this.receiver==null) {
             // if the command hasn't installed any ChannelDataReceiver, install the default
             // and give the command an InputStream
-            PipeDataReceiver recv = new PipeDataReceiver(localWindow);
-            setDataReceiver(recv);
-            command.setInputStream(recv.getIn());
+            if (command instanceof AsyncCommand) {
+                AsyncDataReceiver recv = new AsyncDataReceiver(this);
+                setDataReceiver(recv);
+                ((AsyncCommand) command).setIoInputStream(recv.getIn());
+            } else {
+                PipeDataReceiver recv = new PipeDataReceiver(localWindow);
+                setDataReceiver(recv);
+                command.setInputStream(recv.getIn());
+            }
         }
         if (tempBuffer != null) {
             Buffer buffer = tempBuffer;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
index f43f528..2b878b8 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
@@ -101,7 +101,7 @@ public class X11ForwardSupport extends CloseableUtils.AbstractInnerCloseable imp
         }
 
         int displayNumber, port;
-        InetSocketAddress addr = null;
+        InetSocketAddress addr;
 
         for (displayNumber = X11_DISPLAY_OFFSET; displayNumber < MAX_DISPLAYS; displayNumber++) {
             port = 6000 + displayNumber;
@@ -124,7 +124,7 @@ public class X11ForwardSupport extends CloseableUtils.AbstractInnerCloseable imp
 
         // only support non windows systems
         String os = System.getProperty("os.name").toLowerCase();
-        if (os.indexOf("windows") < 0) {
+        if (!os.contains("windows")) {
             try {
                 String authDisplay = "unix:" + displayNumber + "." + screen;
                 Process p = new ProcessBuilder(xauthCommand, "remove", authDisplay).start();
@@ -204,6 +204,9 @@ public class X11ForwardSupport extends CloseableUtils.AbstractInnerCloseable imp
 
         @Override
         protected synchronized void doOpen() throws IOException {
+            if (streaming == Streaming.Async) {
+                throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel");
+            }
             invertedIn = out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
         }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
index a391e9e..b1dfa41 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
@@ -53,6 +53,8 @@ import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.forward.TcpipServerChannel;
 import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoReadFuture;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.io.mina.MinaSession;
@@ -71,6 +73,7 @@ import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
 import org.apache.sshd.server.session.ServerConnectionService;
 import org.apache.sshd.server.session.ServerSession;
 import org.apache.sshd.server.session.ServerUserAuthService;
+import org.apache.sshd.util.AsyncEchoShellFactory;
 import org.apache.sshd.util.BaseTest;
 import org.apache.sshd.util.BogusPasswordAuthenticator;
 import org.apache.sshd.util.BogusPublickeyAuthenticator;
@@ -83,7 +86,6 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -145,6 +147,11 @@ public class ClientTest extends BaseTest {
                                 }
                                 return super.open(recipient, rwsize, rmpsize, buffer);
                             }
+
+                            @Override
+                            public String toString() {
+                                return "ChannelSession" + "[id=" + id + ", recipient=" + recipient + "]";
+                            }
                         };
                     }
                 },
@@ -161,6 +168,92 @@ public class ClientTest extends BaseTest {
     }
 
     @Test
+    public void testAsyncClient() throws Exception {
+        sshd.getProperties().put(SshServer.WINDOW_SIZE, "1024");
+        sshd.setShellFactory(new AsyncEchoShellFactory());
+
+        SshClient client = SshClient.setUpDefaultClient();
+        client.getProperties().put(SshClient.WINDOW_SIZE, "1024");
+        client.start();
+        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
+        session.addPasswordIdentity("smx");
+        session.auth().verify();
+        final ChannelShell channel = session.createShellChannel();
+        channel.setStreaming(ClientChannel.Streaming.Async);
+        channel.open().verify();
+
+
+        final byte[] message = "0123456789\n".getBytes();
+        final int nbMessages = 1000;
+
+        final ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
+        final ByteArrayOutputStream baosErr = new ByteArrayOutputStream();
+        final AtomicInteger writes = new AtomicInteger(nbMessages);
+
+        channel.getAsyncIn().write(new Buffer(message))
+                .addListener(new SshFutureListener<IoWriteFuture>() {
+                    public void operationComplete(IoWriteFuture future) {
+                        try {
+                            if (future.isWritten()) {
+                                if (writes.decrementAndGet() > 0) {
+                                    channel.getAsyncIn().write(new Buffer(message)).addListener(this);
+                                } else {
+                                    channel.getAsyncIn().close(false);
+                                }
+                            } else {
+                                throw new SshException("Error writing", future.getException());
+                            }
+                        } catch (IOException e) {
+                            if (!channel.isClosing()) {
+                                e.printStackTrace();
+                                channel.close(true);
+                            }
+                        }
+                    }
+                });
+        channel.getAsyncOut().read(new Buffer())
+                .addListener(new SshFutureListener<IoReadFuture>() {
+                    public void operationComplete(IoReadFuture future) {
+                        try {
+                            future.verify();
+                            Buffer buffer = future.getBuffer();
+                            baosOut.write(buffer.array(), buffer.rpos(), buffer.available());
+                            buffer.rpos(buffer.rpos() + buffer.available());
+                            buffer.compact();
+                            channel.getAsyncOut().read(buffer).addListener(this);
+                        } catch (IOException e) {
+                            if (!channel.isClosing()) {
+                                e.printStackTrace();
+                                channel.close(true);
+                            }
+                        }
+                    }
+                });
+        channel.getAsyncErr().read(new Buffer())
+                .addListener(new SshFutureListener<IoReadFuture>() {
+                    public void operationComplete(IoReadFuture future) {
+                        try {
+                            future.verify();
+                            Buffer buffer = future.getBuffer();
+                            baosErr.write(buffer.array(), buffer.rpos(), buffer.available());
+                            buffer.rpos(buffer.rpos() + buffer.available());
+                            buffer.compact();
+                            channel.getAsyncErr().read(buffer).addListener(this);
+                        } catch (IOException e) {
+                            if (!channel.isClosing()) {
+                                e.printStackTrace();
+                                channel.close(true);
+                            }
+                        }
+                    }
+                });
+
+        channel.waitFor(ClientChannel.CLOSED, 0);
+
+        assertEquals(nbMessages * message.length, baosOut.size());
+    }
+
+    @Test
     public void testCommandDeadlock() throws Exception {
         SshClient client = SshClient.setUpDefaultClient();
         client.start();