You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2018/04/19 15:08:01 UTC
[5/5] mina-sshd git commit: [SSHD-812] Make SftpSubsystem an AsyncCommand
[SSHD-812] Make SftpSubsystem an AsyncCommand
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/e07479f7
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/e07479f7
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/e07479f7
Branch: refs/heads/master
Commit: e07479f73966aee22be285ba3fc7e7d7ac3e9aea
Parents: 6bed356
Author: Guillaume Nodet <gn...@apache.org>
Authored: Thu Apr 19 17:03:59 2018 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Thu Apr 19 17:07:41 2018 +0200
----------------------------------------------------------------------
.../server/subsystem/sftp/SftpSubsystem.java | 112 ++++++++++++++-----
1 file changed, 83 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e07479f7/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
index ab1794e..ba73695 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
@@ -39,15 +39,20 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.sshd.common.Factory;
import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.channel.BufferedIoOutputStream;
import org.apache.sshd.common.digest.BuiltinDigests;
import org.apache.sshd.common.digest.DigestFactory;
import org.apache.sshd.common.file.FileSystemAware;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.random.Random;
import org.apache.sshd.common.subsystem.sftp.SftpConstants;
import org.apache.sshd.common.subsystem.sftp.SftpHelper;
@@ -59,10 +64,14 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.io.IoUtils;
import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.apache.sshd.server.AsyncCommand;
+import org.apache.sshd.server.ChannelSessionAware;
import org.apache.sshd.server.Command;
import org.apache.sshd.server.Environment;
import org.apache.sshd.server.ExitCallback;
import org.apache.sshd.server.SessionAware;
+import org.apache.sshd.server.channel.ChannelDataReceiver;
+import org.apache.sshd.server.channel.ChannelSession;
import org.apache.sshd.server.session.ServerSession;
/**
@@ -72,7 +81,8 @@ import org.apache.sshd.server.session.ServerSession;
*/
public class SftpSubsystem
extends AbstractSftpSubsystemHelper
- implements Command, Runnable, SessionAware, FileSystemAware, ExecutorServiceCarrier {
+ implements Command, Runnable, SessionAware, FileSystemAware, ExecutorServiceCarrier,
+ AsyncCommand, ChannelSessionAware, ChannelDataReceiver {
/**
* Properties key for the maximum of available open handles per session.
@@ -111,10 +121,11 @@ public class SftpSubsystem
public static final String MAX_READDIR_DATA_SIZE_PROP = "sftp-max-readdir-data-size";
public static final int DEFAULT_MAX_READDIR_DATA_SIZE = 16 * 1024;
+ protected static final Buffer CLOSE = new ByteArrayBuffer(null, 0, 0);
+
protected ExitCallback callback;
- protected InputStream in;
- protected OutputStream out;
- protected OutputStream err;
+ protected IoOutputStream out;
+ protected IoOutputStream err;
protected Environment env;
protected Random randomizer;
protected int fileHandleSize = DEFAULT_FILE_HANDLE_SIZE;
@@ -127,11 +138,14 @@ public class SftpSubsystem
protected int version;
protected final Map<String, byte[]> extensions = new TreeMap<>(Comparator.naturalOrder());
protected final Map<String, Handle> handles = new HashMap<>();
+ protected final Buffer buffer = new ByteArrayBuffer(1024);
+ protected final BlockingQueue<Buffer> requests = new LinkedBlockingQueue<>();
- private ServerSession serverSession;
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private ExecutorService executorService;
- private boolean shutdownOnExit;
+ protected ServerSession serverSession;
+ protected ChannelSession channelSession;
+ protected final AtomicBoolean closed = new AtomicBoolean(false);
+ protected ExecutorService executorService;
+ protected boolean shutdownOnExit;
/**
* @param executorService The {@link ExecutorService} to be used by
@@ -207,6 +221,12 @@ public class SftpSubsystem
}
@Override
+ public void setChannelSession(ChannelSession session) {
+ this.channelSession = session;
+ session.setDataReceiver(this);
+ }
+
+ @Override
public void setFileSystem(FileSystem fileSystem) {
if (fileSystem != this.fileSystem) {
this.fileSystem = fileSystem;
@@ -225,16 +245,31 @@ public class SftpSubsystem
@Override
public void setInputStream(InputStream in) {
- this.in = in;
+ // Do nothing
}
@Override
public void setOutputStream(OutputStream out) {
- this.out = out;
+ // Do nothing
}
@Override
public void setErrorStream(OutputStream err) {
+ // Do nothing
+ }
+
+ @Override
+ public void setIoInputStream(IoInputStream in) {
+ // Do nothing
+ }
+
+ @Override
+ public void setIoOutputStream(IoOutputStream out) {
+ this.out = new BufferedIoOutputStream("sftp out buffer", out);
+ }
+
+ @Override
+ public void setIoErrorStream(IoOutputStream err) {
this.err = err;
}
@@ -251,27 +286,40 @@ public class SftpSubsystem
}
@Override
+ public int data(ChannelSession channel, byte[] buf, int start, int len) throws IOException {
+ buffer.compact();
+ buffer.putRawBytes(buf, start, len);
+ while (buffer.available() >= Integer.BYTES) {
+ int rpos = buffer.rpos();
+ int msglen = buffer.getInt();
+ if (buffer.available() >= msglen) {
+ Buffer b = new ByteArrayBuffer(msglen + Integer.BYTES + Long.SIZE /* a bit extra */, false);
+ b.putInt(msglen);
+ b.putRawBytes(buffer.array(), buffer.rpos(), msglen);
+ requests.add(b);
+ buffer.rpos(rpos + msglen + Integer.BYTES);
+ } else {
+ buffer.rpos(rpos);
+ break;
+ }
+ }
+ return 0;
+ }
+
+ @Override
public void run() {
try {
- for (long count = 1L;; count++) {
- int length = BufferUtils.readInt(in, workBuf, 0, workBuf.length);
- ValidateUtils.checkTrue(length >= (Integer.BYTES + 1 /* command */), "Bad length to read: %d", length);
-
- Buffer buffer = new ByteArrayBuffer(length + Integer.BYTES + Long.SIZE /* a bit extra */, false);
- buffer.putInt(length);
- for (int remainLen = length; remainLen > 0;) {
- int l = in.read(buffer.array(), buffer.wpos(), remainLen);
- if (l < 0) {
- throw new IllegalArgumentException("Premature EOF at buffer #" + count + " while read length=" + length + " and remain=" + remainLen);
- }
- buffer.wpos(buffer.wpos() + l);
- remainLen -= l;
+ while (true) {
+ Buffer buffer = requests.take();
+ if (buffer == CLOSE) {
+ break;
}
-
+ int len = buffer.available();
process(buffer);
+ channelSession.getLocalWindow().consumeAndCheck(len);
}
} catch (Throwable t) {
- if ((!closed.get()) && (!(t instanceof EOFException))) { // Ignore
+ if (!closed.get()) { // Ignore
log.error("run({}) {} caught in SFTP subsystem: {}",
getServerSession(), t.getClass().getSimpleName(), t.getMessage());
if (log.isDebugEnabled()) {
@@ -296,6 +344,13 @@ public class SftpSubsystem
}
}
+ @Override
+ public void close() throws IOException {
+ requests.clear();
+ requests.add(CLOSE);
+ }
+
+ @Override
protected void doProcess(Buffer buffer, int length, int type, int id) throws IOException {
super.doProcess(buffer, length, type, id);
if (type != SftpConstants.SSH_FXP_INIT) {
@@ -873,15 +928,14 @@ public class SftpSubsystem
@Override
protected Buffer prepareReply(Buffer buffer) {
buffer.clear();
+ buffer.putInt(0);
return buffer;
}
@Override
protected void send(Buffer buffer) throws IOException {
- int len = buffer.available();
- BufferUtils.writeInt(out, len, workBuf, 0, workBuf.length);
- out.write(buffer.array(), buffer.rpos(), len);
- out.flush();
+ BufferUtils.updateLengthPlaceholder(buffer, 0);
+ out.writePacket(buffer);
}
@Override