You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2018/04/09 08:05:43 UTC
mina-sshd git commit: [SSHD-812] Make SftpSubsystem a bit more asynchronous
Repository: mina-sshd
Updated Branches:
refs/heads/SSHD-812-async [created] e05aa13c7
[SSHD-812] Make SftpSubsystem a bit more asynchronous
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/e05aa13c
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/e05aa13c
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/e05aa13c
Branch: refs/heads/SSHD-812-async
Commit: e05aa13c781d135105700f5aba314c4788c1c613
Parents: 4651d23
Author: Guillaume Nodet <gn...@apache.org>
Authored: Mon Apr 9 10:05:32 2018 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Mon Apr 9 10:05:32 2018 +0200
----------------------------------------------------------------------
.../sshd/server/channel/ChannelSession.java | 4 +-
.../sftp/AbstractSftpSubsystemHelper.java | 124 +++++++------
.../server/subsystem/sftp/SftpSubsystem.java | 186 +++++++++----------
3 files changed, 167 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e05aa13c/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index 806d6c7..bcc1e33 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -667,7 +667,9 @@ public class ChannelSession extends AbstractServerChannel {
if (this.receiver == null) {
// if the command hasn't installed any ChannelDataReceiver, install the default
// and give the command an InputStream
- if (command instanceof AsyncCommand) {
+ if (command instanceof ChannelDataReceiver) {
+ setDataReceiver((ChannelDataReceiver) command);
+ } else if (command instanceof AsyncCommand) {
AsyncDataReceiver recv = new AsyncDataReceiver(this);
setDataReceiver(recv);
((AsyncCommand) command).setIoInputStream(recv.getIn());
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e05aa13c/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpSubsystemHelper.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpSubsystemHelper.java b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpSubsystemHelper.java
index 08213ee..027b8c2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpSubsystemHelper.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpSubsystemHelper.java
@@ -312,7 +312,7 @@ public abstract class AbstractSftpSubsystemHelper
}
if ((proposed < low) || (proposed > hig)) {
- sendStatus(BufferUtils.clear(buffer), id, failureOpcode, "Proposed version (" + proposed + ") not in supported range: " + available);
+ sendStatus(buffer, id, failureOpcode, "Proposed version (" + proposed + ") not in supported range: " + available);
return null;
}
@@ -379,11 +379,11 @@ public abstract class AbstractSftpSubsystemHelper
try {
handle = doOpen(id, path, pflags, access, attrs);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_OPEN, path);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_OPEN, path);
return;
}
- sendHandle(BufferUtils.clear(buffer), id, handle);
+ sendHandle(buffer, id, handle);
}
/**
@@ -402,11 +402,11 @@ public abstract class AbstractSftpSubsystemHelper
try {
doClose(id, handle);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_CLOSE, handle);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_CLOSE, handle);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "", "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "", "");
}
protected abstract void doClose(int id, String handle) throws IOException;
@@ -428,6 +428,7 @@ public abstract class AbstractSftpSubsystemHelper
buffer.clear();
buffer.ensureCapacity(readLen + Long.SIZE /* the header */, IntUnaryOperator.identity());
+ buffer.putInt(0);
buffer.putByte((byte) SftpConstants.SSH_FXP_DATA);
buffer.putInt(id);
@@ -442,7 +443,7 @@ public abstract class AbstractSftpSubsystemHelper
buffer.wpos(startPos + len);
BufferUtils.updateLengthPlaceholder(buffer, lenPos, len);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_READ, handle, offset, requestedLength);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_READ, handle, offset, requestedLength);
return;
}
@@ -458,11 +459,11 @@ public abstract class AbstractSftpSubsystemHelper
try {
doWrite(id, handle, offset, length, buffer.array(), buffer.rpos(), buffer.available());
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_WRITE, handle, offset, length);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_WRITE, handle, offset, length);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected abstract void doWrite(int id, String handle, long offset, int length, byte[] data, int doff, int remaining) throws IOException;
@@ -479,11 +480,11 @@ public abstract class AbstractSftpSubsystemHelper
try {
attrs = doLStat(id, path, flags);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_LSTAT, path, flags);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_LSTAT, path, flags);
return;
}
- sendAttrs(BufferUtils.clear(buffer), id, attrs);
+ sendAttrs(buffer, id, attrs);
}
protected Map<String, Object> doLStat(int id, String path, int flags) throws IOException {
@@ -506,11 +507,11 @@ public abstract class AbstractSftpSubsystemHelper
try {
doSetStat(id, path, attrs);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_SETSTAT, path);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_SETSTAT, path);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected void doSetStat(int id, String path, Map<String, ?> attrs) throws IOException {
@@ -534,11 +535,11 @@ public abstract class AbstractSftpSubsystemHelper
try {
attrs = doFStat(id, handle, flags);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_FSTAT, handle, flags);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_FSTAT, handle, flags);
return;
}
- sendAttrs(BufferUtils.clear(buffer), id, attrs);
+ sendAttrs(buffer, id, attrs);
}
protected abstract Map<String, Object> doFStat(int id, String handle, int flags) throws IOException;
@@ -549,11 +550,11 @@ public abstract class AbstractSftpSubsystemHelper
try {
doFSetStat(id, handle, attrs);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_FSETSTAT, handle, attrs);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_FSETSTAT, handle, attrs);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected abstract void doFSetStat(int id, String handle, Map<String, ?> attrs) throws IOException;
@@ -573,11 +574,11 @@ public abstract class AbstractSftpSubsystemHelper
getPathResolutionLinkOption(SftpConstants.SSH_FXP_OPENDIR, "", p);
handle = doOpenDir(id, path, p, options);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_OPENDIR, path);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_OPENDIR, path);
return;
}
- sendHandle(BufferUtils.clear(buffer), id, handle);
+ sendHandle(buffer, id, handle);
}
protected abstract String doOpenDir(int id, String path, Path p, LinkOption... options) throws IOException;
@@ -597,11 +598,11 @@ public abstract class AbstractSftpSubsystemHelper
doLink(id, targetPath, linkPath, symLink);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_LINK, targetPath, linkPath, symLink);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_LINK, targetPath, linkPath, symLink);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected void doLink(int id, String targetPath, String linkPath, boolean symLink) throws IOException {
@@ -618,11 +619,11 @@ public abstract class AbstractSftpSubsystemHelper
}
doSymLink(id, targetPath, linkPath);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_SYMLINK, targetPath, linkPath);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_SYMLINK, targetPath, linkPath);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected void doSymLink(int id, String targetPath, String linkPath) throws IOException {
@@ -639,11 +640,11 @@ public abstract class AbstractSftpSubsystemHelper
try {
doOpenSSHHardLink(id, srcFile, dstFile);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_EXTENDED, HardLinkExtensionParser.NAME, srcFile, dstFile);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, HardLinkExtensionParser.NAME, srcFile, dstFile);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected void doOpenSSHHardLink(int id, String srcFile, String dstFile) throws IOException {
@@ -661,11 +662,12 @@ public abstract class AbstractSftpSubsystemHelper
try {
info = doSpaceAvailable(id, path);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_SPACE_AVAILABLE, path);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_SPACE_AVAILABLE, path);
return;
}
buffer.clear();
+ buffer.putInt(0);
buffer.putByte((byte) SftpConstants.SSH_FXP_EXTENDED_REPLY);
buffer.putInt(id);
SpaceAvailableExtensionInfo.encode(buffer, info);
@@ -694,11 +696,11 @@ public abstract class AbstractSftpSubsystemHelper
// TODO : implement text-seek - see https://tools.ietf.org/html/draft-ietf-secsh-filexfer-03#section-6.3
doTextSeek(id, handle, line);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_TEXT_SEEK, handle, line);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_TEXT_SEEK, handle, line);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected abstract void doTextSeek(int id, String handle, long line) throws IOException;
@@ -709,11 +711,11 @@ public abstract class AbstractSftpSubsystemHelper
try {
doOpenSSHFsync(id, handle);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_EXTENDED, FsyncExtensionParser.NAME, handle);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, FsyncExtensionParser.NAME, handle);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected abstract void doOpenSSHFsync(int id, String handle) throws IOException;
@@ -727,12 +729,13 @@ public abstract class AbstractSftpSubsystemHelper
int blockSize = buffer.getInt();
try {
buffer.clear();
+ buffer.putInt(0);
buffer.putByte((byte) SftpConstants.SSH_FXP_EXTENDED_REPLY);
buffer.putInt(id);
buffer.putString(SftpConstants.EXT_CHECK_FILE);
doCheckFileHash(id, targetType, target, Arrays.asList(algos), startOffset, length, blockSize, buffer);
} catch (Exception e) {
- sendStatus(BufferUtils.clear(buffer), id, e,
+ sendStatus(buffer, id, e,
SftpConstants.SSH_FXP_EXTENDED, targetType, target, algList, startOffset, length, blockSize);
return;
}
@@ -844,12 +847,13 @@ public abstract class AbstractSftpSubsystemHelper
}
} catch (Exception e) {
- sendStatus(BufferUtils.clear(buffer), id, e,
+ sendStatus(buffer, id, e,
SftpConstants.SSH_FXP_EXTENDED, targetType, target, startOffset, length, quickCheckHash);
return;
}
buffer.clear();
+ buffer.putInt(0);
buffer.putByte((byte) SftpConstants.SSH_FXP_EXTENDED_REPLY);
buffer.putInt(id);
buffer.putString(targetType);
@@ -976,11 +980,11 @@ public abstract class AbstractSftpSubsystemHelper
}
l = doReadLink(id, path);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_READLINK, path);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_READLINK, path);
return;
}
- sendLink(BufferUtils.clear(buffer), id, l);
+ sendLink(buffer, id, l);
}
protected String doReadLink(int id, String path) throws IOException {
@@ -1004,11 +1008,11 @@ public abstract class AbstractSftpSubsystemHelper
try {
doRename(id, oldPath, newPath, flags);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_RENAME, oldPath, newPath, flags);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_RENAME, oldPath, newPath, flags);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected void doRename(int id, String oldPath, String newPath, int flags) throws IOException {
@@ -1057,13 +1061,13 @@ public abstract class AbstractSftpSubsystemHelper
try {
doCopyData(id, readHandle, readOffset, readLength, writeHandle, writeOffset);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e,
+ sendStatus(buffer, id, e,
SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_COPY_DATA,
readHandle, readOffset, readLength, writeHandle, writeOffset);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected abstract void doCopyData(int id, String readHandle, long readOffset, long readLength, String writeHandle, long writeOffset) throws IOException;
@@ -1077,12 +1081,12 @@ public abstract class AbstractSftpSubsystemHelper
try {
doCopyFile(id, srcFile, dstFile, overwriteDestination);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e,
+ sendStatus(buffer, id, e,
SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_COPY_FILE, srcFile, dstFile, overwriteDestination);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected void doCopyFile(int id, String srcFile, String dstFile, boolean overwriteDestination) throws IOException {
@@ -1113,11 +1117,11 @@ public abstract class AbstractSftpSubsystemHelper
try {
doBlock(id, handle, offset, length, mask);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_BLOCK, handle, offset, length, mask);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_BLOCK, handle, offset, length, mask);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected abstract void doBlock(int id, String handle, long offset, long length, int mask) throws IOException;
@@ -1129,11 +1133,11 @@ public abstract class AbstractSftpSubsystemHelper
try {
doUnblock(id, handle, offset, length);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_UNBLOCK, handle, offset, length);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_UNBLOCK, handle, offset, length);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected abstract void doUnblock(int id, String handle, long offset, long length) throws IOException;
@@ -1150,11 +1154,11 @@ public abstract class AbstractSftpSubsystemHelper
try {
attrs = doStat(id, path, flags);
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_STAT, path, flags);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_STAT, path, flags);
return;
}
- sendAttrs(BufferUtils.clear(buffer), id, attrs);
+ sendAttrs(buffer, id, attrs);
}
protected Map<String, Object> doStat(int id, String path, int flags) throws IOException {
@@ -1267,11 +1271,11 @@ public abstract class AbstractSftpSubsystemHelper
}
}
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_REALPATH, path);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_REALPATH, path);
return;
}
- sendPath(BufferUtils.clear(buffer), id, result.getKey(), attrs);
+ sendPath(buffer, id, result.getKey(), attrs);
}
protected SimpleImmutableEntry<Path, Boolean> doRealPathV6(
@@ -1322,11 +1326,11 @@ public abstract class AbstractSftpSubsystemHelper
try {
doRemoveDirectory(id, path, IoUtils.getLinkOptions(false));
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_RMDIR, path);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_RMDIR, path);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected void doRemoveDirectory(int id, String path, LinkOption... options) throws IOException {
@@ -1368,11 +1372,11 @@ public abstract class AbstractSftpSubsystemHelper
try {
doMakeDirectory(id, path, attrs, IoUtils.getLinkOptions(false));
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_MKDIR, path, attrs);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_MKDIR, path, attrs);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected void doMakeDirectory(int id, String path, Map<String, ?> attrs, LinkOption... options) throws IOException {
@@ -1417,11 +1421,11 @@ public abstract class AbstractSftpSubsystemHelper
*/
doRemove(id, path, IoUtils.getLinkOptions(false));
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_REMOVE, path);
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_REMOVE, path);
return;
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
}
protected void doRemove(int id, String path, LinkOption... options) throws IOException {
@@ -1774,6 +1778,8 @@ public abstract class AbstractSftpSubsystemHelper
}
protected void sendHandle(Buffer buffer, int id, String handle) throws IOException {
+ buffer.clear();
+ buffer.putInt(0);
buffer.putByte((byte) SftpConstants.SSH_FXP_HANDLE);
buffer.putInt(id);
buffer.putString(handle);
@@ -1781,6 +1787,8 @@ public abstract class AbstractSftpSubsystemHelper
}
protected void sendAttrs(Buffer buffer, int id, Map<String, ?> attributes) throws IOException {
+ buffer.clear();
+ buffer.putInt(0);
buffer.putByte((byte) SftpConstants.SSH_FXP_ATTRS);
buffer.putInt(id);
writeAttrs(buffer, attributes);
@@ -1791,6 +1799,9 @@ public abstract class AbstractSftpSubsystemHelper
//in case we are running on Windows
String unixPath = link.replace(File.separatorChar, '/');
+ buffer.clear();
+ buffer.putInt(0);
+
buffer.putByte((byte) SftpConstants.SSH_FXP_NAME);
buffer.putInt(id);
buffer.putInt(1); // one response
@@ -1814,6 +1825,9 @@ public abstract class AbstractSftpSubsystemHelper
}
protected void sendPath(Buffer buffer, int id, Path f, Map<String, ?> attrs) throws IOException {
+ buffer.clear();
+ buffer.putInt(0);
+
buffer.putByte((byte) SftpConstants.SSH_FXP_NAME);
buffer.putInt(id);
buffer.putInt(1); // one reply
@@ -2537,11 +2551,15 @@ public abstract class AbstractSftpSubsystemHelper
getServerSession(), id, SftpConstants.getStatusName(substatus), lang, msg);
}
+ buffer.clear();
+ buffer.putInt(0);
+
buffer.putByte((byte) SftpConstants.SSH_FXP_STATUS);
buffer.putInt(id);
buffer.putInt(substatus);
buffer.putString(msg);
buffer.putString(lang);
+
send(buffer);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e05aa13c/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
index 5cfbf01..39ebce2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
@@ -40,7 +40,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.sshd.common.Factory;
@@ -48,6 +47,8 @@ import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.digest.BuiltinDigests;
import org.apache.sshd.common.digest.DigestFactory;
import org.apache.sshd.common.file.FileSystemAware;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.random.Random;
import org.apache.sshd.common.subsystem.sftp.SftpConstants;
import org.apache.sshd.common.subsystem.sftp.SftpHelper;
@@ -61,10 +62,13 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.io.IoUtils;
import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
import org.apache.sshd.common.util.threads.ThreadUtils;
-import org.apache.sshd.server.Command;
+import org.apache.sshd.server.AsyncCommand;
+import org.apache.sshd.server.ChannelSessionAware;
import org.apache.sshd.server.Environment;
import org.apache.sshd.server.ExitCallback;
import org.apache.sshd.server.SessionAware;
+import org.apache.sshd.server.channel.ChannelDataReceiver;
+import org.apache.sshd.server.channel.ChannelSession;
import org.apache.sshd.server.session.ServerSession;
/**
@@ -74,7 +78,7 @@ import org.apache.sshd.server.session.ServerSession;
*/
public class SftpSubsystem
extends AbstractSftpSubsystemHelper
- implements Command, Runnable, SessionAware, FileSystemAware, ExecutorServiceCarrier {
+ implements AsyncCommand, ChannelDataReceiver, ChannelSessionAware, SessionAware, FileSystemAware, ExecutorServiceCarrier {
/**
* Properties key for the maximum of available open handles per session.
@@ -114,14 +118,13 @@ public class SftpSubsystem
public static final int DEFAULT_MAX_READDIR_DATA_SIZE = 16 * 1024;
protected ExitCallback callback;
- protected InputStream in;
- protected OutputStream out;
- protected OutputStream err;
+ protected IoInputStream in;
+ protected IoOutputStream out;
+ protected IoOutputStream err;
protected Environment env;
protected Random randomizer;
protected int fileHandleSize = DEFAULT_FILE_HANDLE_SIZE;
protected int maxFileHandleRounds = DEFAULT_FILE_HANDLE_ROUNDS;
- protected Future<?> pendingFuture;
protected byte[] workBuf = new byte[Math.max(DEFAULT_FILE_HANDLE_SIZE, Integer.BYTES)];
protected FileSystem fileSystem = FileSystems.getDefault();
protected Path defaultDir = fileSystem.getPath(System.getProperty("user.dir"));
@@ -129,8 +132,10 @@ public class SftpSubsystem
protected int version;
protected final Map<String, byte[]> extensions = new TreeMap<>(Comparator.naturalOrder());
protected final Map<String, Handle> handles = new HashMap<>();
+ private Buffer buffer = new ByteArrayBuffer(1024);
private ServerSession serverSession;
+ private ChannelSession channelSession;
private final AtomicBoolean closed = new AtomicBoolean(false);
private ExecutorService executorService;
private boolean shutdownOnExit;
@@ -227,75 +232,86 @@ public class SftpSubsystem
@Override
public void setInputStream(InputStream in) {
- this.in = in;
}
@Override
public void setOutputStream(OutputStream out) {
- this.out = out;
}
@Override
public void setErrorStream(OutputStream err) {
+ }
+
+ @Override
+ public void setIoInputStream(IoInputStream in) {
+ this.in = in;
+ }
+
+ @Override
+ public void setIoOutputStream(IoOutputStream out) {
+ this.out = out;
+ }
+
+ @Override
+ public void setIoErrorStream(IoOutputStream err) {
this.err = err;
}
@Override
public void start(Environment env) throws IOException {
this.env = env;
- try {
- ExecutorService executor = getExecutorService();
- pendingFuture = executor.submit(this);
- } catch (RuntimeException e) { // e.g., RejectedExecutionException
- log.error("Failed (" + e.getClass().getSimpleName() + ") to start command: " + e.toString(), e);
- throw new IOException(e);
- }
}
@Override
- public void run() {
- try {
- for (long count = 1L;; count++) {
- int length = BufferUtils.readInt(in, workBuf, 0, workBuf.length);
- ValidateUtils.checkTrue(length >= (Integer.BYTES + 1 /* command */), "Bad length to read: %d", length);
-
- Buffer buffer = new ByteArrayBuffer(length + Integer.BYTES + Long.SIZE /* a bit extra */, false);
- buffer.putInt(length);
- for (int remainLen = length; remainLen > 0;) {
- int l = in.read(buffer.array(), buffer.wpos(), remainLen);
- if (l < 0) {
- throw new IllegalArgumentException("Premature EOF at buffer #" + count + " while read length=" + length + " and remain=" + remainLen);
- }
- buffer.wpos(buffer.wpos() + l);
- remainLen -= l;
- }
+ public void setChannelSession(ChannelSession session) {
+ this.channelSession = session;
+ }
- process(buffer);
+ @Override
+ public int data(ChannelSession channel, byte[] buf, int start, int len) throws IOException {
+ buffer.putRawBytes(buf, start, len);
+ while (buffer.available() >= Integer.BYTES) {
+ int rpos = buffer.rpos();
+ int msglen = buffer.getInt();
+ if (buffer.available() >= msglen) {
+ Buffer b = new ByteArrayBuffer(msglen + Integer.BYTES + Long.SIZE /* a bit extra */, false);
+ b.putInt(msglen);
+ b.putRawBytes(buffer.array(), buffer.rpos(), msglen);
+ buffer.rpos(buffer.rpos() + msglen);
+ getExecutorService().submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ process(b);
+ channelSession.getLocalWindow().consumeAndCheck(msglen + Integer.BYTES);
+ } catch (IOException e) {
+ serverSession.exceptionCaught(e);
+ }
+ }
+ });
+ } else {
+ buffer.rpos(rpos);
+ break;
}
- } catch (Throwable t) {
- if ((!closed.get()) && (!(t instanceof EOFException))) { // Ignore
- log.error("run({}) {} caught in SFTP subsystem: {}",
- getServerSession(), t.getClass().getSimpleName(), t.getMessage());
- if (log.isDebugEnabled()) {
- log.debug("run(" + getServerSession() + ") caught exception details", t);
+ }
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ boolean debugEnabled = log.isDebugEnabled();
+ handles.forEach((id, handle) -> {
+ try {
+ handle.close();
+ if (debugEnabled) {
+ log.debug("run({}) closed pending handle {} [{}]", getServerSession(), id, handle);
}
+ } catch (IOException ioe) {
+ log.error("run({}) failed ({}) to close handle={}[{}]: {}",
+ getServerSession(), ioe.getClass().getSimpleName(), id, handle, ioe.getMessage());
}
- } finally {
- boolean debugEnabled = log.isDebugEnabled();
- handles.forEach((id, handle) -> {
- try {
- handle.close();
- if (debugEnabled) {
- log.debug("run({}) closed pending handle {} [{}]", getServerSession(), id, handle);
- }
- } catch (IOException ioe) {
- log.error("run({}) failed ({}) to close handle={}[{}]: {}",
- getServerSession(), ioe.getClass().getSimpleName(), id, handle, ioe.getMessage());
- }
- });
-
- callback.onExit(0);
- }
+ });
+ callback.onExit(0);
}
@Override
@@ -383,7 +399,7 @@ public class SftpSubsystem
String name = SftpConstants.getCommandMessageName(type);
log.warn("process({})[length={}, type={}, id={}] unknown command",
getServerSession(), length, name, id);
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command " + name + " is unsupported or not implemented");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command " + name + " is unsupported or not implemented");
}
}
@@ -428,7 +444,7 @@ public class SftpSubsystem
if (log.isDebugEnabled()) {
log.debug("executeExtendedCommand({}) received unsupported SSH_FXP_EXTENDED({})", getServerSession(), extension);
}
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command SSH_FXP_EXTENDED(" + extension + ") is unsupported or not implemented");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command SSH_FXP_EXTENDED(" + extension + ") is unsupported or not implemented");
break;
}
}
@@ -602,7 +618,7 @@ public class SftpSubsystem
* channel.
*/
if (requestsCount > 0L) {
- sendStatus(BufferUtils.clear(buffer), id,
+ sendStatus(buffer, id,
SftpConstants.SSH_FX_FAILURE,
"Version selection not the 1st request for proposal = " + proposed);
session.close(true);
@@ -619,9 +635,9 @@ public class SftpSubsystem
}
if (result) {
version = Integer.parseInt(proposed);
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "");
} else {
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_FAILURE, "Unsupported version " + proposed);
+ sendStatus(buffer, id, SftpConstants.SSH_FX_FAILURE, "Unsupported version " + proposed);
session.close(true);
}
}
@@ -752,11 +768,10 @@ public class SftpSubsystem
getServerSession(), id, handle, h);
}
- Buffer reply = null;
try {
DirectoryHandle dh = validateHandle(handle, h, DirectoryHandle.class);
if (dh.isDone()) {
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_EOF, "Directory reading is done");
+ sendStatus(buffer, id, SftpConstants.SSH_FX_EOF, "Directory reading is done");
return;
}
@@ -781,40 +796,37 @@ public class SftpSubsystem
// Send only a few files at a time to not create packets of a too
// large size or have a timeout to occur.
- reply = BufferUtils.clear(buffer);
- reply.putByte((byte) SftpConstants.SSH_FXP_NAME);
- reply.putInt(id);
+ buffer.clear();
+ buffer.putInt(0);
+
+ buffer.putByte((byte) SftpConstants.SSH_FXP_NAME);
+ buffer.putInt(id);
- int lenPos = reply.wpos();
- reply.putInt(0);
+ int lenPos = buffer.wpos();
+ buffer.putInt(0);
ServerSession session = getServerSession();
int maxDataSize = session.getIntProperty(MAX_READDIR_DATA_SIZE_PROP, DEFAULT_MAX_READDIR_DATA_SIZE);
- int count = doReadDir(id, handle, dh, reply, maxDataSize, IoUtils.getLinkOptions(false));
- BufferUtils.updateLengthPlaceholder(reply, lenPos, count);
+ int count = doReadDir(id, handle, dh, buffer, maxDataSize, IoUtils.getLinkOptions(false));
+ BufferUtils.updateLengthPlaceholder(buffer, lenPos, count);
if ((!dh.isSendDot()) && (!dh.isSendDotDot()) && (!dh.hasNext())) {
dh.markDone();
}
Boolean indicator =
- SftpHelper.indicateEndOfNamesList(reply, getVersion(), session, dh.isDone());
+ SftpHelper.indicateEndOfNamesList(buffer, getVersion(), session, dh.isDone());
if (debugEnabled) {
log.debug("doReadDir({})({})[{}] - seding {} entries - eol={}", session, handle, h, count, indicator);
}
+ send(buffer);
} else {
// empty directory
dh.markDone();
- sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_EOF, "Empty directory");
- return;
+ sendStatus(buffer, id, SftpConstants.SSH_FX_EOF, "Empty directory");
}
-
- Objects.requireNonNull(reply, "No reply buffer created");
} catch (IOException | RuntimeException e) {
- sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_READDIR, handle);
- return;
+ sendStatus(buffer, id, e, SftpConstants.SSH_FXP_READDIR, handle);
}
-
- send(reply);
}
@Override
@@ -990,6 +1002,7 @@ public class SftpSubsystem
}
buffer.clear();
+ buffer.putInt(0);
buffer.putByte((byte) SftpConstants.SSH_FXP_VERSION);
buffer.putInt(version);
@@ -1003,10 +1016,8 @@ public class SftpSubsystem
@Override
protected void send(Buffer buffer) throws IOException {
- int len = buffer.available();
- BufferUtils.writeInt(out, len, workBuf, 0, workBuf.length);
- out.write(buffer.array(), buffer.rpos(), len);
- out.flush();
+ BufferUtils.updateLengthPlaceholder(buffer, 0);
+ out.writePacket(buffer);
}
@Override
@@ -1032,17 +1043,6 @@ public class SftpSubsystem
}
}
- // if thread has not completed, cancel it
- if ((pendingFuture != null) && (!pendingFuture.isDone())) {
- boolean result = pendingFuture.cancel(true);
- // TODO consider waiting some reasonable (?) amount of time for cancellation
- if (debugEnabled) {
- log.debug("destroy(" + session + ") - cancel pending future=" + result);
- }
- }
-
- pendingFuture = null;
-
ExecutorService executors = getExecutorService();
if ((executors != null) && (!executors.isShutdown()) && isShutdownOnExit()) {
Collection<Runnable> runners = executors.shutdownNow();