You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2018/04/09 08:05:43 UTC

mina-sshd git commit: [SSHD-812] Make SftpSubsystem a bit more asynchronous

Repository: mina-sshd
Updated Branches:
  refs/heads/SSHD-812-async [created] e05aa13c7


[SSHD-812] Make SftpSubsystem a bit more asynchronous


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/e05aa13c
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/e05aa13c
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/e05aa13c

Branch: refs/heads/SSHD-812-async
Commit: e05aa13c781d135105700f5aba314c4788c1c613
Parents: 4651d23
Author: Guillaume Nodet <gn...@apache.org>
Authored: Mon Apr 9 10:05:32 2018 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Mon Apr 9 10:05:32 2018 +0200

----------------------------------------------------------------------
 .../sshd/server/channel/ChannelSession.java     |   4 +-
 .../sftp/AbstractSftpSubsystemHelper.java       | 124 +++++++------
 .../server/subsystem/sftp/SftpSubsystem.java    | 186 +++++++++----------
 3 files changed, 167 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e05aa13c/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index 806d6c7..bcc1e33 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -667,7 +667,9 @@ public class ChannelSession extends AbstractServerChannel {
         if (this.receiver == null) {
             // if the command hasn't installed any ChannelDataReceiver, install the default
             // and give the command an InputStream
-            if (command instanceof AsyncCommand) {
+            if (command instanceof ChannelDataReceiver) {
+                setDataReceiver((ChannelDataReceiver) command);
+            } else if (command instanceof AsyncCommand) {
                 AsyncDataReceiver recv = new AsyncDataReceiver(this);
                 setDataReceiver(recv);
                 ((AsyncCommand) command).setIoInputStream(recv.getIn());

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e05aa13c/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpSubsystemHelper.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpSubsystemHelper.java b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpSubsystemHelper.java
index 08213ee..027b8c2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpSubsystemHelper.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpSubsystemHelper.java
@@ -312,7 +312,7 @@ public abstract class AbstractSftpSubsystemHelper
         }
 
         if ((proposed < low) || (proposed > hig)) {
-            sendStatus(BufferUtils.clear(buffer), id, failureOpcode, "Proposed version (" + proposed + ") not in supported range: " + available);
+            sendStatus(buffer, id, failureOpcode, "Proposed version (" + proposed + ") not in supported range: " + available);
             return null;
         }
 
@@ -379,11 +379,11 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             handle = doOpen(id, path, pflags, access, attrs);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_OPEN, path);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_OPEN, path);
             return;
         }
 
-        sendHandle(BufferUtils.clear(buffer), id, handle);
+        sendHandle(buffer, id, handle);
     }
 
     /**
@@ -402,11 +402,11 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             doClose(id, handle);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_CLOSE, handle);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_CLOSE, handle);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "", "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "", "");
     }
 
     protected abstract void doClose(int id, String handle) throws IOException;
@@ -428,6 +428,7 @@ public abstract class AbstractSftpSubsystemHelper
 
             buffer.clear();
             buffer.ensureCapacity(readLen + Long.SIZE /* the header */, IntUnaryOperator.identity());
+            buffer.putInt(0);
 
             buffer.putByte((byte) SftpConstants.SSH_FXP_DATA);
             buffer.putInt(id);
@@ -442,7 +443,7 @@ public abstract class AbstractSftpSubsystemHelper
             buffer.wpos(startPos + len);
             BufferUtils.updateLengthPlaceholder(buffer, lenPos, len);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_READ, handle, offset, requestedLength);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_READ, handle, offset, requestedLength);
             return;
         }
 
@@ -458,11 +459,11 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             doWrite(id, handle, offset, length, buffer.array(), buffer.rpos(), buffer.available());
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_WRITE, handle, offset, length);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_WRITE, handle, offset, length);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected abstract void doWrite(int id, String handle, long offset, int length, byte[] data, int doff, int remaining) throws IOException;
@@ -479,11 +480,11 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             attrs = doLStat(id, path, flags);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_LSTAT, path, flags);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_LSTAT, path, flags);
             return;
         }
 
-        sendAttrs(BufferUtils.clear(buffer), id, attrs);
+        sendAttrs(buffer, id, attrs);
     }
 
     protected Map<String, Object> doLStat(int id, String path, int flags) throws IOException {
@@ -506,11 +507,11 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             doSetStat(id, path, attrs);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_SETSTAT, path);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_SETSTAT, path);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected void doSetStat(int id, String path, Map<String, ?> attrs) throws IOException {
@@ -534,11 +535,11 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             attrs = doFStat(id, handle, flags);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_FSTAT, handle, flags);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_FSTAT, handle, flags);
             return;
         }
 
-        sendAttrs(BufferUtils.clear(buffer), id, attrs);
+        sendAttrs(buffer, id, attrs);
     }
 
     protected abstract Map<String, Object> doFStat(int id, String handle, int flags) throws IOException;
@@ -549,11 +550,11 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             doFSetStat(id, handle, attrs);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_FSETSTAT, handle, attrs);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_FSETSTAT, handle, attrs);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected abstract void doFSetStat(int id, String handle, Map<String, ?> attrs) throws IOException;
@@ -573,11 +574,11 @@ public abstract class AbstractSftpSubsystemHelper
                 getPathResolutionLinkOption(SftpConstants.SSH_FXP_OPENDIR, "", p);
             handle = doOpenDir(id, path, p, options);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_OPENDIR, path);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_OPENDIR, path);
             return;
         }
 
-        sendHandle(BufferUtils.clear(buffer), id, handle);
+        sendHandle(buffer, id, handle);
     }
 
     protected abstract String doOpenDir(int id, String path, Path p, LinkOption... options) throws IOException;
@@ -597,11 +598,11 @@ public abstract class AbstractSftpSubsystemHelper
 
             doLink(id, targetPath, linkPath, symLink);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_LINK, targetPath, linkPath, symLink);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_LINK, targetPath, linkPath, symLink);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected void doLink(int id, String targetPath, String linkPath, boolean symLink) throws IOException {
@@ -618,11 +619,11 @@ public abstract class AbstractSftpSubsystemHelper
             }
             doSymLink(id, targetPath, linkPath);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_SYMLINK, targetPath, linkPath);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_SYMLINK, targetPath, linkPath);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected void doSymLink(int id, String targetPath, String linkPath) throws IOException {
@@ -639,11 +640,11 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             doOpenSSHHardLink(id, srcFile, dstFile);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_EXTENDED, HardLinkExtensionParser.NAME, srcFile, dstFile);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, HardLinkExtensionParser.NAME, srcFile, dstFile);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected void doOpenSSHHardLink(int id, String srcFile, String dstFile) throws IOException {
@@ -661,11 +662,12 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             info = doSpaceAvailable(id, path);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_SPACE_AVAILABLE, path);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_SPACE_AVAILABLE, path);
             return;
         }
 
         buffer.clear();
+        buffer.putInt(0);
         buffer.putByte((byte) SftpConstants.SSH_FXP_EXTENDED_REPLY);
         buffer.putInt(id);
         SpaceAvailableExtensionInfo.encode(buffer, info);
@@ -694,11 +696,11 @@ public abstract class AbstractSftpSubsystemHelper
             // TODO : implement text-seek - see https://tools.ietf.org/html/draft-ietf-secsh-filexfer-03#section-6.3
             doTextSeek(id, handle, line);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_TEXT_SEEK, handle, line);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_TEXT_SEEK, handle, line);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected abstract void doTextSeek(int id, String handle, long line) throws IOException;
@@ -709,11 +711,11 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             doOpenSSHFsync(id, handle);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_EXTENDED, FsyncExtensionParser.NAME, handle);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, FsyncExtensionParser.NAME, handle);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected abstract void doOpenSSHFsync(int id, String handle) throws IOException;
@@ -727,12 +729,13 @@ public abstract class AbstractSftpSubsystemHelper
         int blockSize = buffer.getInt();
         try {
             buffer.clear();
+            buffer.putInt(0);
             buffer.putByte((byte) SftpConstants.SSH_FXP_EXTENDED_REPLY);
             buffer.putInt(id);
             buffer.putString(SftpConstants.EXT_CHECK_FILE);
             doCheckFileHash(id, targetType, target, Arrays.asList(algos), startOffset, length, blockSize, buffer);
         } catch (Exception e) {
-            sendStatus(BufferUtils.clear(buffer), id, e,
+            sendStatus(buffer, id, e,
                 SftpConstants.SSH_FXP_EXTENDED, targetType, target, algList, startOffset, length, blockSize);
             return;
         }
@@ -844,12 +847,13 @@ public abstract class AbstractSftpSubsystemHelper
             }
 
         } catch (Exception e) {
-            sendStatus(BufferUtils.clear(buffer), id, e,
+            sendStatus(buffer, id, e,
                 SftpConstants.SSH_FXP_EXTENDED, targetType, target, startOffset, length, quickCheckHash);
             return;
         }
 
         buffer.clear();
+        buffer.putInt(0);
         buffer.putByte((byte) SftpConstants.SSH_FXP_EXTENDED_REPLY);
         buffer.putInt(id);
         buffer.putString(targetType);
@@ -976,11 +980,11 @@ public abstract class AbstractSftpSubsystemHelper
             }
             l = doReadLink(id, path);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_READLINK, path);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_READLINK, path);
             return;
         }
 
-        sendLink(BufferUtils.clear(buffer), id, l);
+        sendLink(buffer, id, l);
     }
 
     protected String doReadLink(int id, String path) throws IOException {
@@ -1004,11 +1008,11 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             doRename(id, oldPath, newPath, flags);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_RENAME, oldPath, newPath, flags);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_RENAME, oldPath, newPath, flags);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected void doRename(int id, String oldPath, String newPath, int flags) throws IOException {
@@ -1057,13 +1061,13 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             doCopyData(id, readHandle, readOffset, readLength, writeHandle, writeOffset);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e,
+            sendStatus(buffer, id, e,
                 SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_COPY_DATA,
                 readHandle, readOffset, readLength, writeHandle, writeOffset);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected abstract void doCopyData(int id, String readHandle, long readOffset, long readLength, String writeHandle, long writeOffset) throws IOException;
@@ -1077,12 +1081,12 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             doCopyFile(id, srcFile, dstFile, overwriteDestination);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e,
+            sendStatus(buffer, id, e,
                 SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_COPY_FILE, srcFile, dstFile, overwriteDestination);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected void doCopyFile(int id, String srcFile, String dstFile, boolean overwriteDestination) throws IOException {
@@ -1113,11 +1117,11 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             doBlock(id, handle, offset, length, mask);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_BLOCK, handle, offset, length, mask);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_BLOCK, handle, offset, length, mask);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected abstract void doBlock(int id, String handle, long offset, long length, int mask) throws IOException;
@@ -1129,11 +1133,11 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             doUnblock(id, handle, offset, length);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_UNBLOCK, handle, offset, length);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_UNBLOCK, handle, offset, length);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected abstract void doUnblock(int id, String handle, long offset, long length) throws IOException;
@@ -1150,11 +1154,11 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             attrs = doStat(id, path, flags);
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_STAT, path, flags);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_STAT, path, flags);
             return;
         }
 
-        sendAttrs(BufferUtils.clear(buffer), id, attrs);
+        sendAttrs(buffer, id, attrs);
     }
 
     protected Map<String, Object> doStat(int id, String path, int flags) throws IOException {
@@ -1267,11 +1271,11 @@ public abstract class AbstractSftpSubsystemHelper
                 }
             }
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_REALPATH, path);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_REALPATH, path);
             return;
         }
 
-        sendPath(BufferUtils.clear(buffer), id, result.getKey(), attrs);
+        sendPath(buffer, id, result.getKey(), attrs);
     }
 
     protected SimpleImmutableEntry<Path, Boolean> doRealPathV6(
@@ -1322,11 +1326,11 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             doRemoveDirectory(id, path, IoUtils.getLinkOptions(false));
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_RMDIR, path);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_RMDIR, path);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected void doRemoveDirectory(int id, String path, LinkOption... options) throws IOException {
@@ -1368,11 +1372,11 @@ public abstract class AbstractSftpSubsystemHelper
         try {
             doMakeDirectory(id, path, attrs, IoUtils.getLinkOptions(false));
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_MKDIR, path, attrs);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_MKDIR, path, attrs);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected void doMakeDirectory(int id, String path, Map<String, ?> attrs, LinkOption... options) throws IOException {
@@ -1417,11 +1421,11 @@ public abstract class AbstractSftpSubsystemHelper
              */
             doRemove(id, path, IoUtils.getLinkOptions(false));
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_REMOVE, path);
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_REMOVE, path);
             return;
         }
 
-        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+        sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
     }
 
     protected void doRemove(int id, String path, LinkOption... options) throws IOException {
@@ -1774,6 +1778,8 @@ public abstract class AbstractSftpSubsystemHelper
     }
 
     protected void sendHandle(Buffer buffer, int id, String handle) throws IOException {
+        buffer.clear();
+        buffer.putInt(0);
         buffer.putByte((byte) SftpConstants.SSH_FXP_HANDLE);
         buffer.putInt(id);
         buffer.putString(handle);
@@ -1781,6 +1787,8 @@ public abstract class AbstractSftpSubsystemHelper
     }
 
     protected void sendAttrs(Buffer buffer, int id, Map<String, ?> attributes) throws IOException {
+        buffer.clear();
+        buffer.putInt(0);
         buffer.putByte((byte) SftpConstants.SSH_FXP_ATTRS);
         buffer.putInt(id);
         writeAttrs(buffer, attributes);
@@ -1791,6 +1799,9 @@ public abstract class AbstractSftpSubsystemHelper
         //in case we are running on Windows
         String unixPath = link.replace(File.separatorChar, '/');
 
+        buffer.clear();
+        buffer.putInt(0);
+
         buffer.putByte((byte) SftpConstants.SSH_FXP_NAME);
         buffer.putInt(id);
         buffer.putInt(1);   // one response
@@ -1814,6 +1825,9 @@ public abstract class AbstractSftpSubsystemHelper
     }
 
     protected void sendPath(Buffer buffer, int id, Path f, Map<String, ?> attrs) throws IOException {
+        buffer.clear();
+        buffer.putInt(0);
+
         buffer.putByte((byte) SftpConstants.SSH_FXP_NAME);
         buffer.putInt(id);
         buffer.putInt(1);   // one reply
@@ -2537,11 +2551,15 @@ public abstract class AbstractSftpSubsystemHelper
                       getServerSession(), id, SftpConstants.getStatusName(substatus), lang, msg);
         }
 
+        buffer.clear();
+        buffer.putInt(0);
+
         buffer.putByte((byte) SftpConstants.SSH_FXP_STATUS);
         buffer.putInt(id);
         buffer.putInt(substatus);
         buffer.putString(msg);
         buffer.putString(lang);
+
         send(buffer);
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e05aa13c/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
index 5cfbf01..39ebce2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
@@ -40,7 +40,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.sshd.common.Factory;
@@ -48,6 +47,8 @@ import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.digest.BuiltinDigests;
 import org.apache.sshd.common.digest.DigestFactory;
 import org.apache.sshd.common.file.FileSystemAware;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.random.Random;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.common.subsystem.sftp.SftpHelper;
@@ -61,10 +62,13 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.io.IoUtils;
 import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
 import org.apache.sshd.common.util.threads.ThreadUtils;
-import org.apache.sshd.server.Command;
+import org.apache.sshd.server.AsyncCommand;
+import org.apache.sshd.server.ChannelSessionAware;
 import org.apache.sshd.server.Environment;
 import org.apache.sshd.server.ExitCallback;
 import org.apache.sshd.server.SessionAware;
+import org.apache.sshd.server.channel.ChannelDataReceiver;
+import org.apache.sshd.server.channel.ChannelSession;
 import org.apache.sshd.server.session.ServerSession;
 
 /**
@@ -74,7 +78,7 @@ import org.apache.sshd.server.session.ServerSession;
  */
 public class SftpSubsystem
         extends AbstractSftpSubsystemHelper
-        implements Command, Runnable, SessionAware, FileSystemAware, ExecutorServiceCarrier {
+        implements AsyncCommand, ChannelDataReceiver, ChannelSessionAware, SessionAware, FileSystemAware, ExecutorServiceCarrier {
 
     /**
      * Properties key for the maximum of available open handles per session.
@@ -114,14 +118,13 @@ public class SftpSubsystem
     public static final int DEFAULT_MAX_READDIR_DATA_SIZE = 16 * 1024;
 
     protected ExitCallback callback;
-    protected InputStream in;
-    protected OutputStream out;
-    protected OutputStream err;
+    protected IoInputStream in;
+    protected IoOutputStream out;
+    protected IoOutputStream err;
     protected Environment env;
     protected Random randomizer;
     protected int fileHandleSize = DEFAULT_FILE_HANDLE_SIZE;
     protected int maxFileHandleRounds = DEFAULT_FILE_HANDLE_ROUNDS;
-    protected Future<?> pendingFuture;
     protected byte[] workBuf = new byte[Math.max(DEFAULT_FILE_HANDLE_SIZE, Integer.BYTES)];
     protected FileSystem fileSystem = FileSystems.getDefault();
     protected Path defaultDir = fileSystem.getPath(System.getProperty("user.dir"));
@@ -129,8 +132,10 @@ public class SftpSubsystem
     protected int version;
     protected final Map<String, byte[]> extensions = new TreeMap<>(Comparator.naturalOrder());
     protected final Map<String, Handle> handles = new HashMap<>();
+    private Buffer buffer = new ByteArrayBuffer(1024);
 
     private ServerSession serverSession;
+    private ChannelSession channelSession;
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private ExecutorService executorService;
     private boolean shutdownOnExit;
@@ -227,75 +232,86 @@ public class SftpSubsystem
 
     @Override
     public void setInputStream(InputStream in) {
-        this.in = in;
     }
 
     @Override
     public void setOutputStream(OutputStream out) {
-        this.out = out;
     }
 
     @Override
     public void setErrorStream(OutputStream err) {
+    }
+
+    @Override
+    public void setIoInputStream(IoInputStream in) {
+        this.in = in;
+    }
+
+    @Override
+    public void setIoOutputStream(IoOutputStream out) {
+        this.out = out;
+    }
+
+    @Override
+    public void setIoErrorStream(IoOutputStream err) {
         this.err = err;
     }
 
     @Override
     public void start(Environment env) throws IOException {
         this.env = env;
-        try {
-            ExecutorService executor = getExecutorService();
-            pendingFuture = executor.submit(this);
-        } catch (RuntimeException e) {    // e.g., RejectedExecutionException
-            log.error("Failed (" + e.getClass().getSimpleName() + ") to start command: " + e.toString(), e);
-            throw new IOException(e);
-        }
     }
 
     @Override
-    public void run() {
-        try {
-            for (long count = 1L;; count++) {
-                int length = BufferUtils.readInt(in, workBuf, 0, workBuf.length);
-                ValidateUtils.checkTrue(length >= (Integer.BYTES + 1 /* command */), "Bad length to read: %d", length);
-
-                Buffer buffer = new ByteArrayBuffer(length + Integer.BYTES + Long.SIZE /* a bit extra */, false);
-                buffer.putInt(length);
-                for (int remainLen = length; remainLen > 0;) {
-                    int l = in.read(buffer.array(), buffer.wpos(), remainLen);
-                    if (l < 0) {
-                        throw new IllegalArgumentException("Premature EOF at buffer #" + count + " while read length=" + length + " and remain=" + remainLen);
-                    }
-                    buffer.wpos(buffer.wpos() + l);
-                    remainLen -= l;
-                }
+    public void setChannelSession(ChannelSession session) {
+        this.channelSession = session;
+    }
 
-                process(buffer);
+    @Override
+    public int data(ChannelSession channel, byte[] buf, int start, int len) throws IOException {
+        buffer.putRawBytes(buf, start, len);
+        while (buffer.available() >= Integer.BYTES) {
+            int rpos = buffer.rpos();
+            int msglen = buffer.getInt();
+            if (buffer.available() >= msglen) {
+                Buffer b = new ByteArrayBuffer(msglen + Integer.BYTES + Long.SIZE /* a bit extra */, false);
+                b.putInt(msglen);
+                b.putRawBytes(buffer.array(), buffer.rpos(), msglen);
+                buffer.rpos(buffer.rpos() + msglen);
+                getExecutorService().submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            process(b);
+                            channelSession.getLocalWindow().consumeAndCheck(msglen + Integer.BYTES);
+                        } catch (IOException e) {
+                            serverSession.exceptionCaught(e);
+                        }
+                    }
+                });
+            } else {
+                buffer.rpos(rpos);
+                break;
             }
-        } catch (Throwable t) {
-            if ((!closed.get()) && (!(t instanceof EOFException))) { // Ignore
-                log.error("run({}) {} caught in SFTP subsystem: {}",
-                          getServerSession(), t.getClass().getSimpleName(), t.getMessage());
-                if (log.isDebugEnabled()) {
-                    log.debug("run(" + getServerSession() + ") caught exception details", t);
+        }
+        return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+        boolean debugEnabled = log.isDebugEnabled();
+        handles.forEach((id, handle) -> {
+            try {
+                handle.close();
+                if (debugEnabled) {
+                    log.debug("run({}) closed pending handle {} [{}]", getServerSession(), id, handle);
                 }
+            } catch (IOException ioe) {
+                log.error("run({}) failed ({}) to close handle={}[{}]: {}",
+                        getServerSession(), ioe.getClass().getSimpleName(), id, handle, ioe.getMessage());
             }
-        } finally {
-            boolean debugEnabled = log.isDebugEnabled();
-            handles.forEach((id, handle) -> {
-                try {
-                    handle.close();
-                    if (debugEnabled) {
-                        log.debug("run({}) closed pending handle {} [{}]", getServerSession(), id, handle);
-                    }
-                } catch (IOException ioe) {
-                    log.error("run({}) failed ({}) to close handle={}[{}]: {}",
-                          getServerSession(), ioe.getClass().getSimpleName(), id, handle, ioe.getMessage());
-                }
-            });
-
-            callback.onExit(0);
-        }
+        });
+        callback.onExit(0);
     }
 
     @Override
@@ -383,7 +399,7 @@ public class SftpSubsystem
                 String name = SftpConstants.getCommandMessageName(type);
                 log.warn("process({})[length={}, type={}, id={}] unknown command",
                          getServerSession(), length, name, id);
-                sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command " + name + " is unsupported or not implemented");
+                sendStatus(buffer, id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command " + name + " is unsupported or not implemented");
             }
         }
 
@@ -428,7 +444,7 @@ public class SftpSubsystem
                 if (log.isDebugEnabled()) {
                     log.debug("executeExtendedCommand({}) received unsupported SSH_FXP_EXTENDED({})", getServerSession(), extension);
                 }
-                sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command SSH_FXP_EXTENDED(" + extension + ") is unsupported or not implemented");
+                sendStatus(buffer, id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command SSH_FXP_EXTENDED(" + extension + ") is unsupported or not implemented");
                 break;
         }
     }
@@ -602,7 +618,7 @@ public class SftpSubsystem
          * channel.
          */
         if (requestsCount > 0L) {
-            sendStatus(BufferUtils.clear(buffer), id,
+            sendStatus(buffer, id,
                        SftpConstants.SSH_FX_FAILURE,
                        "Version selection not the 1st request for proposal = " + proposed);
             session.close(true);
@@ -619,9 +635,9 @@ public class SftpSubsystem
         }
         if (result) {
             version = Integer.parseInt(proposed);
-            sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+            sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
         } else {
-            sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_FAILURE, "Unsupported version " + proposed);
+            sendStatus(buffer, id, SftpConstants.SSH_FX_FAILURE, "Unsupported version " + proposed);
             session.close(true);
         }
     }
@@ -752,11 +768,10 @@ public class SftpSubsystem
                       getServerSession(), id, handle, h);
         }
 
-        Buffer reply = null;
         try {
             DirectoryHandle dh = validateHandle(handle, h, DirectoryHandle.class);
             if (dh.isDone()) {
-                sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_EOF, "Directory reading is done");
+                sendStatus(buffer, id, SftpConstants.SSH_FX_EOF, "Directory reading is done");
                 return;
             }
 
@@ -781,40 +796,37 @@ public class SftpSubsystem
                 // Send only a few files at a time to not create packets of a too
                 // large size or have a timeout to occur.
 
-                reply = BufferUtils.clear(buffer);
-                reply.putByte((byte) SftpConstants.SSH_FXP_NAME);
-                reply.putInt(id);
+                buffer.clear();
+                buffer.putInt(0);
+
+                buffer.putByte((byte) SftpConstants.SSH_FXP_NAME);
+                buffer.putInt(id);
 
-                int lenPos = reply.wpos();
-                reply.putInt(0);
+                int lenPos = buffer.wpos();
+                buffer.putInt(0);
 
                 ServerSession session = getServerSession();
                 int maxDataSize = session.getIntProperty(MAX_READDIR_DATA_SIZE_PROP, DEFAULT_MAX_READDIR_DATA_SIZE);
-                int count = doReadDir(id, handle, dh, reply, maxDataSize, IoUtils.getLinkOptions(false));
-                BufferUtils.updateLengthPlaceholder(reply, lenPos, count);
+                int count = doReadDir(id, handle, dh, buffer, maxDataSize, IoUtils.getLinkOptions(false));
+                BufferUtils.updateLengthPlaceholder(buffer, lenPos, count);
                 if ((!dh.isSendDot()) && (!dh.isSendDotDot()) && (!dh.hasNext())) {
                     dh.markDone();
                 }
 
                 Boolean indicator =
-                    SftpHelper.indicateEndOfNamesList(reply, getVersion(), session, dh.isDone());
+                    SftpHelper.indicateEndOfNamesList(buffer, getVersion(), session, dh.isDone());
                 if (debugEnabled) {
                     log.debug("doReadDir({})({})[{}] - seding {} entries - eol={}", session, handle, h, count, indicator);
                 }
+                send(buffer);
             } else {
                 // empty directory
                 dh.markDone();
-                sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_EOF, "Empty directory");
-                return;
+                sendStatus(buffer, id, SftpConstants.SSH_FX_EOF, "Empty directory");
             }
-
-            Objects.requireNonNull(reply, "No reply buffer created");
         } catch (IOException | RuntimeException e) {
-            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_READDIR, handle);
-            return;
+            sendStatus(buffer, id, e, SftpConstants.SSH_FXP_READDIR, handle);
         }
-
-        send(reply);
     }
 
     @Override
@@ -990,6 +1002,7 @@ public class SftpSubsystem
         }
 
         buffer.clear();
+        buffer.putInt(0);
 
         buffer.putByte((byte) SftpConstants.SSH_FXP_VERSION);
         buffer.putInt(version);
@@ -1003,10 +1016,8 @@ public class SftpSubsystem
 
     @Override
     protected void send(Buffer buffer) throws IOException {
-        int len = buffer.available();
-        BufferUtils.writeInt(out, len, workBuf, 0, workBuf.length);
-        out.write(buffer.array(), buffer.rpos(), len);
-        out.flush();
+        BufferUtils.updateLengthPlaceholder(buffer, 0);
+        out.writePacket(buffer);
     }
 
     @Override
@@ -1032,17 +1043,6 @@ public class SftpSubsystem
             }
         }
 
-        // if thread has not completed, cancel it
-        if ((pendingFuture != null) && (!pendingFuture.isDone())) {
-            boolean result = pendingFuture.cancel(true);
-            // TODO consider waiting some reasonable (?) amount of time for cancellation
-            if (debugEnabled) {
-                log.debug("destroy(" + session + ") - cancel pending future=" + result);
-            }
-        }
-
-        pendingFuture = null;
-
         ExecutorService executors = getExecutorService();
         if ((executors != null) && (!executors.isShutdown()) && isShutdownOnExit()) {
             Collection<Runnable> runners = executors.shutdownNow();