You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2018/04/19 15:08:01 UTC

[5/5] mina-sshd git commit: [SSHD-812] Make SftpSubsystem an AsyncCommand

[SSHD-812] Make SftpSubsystem an AsyncCommand


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/e07479f7
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/e07479f7
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/e07479f7

Branch: refs/heads/master
Commit: e07479f73966aee22be285ba3fc7e7d7ac3e9aea
Parents: 6bed356
Author: Guillaume Nodet <gn...@apache.org>
Authored: Thu Apr 19 17:03:59 2018 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Thu Apr 19 17:07:41 2018 +0200

----------------------------------------------------------------------
 .../server/subsystem/sftp/SftpSubsystem.java    | 112 ++++++++++++++-----
 1 file changed, 83 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e07479f7/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
index ab1794e..ba73695 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
@@ -39,15 +39,20 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.channel.BufferedIoOutputStream;
 import org.apache.sshd.common.digest.BuiltinDigests;
 import org.apache.sshd.common.digest.DigestFactory;
 import org.apache.sshd.common.file.FileSystemAware;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.random.Random;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.common.subsystem.sftp.SftpHelper;
@@ -59,10 +64,14 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.io.IoUtils;
 import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
 import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.apache.sshd.server.AsyncCommand;
+import org.apache.sshd.server.ChannelSessionAware;
 import org.apache.sshd.server.Command;
 import org.apache.sshd.server.Environment;
 import org.apache.sshd.server.ExitCallback;
 import org.apache.sshd.server.SessionAware;
+import org.apache.sshd.server.channel.ChannelDataReceiver;
+import org.apache.sshd.server.channel.ChannelSession;
 import org.apache.sshd.server.session.ServerSession;
 
 /**
@@ -72,7 +81,8 @@ import org.apache.sshd.server.session.ServerSession;
  */
 public class SftpSubsystem
         extends AbstractSftpSubsystemHelper
-        implements Command, Runnable, SessionAware, FileSystemAware, ExecutorServiceCarrier {
+        implements Command, Runnable, SessionAware, FileSystemAware, ExecutorServiceCarrier,
+                    AsyncCommand, ChannelSessionAware, ChannelDataReceiver {
 
     /**
      * Properties key for the maximum of available open handles per session.
@@ -111,10 +121,11 @@ public class SftpSubsystem
     public static final String MAX_READDIR_DATA_SIZE_PROP = "sftp-max-readdir-data-size";
     public static final int DEFAULT_MAX_READDIR_DATA_SIZE = 16 * 1024;
 
+    protected static final Buffer CLOSE = new ByteArrayBuffer(null, 0, 0);
+
     protected ExitCallback callback;
-    protected InputStream in;
-    protected OutputStream out;
-    protected OutputStream err;
+    protected IoOutputStream out;
+    protected IoOutputStream err;
     protected Environment env;
     protected Random randomizer;
     protected int fileHandleSize = DEFAULT_FILE_HANDLE_SIZE;
@@ -127,11 +138,14 @@ public class SftpSubsystem
     protected int version;
     protected final Map<String, byte[]> extensions = new TreeMap<>(Comparator.naturalOrder());
     protected final Map<String, Handle> handles = new HashMap<>();
+    protected final Buffer buffer = new ByteArrayBuffer(1024);
+    protected final BlockingQueue<Buffer> requests = new LinkedBlockingQueue<>();
 
-    private ServerSession serverSession;
-    private final AtomicBoolean closed = new AtomicBoolean(false);
-    private ExecutorService executorService;
-    private boolean shutdownOnExit;
+    protected ServerSession serverSession;
+    protected ChannelSession channelSession;
+    protected final AtomicBoolean closed = new AtomicBoolean(false);
+    protected ExecutorService executorService;
+    protected boolean shutdownOnExit;
 
     /**
      * @param executorService The {@link ExecutorService} to be used by
@@ -207,6 +221,12 @@ public class SftpSubsystem
     }
 
     @Override
+    public void setChannelSession(ChannelSession session) {
+        this.channelSession = session;
+        session.setDataReceiver(this);
+    }
+
+    @Override
     public void setFileSystem(FileSystem fileSystem) {
         if (fileSystem != this.fileSystem) {
             this.fileSystem = fileSystem;
@@ -225,16 +245,31 @@ public class SftpSubsystem
 
     @Override
     public void setInputStream(InputStream in) {
-        this.in = in;
+        // Do nothing
     }
 
     @Override
     public void setOutputStream(OutputStream out) {
-        this.out = out;
+        // Do nothing
     }
 
     @Override
     public void setErrorStream(OutputStream err) {
+        // Do nothing
+    }
+
+    @Override
+    public void setIoInputStream(IoInputStream in) {
+        // Do nothing
+    }
+
+    @Override
+    public void setIoOutputStream(IoOutputStream out) {
+        this.out = new BufferedIoOutputStream("sftp out buffer", out);
+    }
+
+    @Override
+    public void setIoErrorStream(IoOutputStream err) {
         this.err = err;
     }
 
@@ -251,27 +286,40 @@ public class SftpSubsystem
     }
 
     @Override
+    public int data(ChannelSession channel, byte[] buf, int start, int len) throws IOException {
+        buffer.compact();
+        buffer.putRawBytes(buf, start, len);
+        while (buffer.available() >= Integer.BYTES) {
+            int rpos = buffer.rpos();
+            int msglen = buffer.getInt();
+            if (buffer.available() >= msglen) {
+                Buffer b = new ByteArrayBuffer(msglen + Integer.BYTES + Long.SIZE /* a bit extra */, false);
+                b.putInt(msglen);
+                b.putRawBytes(buffer.array(), buffer.rpos(), msglen);
+                requests.add(b);
+                buffer.rpos(rpos + msglen + Integer.BYTES);
+            } else {
+                buffer.rpos(rpos);
+                break;
+            }
+        }
+        return 0;
+    }
+
+    @Override
     public void run() {
         try {
-            for (long count = 1L;; count++) {
-                int length = BufferUtils.readInt(in, workBuf, 0, workBuf.length);
-                ValidateUtils.checkTrue(length >= (Integer.BYTES + 1 /* command */), "Bad length to read: %d", length);
-
-                Buffer buffer = new ByteArrayBuffer(length + Integer.BYTES + Long.SIZE /* a bit extra */, false);
-                buffer.putInt(length);
-                for (int remainLen = length; remainLen > 0;) {
-                    int l = in.read(buffer.array(), buffer.wpos(), remainLen);
-                    if (l < 0) {
-                        throw new IllegalArgumentException("Premature EOF at buffer #" + count + " while read length=" + length + " and remain=" + remainLen);
-                    }
-                    buffer.wpos(buffer.wpos() + l);
-                    remainLen -= l;
+            while (true) {
+                Buffer buffer = requests.take();
+                if (buffer == CLOSE) {
+                    break;
                 }
-
+                int len = buffer.available();
                 process(buffer);
+                channelSession.getLocalWindow().consumeAndCheck(len);
             }
         } catch (Throwable t) {
-            if ((!closed.get()) && (!(t instanceof EOFException))) { // Ignore
+            if (!closed.get()) { // Ignore
                 log.error("run({}) {} caught in SFTP subsystem: {}",
                           getServerSession(), t.getClass().getSimpleName(), t.getMessage());
                 if (log.isDebugEnabled()) {
@@ -296,6 +344,13 @@ public class SftpSubsystem
         }
     }
 
+    @Override
+    public void close() throws IOException {
+        requests.clear();
+        requests.add(CLOSE);
+    }
+
+    @Override
     protected void doProcess(Buffer buffer, int length, int type, int id) throws IOException {
         super.doProcess(buffer, length, type, id);
         if (type != SftpConstants.SSH_FXP_INIT) {
@@ -873,15 +928,14 @@ public class SftpSubsystem
     @Override
     protected Buffer prepareReply(Buffer buffer) {
         buffer.clear();
+        buffer.putInt(0);
         return buffer;
     }
 
     @Override
     protected void send(Buffer buffer) throws IOException {
-        int len = buffer.available();
-        BufferUtils.writeInt(out, len, workBuf, 0, workBuf.length);
-        out.write(buffer.array(), buffer.rpos(), len);
-        out.flush();
+        BufferUtils.updateLengthPlaceholder(buffer, 0);
+        out.writePacket(buffer);
     }
 
     @Override