You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2020/04/21 08:46:13 UTC

[mina-sshd] branch master updated (9099a2d -> 3d28d18)

This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git.


    from 9099a2d  Tune down the output when downloading sources / javadocs for dependencies
     new e85b67e  [SSHD-979] Improve SFTP streaming
     new 3d28d18  [SSHD-980] Make the SFTP Api cleaner by moving the implementation classes into the non public package

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../common/channel/ChannelAsyncOutputStream.java   | 143 ++++++----
 .../common/session/helpers/AbstractSession.java    |   4 +-
 sshd-sftp/pom.xml                                  |  22 ++
 .../sshd/client/subsystem/sftp/RawSftpClient.java  |   9 +
 .../sshd/client/subsystem/sftp/SftpClient.java     |  90 ++----
 .../helpers/AbstractSftpClientExtension.java       |   5 +
 .../client/subsystem/sftp/fs/SftpFileSystem.java   |  15 +
 .../subsystem/sftp/fs/SftpFileSystemChannel.java   |  41 ---
 .../subsystem/sftp/fs/SftpFileSystemProvider.java  |  46 ++-
 .../subsystem/sftp/impl/AbstractSftpClient.java    |  95 ++++++-
 .../subsystem/sftp/impl/DefaultSftpClient.java     | 187 +++++++-----
 .../sftp/{ => impl}/SftpDirEntryIterator.java      |   3 +-
 .../subsystem/sftp/impl/SftpInputStreamAsync.java  | 312 +++++++++++++++++++++
 .../sftp/{ => impl}/SftpIterableDirEntry.java      |   3 +-
 .../subsystem/sftp/impl/SftpOutputStreamAsync.java | 201 +++++++++++++
 .../sftp/{ => impl}/SftpRemotePathChannel.java     |  71 ++---
 .../sftp/{ => impl}/StfpIterableDirHandle.java     |   3 +-
 .../subsystem/sftp/SftpInputStreamWithChannel.java |   0
 .../sftp/SftpOutputStreamWithChannel.java          |   0
 .../client/subsystem/sftp/SftpPerformanceTest.java | 243 ++++++++++++++++
 .../sshd/client/subsystem/sftp/SftpTest.java       |  21 +-
 .../client/subsystem/sftp/SftpTransferTest.java    | 134 +++++++++
 .../sftp/{ => impl}/SftpRemotePathChannelTest.java |   4 +-
 23 files changed, 1355 insertions(+), 297 deletions(-)
 delete mode 100644 sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemChannel.java
 rename sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/{ => impl}/SftpDirEntryIterator.java (98%)
 create mode 100644 sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
 rename sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/{ => impl}/SftpIterableDirEntry.java (95%)
 create mode 100644 sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
 rename sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/{ => impl}/SftpRemotePathChannel.java (90%)
 rename sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/{ => impl}/StfpIterableDirHandle.java (94%)
 rename sshd-sftp/src/{main => test}/java/org/apache/sshd/client/subsystem/sftp/SftpInputStreamWithChannel.java (100%)
 rename sshd-sftp/src/{main => test}/java/org/apache/sshd/client/subsystem/sftp/SftpOutputStreamWithChannel.java (100%)
 create mode 100644 sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpPerformanceTest.java
 create mode 100644 sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTransferTest.java
 rename sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/{ => impl}/SftpRemotePathChannelTest.java (98%)


[mina-sshd] 02/02: [SSHD-980] Make the SFTP Api cleaner by moving the implementation classes into the non public package

Posted by gn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit 3d28d18b815c7cafe0bed1b7e96e3f346d174e50
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Sat Apr 18 20:13:07 2020 +0200

    [SSHD-980] Make the SFTP Api cleaner by moving the implementation classes into the non public package
---
 .../sshd/client/subsystem/sftp/SftpClient.java     | 34 +++++----------
 .../subsystem/sftp/fs/SftpFileSystemChannel.java   | 41 ------------------
 .../subsystem/sftp/fs/SftpFileSystemProvider.java  | 12 +++---
 .../subsystem/sftp/impl/AbstractSftpClient.java    | 50 ++++++++++++++++++++++
 .../sftp/{ => impl}/SftpDirEntryIterator.java      |  3 +-
 .../sftp/{ => impl}/SftpIterableDirEntry.java      |  3 +-
 .../sftp/{ => impl}/SftpRemotePathChannel.java     | 28 +++++-------
 .../sftp/{ => impl}/StfpIterableDirHandle.java     |  3 +-
 .../sftp/{ => impl}/SftpRemotePathChannelTest.java |  4 +-
 9 files changed, 86 insertions(+), 92 deletions(-)

diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java
index e78fa00..a31d973 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.channels.Channel;
+import java.nio.channels.FileChannel;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.OpenOption;
@@ -771,13 +772,7 @@ public interface SftpClient extends SubsystemClient {
      *                     can be iterated only <U>once</U>
      * @throws IOException If failed to access the directory
      */
-    default Iterable<DirEntry> listDir(Handle handle) throws IOException {
-        if (!isOpen()) {
-            throw new IOException("listDir(" + handle + ") client is closed");
-        }
-
-        return new StfpIterableDirHandle(this, handle);
-    }
+    Iterable<DirEntry> listDir(Handle handle) throws IOException;
 
     /**
      * The effective &quot;normalized&quot; remote path
@@ -875,34 +870,31 @@ public interface SftpClient extends SubsystemClient {
     // High level API
     //
 
-    default SftpRemotePathChannel openRemotePathChannel(String path, OpenOption... options) throws IOException {
+    default FileChannel openRemotePathChannel(String path, OpenOption... options) throws IOException {
         return openRemotePathChannel(path, GenericUtils.isEmpty(options) ? Collections.emptyList() : Arrays.asList(options));
     }
 
-    default SftpRemotePathChannel openRemotePathChannel(String path, Collection<? extends OpenOption> options)
-            throws IOException {
+    default FileChannel openRemotePathChannel(String path, Collection<? extends OpenOption> options) throws IOException {
         return openRemoteFileChannel(path, OpenMode.fromOpenOptions(options));
     }
 
-    default SftpRemotePathChannel openRemoteFileChannel(String path, OpenMode... modes) throws IOException {
+    default FileChannel openRemoteFileChannel(String path, OpenMode... modes) throws IOException {
         return openRemoteFileChannel(path, GenericUtils.isEmpty(modes) ? Collections.emptyList() : Arrays.asList(modes));
     }
 
     /**
-     * Opens an {@link SftpRemotePathChannel} on the specified remote path
+     * Opens an {@link FileChannel} on the specified remote path
      *
      * @param  path        The remote path
      * @param  modes       The access mode(s) - if {@code null}/empty then the {@link #DEFAULT_CHANNEL_MODES} are used
-     * @return             The open {@link SftpRemotePathChannel} - <B>Note:</B> do not close this owner client instance
-     *                     until the channel is no longer needed since it uses the client for providing the channel's
+     * @return             The open {@link FileChannel} - <B>Note:</B> do not close this owner client instance until the
+     *                     channel is no longer needed since it uses the client for providing the channel's
      *                     functionality.
      * @throws IOException If failed to open the channel
      * @see                java.nio.channels.Channels#newInputStream(java.nio.channels.ReadableByteChannel)
      * @see                java.nio.channels.Channels#newOutputStream(java.nio.channels.WritableByteChannel)
      */
-    default SftpRemotePathChannel openRemoteFileChannel(String path, Collection<OpenMode> modes) throws IOException {
-        return new SftpRemotePathChannel(path, this, false, GenericUtils.isEmpty(modes) ? DEFAULT_CHANNEL_MODES : modes);
-    }
+    FileChannel openRemoteFileChannel(String path, Collection<OpenMode> modes) throws IOException;
 
     /**
      * @param  path        The remote directory path
@@ -911,13 +903,7 @@ public interface SftpClient extends SubsystemClient {
      * @throws IOException If failed to access the remote site
      * @see                #readDir(Handle)
      */
-    default Iterable<DirEntry> readDir(String path) throws IOException {
-        if (!isOpen()) {
-            throw new IOException("readDir(" + path + ") client is closed");
-        }
-
-        return new SftpIterableDirEntry(this, path);
-    }
+    Iterable<DirEntry> readDir(String path) throws IOException;
 
     default InputStream read(String path) throws IOException {
         return read(path, DEFAULT_READ_BUFFER_SIZE);
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemChannel.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemChannel.java
deleted file mode 100644
index 03c5fbc..0000000
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemChannel.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sshd.client.subsystem.sftp.fs;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Objects;
-
-import org.apache.sshd.client.subsystem.sftp.SftpClient;
-import org.apache.sshd.client.subsystem.sftp.SftpRemotePathChannel;
-
-/**
- * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
- */
-public class SftpFileSystemChannel extends SftpRemotePathChannel {
-    public SftpFileSystemChannel(SftpPath p, Collection<SftpClient.OpenMode> modes) throws IOException {
-        this(Objects.requireNonNull(p, "No target path").toString(), p.getFileSystem(), modes);
-    }
-
-    public SftpFileSystemChannel(String remotePath, SftpFileSystem fs,
-                                 Collection<SftpClient.OpenMode> modes) throws IOException {
-        super(remotePath, Objects.requireNonNull(fs, "No SFTP file system").getClient(), true, modes);
-    }
-}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemProvider.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemProvider.java
index 666658e..5e5ffe3 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemProvider.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemProvider.java
@@ -76,9 +76,9 @@ import org.apache.sshd.client.subsystem.sftp.SftpClient;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.Attributes;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
 import org.apache.sshd.client.subsystem.sftp.SftpClientFactory;
-import org.apache.sshd.client.subsystem.sftp.SftpRemotePathChannel;
 import org.apache.sshd.client.subsystem.sftp.SftpVersionSelector;
 import org.apache.sshd.client.subsystem.sftp.extensions.CopyFileExtension;
+import org.apache.sshd.client.subsystem.sftp.impl.SftpRemotePathChannel;
 import org.apache.sshd.common.PropertyResolver;
 import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.SshConstants;
@@ -480,9 +480,9 @@ public class SftpFileSystemProvider extends FileSystemProvider {
     @Override
     public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs)
             throws IOException {
-        Collection<SftpClient.OpenMode> modes = SftpClient.OpenMode.fromOpenOptions(options);
+        Collection<OpenMode> modes = OpenMode.fromOpenOptions(options);
         if (modes.isEmpty()) {
-            modes = EnumSet.of(SftpClient.OpenMode.Read, SftpClient.OpenMode.Write);
+            modes = EnumSet.of(OpenMode.Read, OpenMode.Write);
         }
         // TODO: process file attributes
         SftpPath p = toSftpPath(path);
@@ -491,9 +491,9 @@ public class SftpFileSystemProvider extends FileSystemProvider {
 
     @Override
     public InputStream newInputStream(Path path, OpenOption... options) throws IOException {
-        Collection<SftpClient.OpenMode> modes = SftpClient.OpenMode.fromOpenOptions(Arrays.asList(options));
+        Collection<OpenMode> modes = OpenMode.fromOpenOptions(Arrays.asList(options));
         if (modes.isEmpty()) {
-            modes = EnumSet.of(SftpClient.OpenMode.Read);
+            modes = EnumSet.of(OpenMode.Read);
         }
         SftpPath p = toSftpPath(path);
         return p.getFileSystem().getClient().read(p.toString(), modes);
@@ -501,7 +501,7 @@ public class SftpFileSystemProvider extends FileSystemProvider {
 
     @Override
     public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException {
-        Set<SftpClient.OpenMode> modes = SftpClient.OpenMode.fromOpenOptions(Arrays.asList(options));
+        Set<OpenMode> modes = OpenMode.fromOpenOptions(Arrays.asList(options));
         if (modes.contains(OpenMode.Read)) {
             throw new IllegalArgumentException("READ not allowed");
         }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
index b96a24a..211f3ee 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
@@ -21,6 +21,7 @@ package org.apache.sshd.client.subsystem.sftp.impl;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.channels.FileChannel;
 import java.nio.charset.Charset;
 import java.nio.file.attribute.FileTime;
 import java.util.ArrayList;
@@ -1280,6 +1281,55 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
         checkCommandStatus(SftpConstants.SSH_FXP_UNBLOCK, buffer);
     }
 
+    /**
+     * @param  path        The remote directory path
+     * @return             An {@link Iterable} that can be used to iterate over all the directory entries (unlike
+     *                     {@link #readDir(Handle)})
+     * @throws IOException If failed to access the remote site
+     * @see                #readDir(Handle)
+     */
+    @Override
+    public Iterable<DirEntry> readDir(String path) throws IOException {
+        if (!isOpen()) {
+            throw new IOException("readDir(" + path + ") client is closed");
+        }
+
+        return new SftpIterableDirEntry(this, path);
+    }
+
+    /**
+     * @param  handle      A directory {@link Handle}
+     * @return             An {@link Iterable} that can be used to iterate over all the directory entries (like
+     *                     {@link #readDir(String)}). <B>Note:</B> the iterable instance is not re-usable - i.e., files
+     *                     can be iterated only <U>once</U>
+     * @throws IOException If failed to access the directory
+     */
+    @Override
+    public Iterable<DirEntry> listDir(Handle handle) throws IOException {
+        if (!isOpen()) {
+            throw new IOException("listDir(" + handle + ") client is closed");
+        }
+
+        return new StfpIterableDirHandle(this, handle);
+    }
+
+    /**
+     * Opens an {@link FileChannel} on the specified remote path
+     *
+     * @param  path        The remote path
+     * @param  modes       The access mode(s) - if {@code null}/empty then the {@link #DEFAULT_CHANNEL_MODES} are used
+     * @return             The open {@link FileChannel} - <B>Note:</B> do not close this owner client instance until the
+     *                     channel is no longer needed since it uses the client for providing the channel's
+     *                     functionality.
+     * @throws IOException If failed to open the channel
+     * @see                java.nio.channels.Channels#newInputStream(java.nio.channels.ReadableByteChannel)
+     * @see                java.nio.channels.Channels#newOutputStream(java.nio.channels.WritableByteChannel)
+     */
+    @Override
+    public SftpRemotePathChannel openRemoteFileChannel(String path, Collection<OpenMode> modes) throws IOException {
+        return new SftpRemotePathChannel(path, this, false, GenericUtils.isEmpty(modes) ? DEFAULT_CHANNEL_MODES : modes);
+    }
+
     @Override
     public InputStream read(String path, int bufferSize, Collection<OpenMode> mode) throws IOException {
         if (bufferSize < MIN_WRITE_BUFFER_SIZE) {
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpDirEntryIterator.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpDirEntryIterator.java
similarity index 98%
rename from sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpDirEntryIterator.java
rename to sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpDirEntryIterator.java
index 1a18902..f650cc4 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpDirEntryIterator.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpDirEntryIterator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sshd.client.subsystem.sftp;
+package org.apache.sshd.client.subsystem.sftp.impl;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -27,6 +27,7 @@ import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.DirEntry;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.Handle;
 import org.apache.sshd.common.util.ValidateUtils;
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpIterableDirEntry.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpIterableDirEntry.java
similarity index 95%
rename from sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpIterableDirEntry.java
rename to sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpIterableDirEntry.java
index 269b723..6617654 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpIterableDirEntry.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpIterableDirEntry.java
@@ -16,11 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sshd.client.subsystem.sftp;
+package org.apache.sshd.client.subsystem.sftp.impl;
 
 import java.io.IOException;
 import java.util.Objects;
 
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.DirEntry;
 import org.apache.sshd.common.util.ValidateUtils;
 
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannel.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
similarity index 94%
rename from sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannel.java
rename to sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
index e9d5f5a..b6275d1 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannel.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sshd.client.subsystem.sftp;
+package org.apache.sshd.client.subsystem.sftp.impl;
 
 import java.io.IOException;
 import java.io.StreamCorruptedException;
@@ -41,10 +41,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.Attributes;
-import org.apache.sshd.client.subsystem.sftp.impl.AbstractSftpClient;
-import org.apache.sshd.client.subsystem.sftp.impl.SftpInputStreamAsync;
-import org.apache.sshd.client.subsystem.sftp.impl.SftpOutputStreamAsync;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.common.subsystem.sftp.SftpException;
 import org.apache.sshd.common.util.GenericUtils;
@@ -62,17 +61,13 @@ public class SftpRemotePathChannel extends FileChannel {
     /** Default value for {@value #COPY_BUFSIZE_PROP} setting */
     public static final int DEFAULT_TRANSFER_BUFFER_SIZE = IoUtils.DEFAULT_COPY_SIZE;
 
-    public static final Set<SftpClient.OpenMode> READ_MODES = Collections.unmodifiableSet(EnumSet.of(SftpClient.OpenMode.Read));
+    public static final Set<OpenMode> READ_MODES = Collections.unmodifiableSet(EnumSet.of(OpenMode.Read));
 
-    public static final Set<SftpClient.OpenMode> WRITE_MODES = Collections.unmodifiableSet(
-            EnumSet.of(
-                    SftpClient.OpenMode.Write,
-                    SftpClient.OpenMode.Append,
-                    SftpClient.OpenMode.Create,
-                    SftpClient.OpenMode.Truncate));
+    public static final Set<OpenMode> WRITE_MODES = Collections.unmodifiableSet(
+            EnumSet.of(OpenMode.Write, OpenMode.Append, OpenMode.Create, OpenMode.Truncate));
 
     protected final Logger log;
-    protected final Collection<SftpClient.OpenMode> modes;
+    protected final Collection<OpenMode> modes;
     protected final boolean closeOnExit;
     protected final SftpClient sftp;
     protected final SftpClient.CloseableHandle handle;
@@ -82,9 +77,8 @@ public class SftpRemotePathChannel extends FileChannel {
 
     private final String path;
 
-    public SftpRemotePathChannel(
-                                 String path, SftpClient sftp, boolean closeOnExit, Collection<SftpClient.OpenMode> modes)
-                                                                                                                           throws IOException {
+    public SftpRemotePathChannel(String path, SftpClient sftp, boolean closeOnExit,
+                                 Collection<OpenMode> modes) throws IOException {
         this.log = LoggerFactory.getLogger(getClass());
         this.path = ValidateUtils.checkNotNullAndNotEmpty(path, "No remote file path specified");
         this.modes = Objects.requireNonNull(modes, "No channel modes specified");
@@ -533,13 +527,13 @@ public class SftpRemotePathChannel extends FileChannel {
      * @param  reqModes    The required modes - ignored if {@code null}/empty
      * @throws IOException If channel not open or the required modes are not satisfied
      */
-    private void ensureOpen(Collection<SftpClient.OpenMode> reqModes) throws IOException {
+    private void ensureOpen(Collection<OpenMode> reqModes) throws IOException {
         if (!isOpen()) {
             throw new ClosedChannelException();
         }
 
         if (GenericUtils.size(reqModes) > 0) {
-            for (SftpClient.OpenMode m : reqModes) {
+            for (OpenMode m : reqModes) {
                 if (this.modes.contains(m)) {
                     return;
                 }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/StfpIterableDirHandle.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/StfpIterableDirHandle.java
similarity index 94%
rename from sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/StfpIterableDirHandle.java
rename to sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/StfpIterableDirHandle.java
index c3be157..a4ee770 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/StfpIterableDirHandle.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/StfpIterableDirHandle.java
@@ -16,10 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sshd.client.subsystem.sftp;
+package org.apache.sshd.client.subsystem.sftp.impl;
 
 import java.util.Objects;
 
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.DirEntry;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.Handle;
 
diff --git a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannelTest.java b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannelTest.java
similarity index 98%
rename from sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannelTest.java
rename to sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannelTest.java
index 161d966..db2fcac 100644
--- a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannelTest.java
+++ b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannelTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sshd.client.subsystem.sftp;
+package org.apache.sshd.client.subsystem.sftp.impl;
 
 import java.io.IOException;
 import java.io.Writer;
@@ -31,6 +31,8 @@ import java.util.Date;
 import java.util.EnumSet;
 
 import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.client.subsystem.sftp.AbstractSftpClientTestSupport;
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.util.test.CommonTestSupportUtils;
 import org.junit.Before;


[mina-sshd] 01/02: [SSHD-979] Improve SFTP streaming

Posted by gn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit e85b67e0dc6c5f10a7ad77b365d33905c095bff9
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Mon Apr 20 11:41:45 2020 +0200

    [SSHD-979] Improve SFTP streaming
    
    Work sponsored by Buddy https://buddy.works/
---
 .../common/channel/ChannelAsyncOutputStream.java   | 143 ++++++----
 .../common/session/helpers/AbstractSession.java    |   4 +-
 sshd-sftp/pom.xml                                  |  22 ++
 .../sshd/client/subsystem/sftp/RawSftpClient.java  |   9 +
 .../sshd/client/subsystem/sftp/SftpClient.java     |  56 +---
 .../subsystem/sftp/SftpRemotePathChannel.java      |  49 ++--
 .../helpers/AbstractSftpClientExtension.java       |   5 +
 .../client/subsystem/sftp/fs/SftpFileSystem.java   |  15 +
 .../subsystem/sftp/fs/SftpFileSystemProvider.java  |  42 ++-
 .../subsystem/sftp/impl/AbstractSftpClient.java    |  45 ++-
 .../subsystem/sftp/impl/DefaultSftpClient.java     | 187 +++++++-----
 .../subsystem/sftp/impl/SftpInputStreamAsync.java  | 312 +++++++++++++++++++++
 .../subsystem/sftp/impl/SftpOutputStreamAsync.java | 201 +++++++++++++
 .../subsystem/sftp/SftpInputStreamWithChannel.java |   0
 .../sftp/SftpOutputStreamWithChannel.java          |   0
 .../client/subsystem/sftp/SftpPerformanceTest.java | 243 ++++++++++++++++
 .../sshd/client/subsystem/sftp/SftpTest.java       |  21 +-
 .../client/subsystem/sftp/SftpTransferTest.java    | 134 +++++++++
 18 files changed, 1276 insertions(+), 212 deletions(-)

diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index 55af809..af80185 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -25,13 +25,13 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.io.PacketWriter;
 import org.apache.sshd.common.io.WritePendingException;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.closeable.AbstractCloseable;
 
 public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOutputStream, ChannelHolder {
@@ -107,9 +107,39 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
         if (total > 0) {
             Channel channel = getChannel();
             Window remoteWindow = channel.getRemoteWindow();
-            long length = Math.min(Math.min(remoteWindow.getSize(), total), remoteWindow.getPacketSize());
-            if (log.isTraceEnabled()) {
-                log.trace("doWriteIfPossible({})[resume={}] attempting to write {} out of {}", this, resume, length, total);
+            long length;
+            if (remoteWindow.getSize() < total && total <= remoteWindow.getPacketSize()) {
+                // do not chunk when the window is smaller than the packet size
+                length = 0;
+                // do a defensive copy in case the user reuses the buffer
+                IoWriteFutureImpl f = new IoWriteFutureImpl(future.getId(), new ByteArrayBuffer(buffer.getCompactData()));
+                f.addListener(w -> future.setValue(w.getException() != null ? w.getException() : w.isWritten()));
+                pendingWrite.set(f);
+                if (log.isTraceEnabled()) {
+                    log.trace("doWriteIfPossible({})[resume={}] waiting for window space {}",
+                            this, resume, remoteWindow.getSize());
+                }
+            } else if (total > remoteWindow.getPacketSize()) {
+                if (buffer.rpos() > 0) {
+                    // do a defensive copy in case the user reuses the buffer
+                    IoWriteFutureImpl f = new IoWriteFutureImpl(future.getId(), new ByteArrayBuffer(buffer.getCompactData()));
+                    f.addListener(w -> future.setValue(w.getException() != null ? w.getException() : w.isWritten()));
+                    pendingWrite.set(f);
+                    length = remoteWindow.getPacketSize();
+                    if (log.isTraceEnabled()) {
+                        log.trace("doWriteIfPossible({})[resume={}] attempting to write {} out of {}",
+                                this, resume, length, total);
+                    }
+                    doWriteIfPossible(resume);
+                    return;
+                } else {
+                    length = remoteWindow.getPacketSize();
+                }
+            } else {
+                length = total;
+                if (log.isTraceEnabled()) {
+                    log.trace("doWriteIfPossible({})[resume={}] attempting to write {} bytes", this, resume, length);
+                }
             }
 
             if (length > 0) {
@@ -125,66 +155,12 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
                                                        + ") exceeds int boundaries");
                 }
 
-                Session s = channel.getSession();
-                Buffer buf = s.createBuffer(cmd, (int) length + 12);
-                buf.putInt(channel.getRecipient());
-                if (cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) {
-                    buf.putInt(SshConstants.SSH_EXTENDED_DATA_STDERR);
-                }
-                buf.putInt(length);
-                buf.putRawBytes(buffer.array(), buffer.rpos(), (int) length);
-                buffer.rpos(buffer.rpos() + (int) length);
+                Buffer buf = createSendBuffer(buffer, channel, length);
                 remoteWindow.consume(length);
 
                 try {
-                    ChannelAsyncOutputStream stream = this;
                     IoWriteFuture writeFuture = packetWriter.writePacket(buf);
-                    writeFuture.addListener(new SshFutureListener<IoWriteFuture>() {
-                        @Override
-                        public void operationComplete(IoWriteFuture f) {
-                            if (f.isWritten()) {
-                                handleOperationCompleted();
-                            } else {
-                                handleOperationFailed(f.getException());
-                            }
-                        }
-
-                        @SuppressWarnings("synthetic-access")
-                        private void handleOperationCompleted() {
-                            if (total > length) {
-                                if (log.isTraceEnabled()) {
-                                    log.trace("doWriteIfPossible({}) completed write of {} out of {}", stream, length, total);
-                                }
-                                doWriteIfPossible(false);
-                            } else {
-                                boolean nullified = pendingWrite.compareAndSet(future, null);
-                                if (log.isTraceEnabled()) {
-                                    log.trace("doWriteIfPossible({}) completed write len={}, more={}",
-                                            stream, total, !nullified);
-                                }
-                                future.setValue(Boolean.TRUE);
-                            }
-                        }
-
-                        @SuppressWarnings("synthetic-access")
-                        private void handleOperationFailed(Throwable reason) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("doWriteIfPossible({}) failed ({}) to complete write of {} out of {}: {}",
-                                        stream, reason.getClass().getSimpleName(), length, total, reason.getMessage());
-                            }
-
-                            if (log.isTraceEnabled()) {
-                                log.trace("doWriteIfPossible(" + this + ") write failure details", reason);
-                            }
-
-                            boolean nullified = pendingWrite.compareAndSet(future, null);
-                            if (log.isTraceEnabled()) {
-                                log.trace("doWriteIfPossible({}) failed write len={}, more={}",
-                                        stream, total, !nullified);
-                            }
-                            future.setValue(reason);
-                        }
-                    });
+                    writeFuture.addListener(f -> onWritten(future, total, length, f));
                 } catch (IOException e) {
                     future.setValue(e);
                 }
@@ -202,6 +178,53 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
         }
     }
 
+    protected void onWritten(IoWriteFutureImpl future, int total, long length, IoWriteFuture f) {
+        if (f.isWritten()) {
+            if (total > length) {
+                if (log.isTraceEnabled()) {
+                    log.trace("onWritten({}) completed write of {} out of {}",
+                            this, length, total);
+                }
+                doWriteIfPossible(false);
+            } else {
+                boolean nullified = pendingWrite.compareAndSet(future, null);
+                if (log.isTraceEnabled()) {
+                    log.trace("onWritten({}) completed write len={}, more={}",
+                            this, total, !nullified);
+                }
+                future.setValue(Boolean.TRUE);
+            }
+        } else {
+            Throwable reason = f.getException();
+            if (log.isDebugEnabled()) {
+                log.debug("onWritten({}) failed ({}) to complete write of {} out of {}: {}",
+                        this, reason.getClass().getSimpleName(), length, total, reason.getMessage());
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("onWritten(" + this + ") write failure details", reason);
+            }
+            boolean nullified = pendingWrite.compareAndSet(future, null);
+            if (log.isTraceEnabled()) {
+                log.trace("onWritten({}) failed write len={}, more={}",
+                        this, total, !nullified);
+            }
+            future.setValue(reason);
+        }
+    }
+
+    protected Buffer createSendBuffer(Buffer buffer, Channel channel, long length) {
+        Session s = channel.getSession();
+        Buffer buf = s.createBuffer(cmd, (int) length + 12);
+        buf.putInt(channel.getRecipient());
+        if (cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) {
+            buf.putInt(SshConstants.SSH_EXTENDED_DATA_STDERR);
+        }
+        buf.putInt(length);
+        buf.putRawBytes(buffer.array(), buffer.rpos(), (int) length);
+        buffer.rpos(buffer.rpos() + (int) length);
+        return buf;
+    }
+
     @Override
     public String toString() {
         return getClass().getSimpleName() + "[" + getChannel() + "] cmd=" + SshConstants.getCommandMessageName(cmd & 0xFF);
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index e6d7c33..e8eda37 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -954,7 +954,9 @@ public abstract class AbstractSession extends SessionHelper {
         synchronized (encodeLock) {
             Buffer packet = resolveOutputPacket(buffer);
             IoSession networkSession = getIoSession();
-            return networkSession.writePacket(packet);
+            IoWriteFuture future = networkSession.writePacket(packet);
+            buffer.rpos(buffer.wpos());
+            return future;
         }
     }
 
diff --git a/sshd-sftp/pom.xml b/sshd-sftp/pom.xml
index 9116746..4cf78e3 100644
--- a/sshd-sftp/pom.xml
+++ b/sshd-sftp/pom.xml
@@ -85,8 +85,30 @@
             <artifactId>jzlib</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>toxiproxy</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>testcontainers-bom</artifactId>
+                <type>pom</type>
+                <version>1.14.0</version>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <build>
         <resources>
             <resource>
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java
index 0cd90af..560ce55 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java
@@ -41,4 +41,13 @@ public interface RawSftpClient {
      * @throws IOException If connection closed or interrupted
      */
     Buffer receive(int id) throws IOException;
+
+    /**
+     * @param  id          The expected request id
+     * @param  timeout     The amount of time to wait for the response
+     * @return             The received response {@link Buffer} containing the request id
+     * @throws IOException If connection closed or interrupted
+     */
+    Buffer receive(int id, long timeout) throws IOException;
+
 }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java
index 593e996..e78fa00 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java
@@ -80,12 +80,14 @@ public interface SftpClient extends SubsystemClient {
         /**
          * The {@link Set} of {@link OpenOption}-s supported by {@link #fromOpenOptions(Collection)}
          */
-        public static final Set<OpenOption> SUPPORTED_OPTIONS = Collections.unmodifiableSet(
-                EnumSet.of(
-                        StandardOpenOption.READ, StandardOpenOption.APPEND,
-                        StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING,
-                        StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW,
-                        StandardOpenOption.SPARSE));
+        public static final Set<OpenOption> SUPPORTED_OPTIONS = Collections.unmodifiableSet(EnumSet.of(
+                StandardOpenOption.READ,
+                StandardOpenOption.APPEND,
+                StandardOpenOption.CREATE,
+                StandardOpenOption.TRUNCATE_EXISTING,
+                StandardOpenOption.WRITE,
+                StandardOpenOption.CREATE_NEW,
+                StandardOpenOption.SPARSE));
 
         /**
          * Converts {@link StandardOpenOption}-s into {@link OpenMode}-s
@@ -464,17 +466,9 @@ public interface SftpClient extends SubsystemClient {
 
         @Override
         public String toString() {
-            return "type=" + getType()
-                   + ";size=" + getSize()
-                   + ";uid=" + getUserId()
-                   + ";gid=" + getGroupId()
-                   + ";perms=0x" + Integer.toHexString(getPermissions())
-                   + ";flags=" + getFlags()
-                   + ";owner=" + getOwner()
-                   + ";group=" + getGroup()
-                   + ";aTime=" + getAccessTime()
-                   + ";cTime=" + getCreateTime()
-                   + ";mTime=" + getModifyTime()
+            return "type=" + getType() + ";size=" + getSize() + ";uid=" + getUserId() + ";gid=" + getGroupId() + ";perms=0x"
+                   + Integer.toHexString(getPermissions()) + ";flags=" + getFlags() + ";owner=" + getOwner() + ";group="
+                   + getGroup() + ";aTime=" + getAccessTime() + ";cTime=" + getCreateTime() + ";mTime=" + getModifyTime()
                    + ";extensions=" + getExtensions().keySet();
         }
     }
@@ -541,7 +535,7 @@ public interface SftpClient extends SubsystemClient {
     DirEntry[] EMPTY_DIR_ENTRIES = new DirEntry[0];
 
     // default values used if none specified
-    int MIN_BUFFER_SIZE = Byte.MAX_VALUE;
+    int MIN_BUFFER_SIZE = 256;
     int MIN_READ_BUFFER_SIZE = MIN_BUFFER_SIZE;
     int MIN_WRITE_BUFFER_SIZE = MIN_BUFFER_SIZE;
     int IO_BUFFER_SIZE = 32 * 1024;
@@ -954,18 +948,7 @@ public interface SftpClient extends SubsystemClient {
      * @return             An {@link InputStream} for reading the remote file data
      * @throws IOException If failed to execute
      */
-    default InputStream read(String path, int bufferSize, Collection<OpenMode> mode) throws IOException {
-        if (bufferSize < MIN_READ_BUFFER_SIZE) {
-            throw new IllegalArgumentException(
-                    "Insufficient read buffer size: " + bufferSize + ", min.=" + MIN_READ_BUFFER_SIZE);
-        }
-
-        if (!isOpen()) {
-            throw new IOException("read(" + path + ")[" + mode + "] size=" + bufferSize + ": client is closed");
-        }
-
-        return new SftpInputStreamWithChannel(this, bufferSize, path, mode);
-    }
+    InputStream read(String path, int bufferSize, Collection<OpenMode> mode) throws IOException;
 
     default OutputStream write(String path) throws IOException {
         return write(path, DEFAULT_WRITE_BUFFER_SIZE);
@@ -996,18 +979,7 @@ public interface SftpClient extends SubsystemClient {
      * @return             An {@link OutputStream} for writing the data
      * @throws IOException If failed to execute
      */
-    default OutputStream write(String path, int bufferSize, Collection<OpenMode> mode) throws IOException {
-        if (bufferSize < MIN_WRITE_BUFFER_SIZE) {
-            throw new IllegalArgumentException(
-                    "Insufficient write buffer size: " + bufferSize + ", min.=" + MIN_WRITE_BUFFER_SIZE);
-        }
-
-        if (!isOpen()) {
-            throw new IOException("write(" + path + ")[" + mode + "] size=" + bufferSize + ": client is closed");
-        }
-
-        return new SftpOutputStreamWithChannel(this, bufferSize, path, mode);
-    }
+    OutputStream write(String path, int bufferSize, Collection<OpenMode> mode) throws IOException;
 
     /**
      * @param  <E>           The generic extension type
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannel.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannel.java
index 2789cd2..e9d5f5a 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannel.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannel.java
@@ -42,6 +42,9 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.Attributes;
+import org.apache.sshd.client.subsystem.sftp.impl.AbstractSftpClient;
+import org.apache.sshd.client.subsystem.sftp.impl.SftpInputStreamAsync;
+import org.apache.sshd.client.subsystem.sftp.impl.SftpOutputStreamAsync;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.common.subsystem.sftp.SftpException;
 import org.apache.sshd.common.util.GenericUtils;
@@ -230,6 +233,18 @@ public class SftpRemotePathChannel extends FileChannel {
         return doWrite(buffers, -1L);
     }
 
+    static class Ack {
+        int id;
+        long offset;
+        int length;
+
+        Ack(int id, long offset, int length) {
+            this.id = id;
+            this.offset = offset;
+            this.length = length;
+        }
+    }
+
     protected long doWrite(Collection<? extends ByteBuffer> buffers, long position) throws IOException {
         ensureOpen(WRITE_MODES);
 
@@ -346,30 +361,19 @@ public class SftpRemotePathChannel extends FileChannel {
         }
 
         boolean completed = false;
-        boolean eof = false;
-        long curPos = position;
-        int bufSize = (int) Math.min(count, copySize);
-        byte[] buffer = new byte[bufSize];
-        long totalRead = 0L;
+        boolean eof;
+        long totalRead;
 
         synchronized (lock) {
             try {
                 beginBlocking("transferTo");
 
-                while (totalRead < count && !eof) {
-                    int read = sftp.read(handle, curPos, buffer, 0,
-                            (int) Math.min(count - totalRead, buffer.length));
-                    if (read > 0) {
-                        ByteBuffer wrap = ByteBuffer.wrap(buffer, 0, read);
-                        while (wrap.remaining() > 0) {
-                            target.write(wrap);
-                        }
-                        curPos += read;
-                        totalRead += read;
-                    } else {
-                        eof = read == -1;
-                    }
-                }
+                SftpInputStreamAsync input = new SftpInputStreamAsync(
+                        (AbstractSftpClient) sftp,
+                        copySize, position, count, getRemotePath(), handle);
+                totalRead = input.transferTo(count, target);
+                // DO NOT CLOSE THE STREAM AS IT WOULD CLOSE THE HANDLE
+                eof = input.isEof();
                 completed = true;
             } finally {
                 endBlocking("transferTo", completed);
@@ -410,18 +414,23 @@ public class SftpRemotePathChannel extends FileChannel {
             try {
                 beginBlocking("transferFrom");
 
+                SftpOutputStreamAsync output = new SftpOutputStreamAsync(
+                        (AbstractSftpClient) sftp,
+                        copySize, getRemotePath(), handle);
                 while (totalRead < count) {
                     ByteBuffer wrap = ByteBuffer.wrap(
                             buffer, 0, (int) Math.min(buffer.length, count - totalRead));
                     int read = src.read(wrap);
                     if (read > 0) {
-                        sftp.write(handle, curPos, buffer, 0, read);
+                        output.write(buffer, 0, read);
                         curPos += read;
                         totalRead += read;
                     } else {
                         break;
                     }
                 }
+                output.flush();
+                // DO NOT CLOSE THE OUTPUT STREAM AS IT WOULD CLOSE THE HANDLE
                 completed = true;
             } finally {
                 endBlocking("transferFrom", completed);
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/extensions/helpers/AbstractSftpClientExtension.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/extensions/helpers/AbstractSftpClientExtension.java
index 4ad73c3..8ee2293 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/extensions/helpers/AbstractSftpClientExtension.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/extensions/helpers/AbstractSftpClientExtension.java
@@ -95,6 +95,11 @@ public abstract class AbstractSftpClientExtension extends AbstractLoggingBean im
     }
 
     @Override
+    public Buffer receive(int id, long timeout) throws IOException {
+        return raw.receive(id, timeout);
+    }
+
+    @Override
     public final boolean isSupported() {
         return supported;
     }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystem.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystem.java
index c60ae8f..ad8d6dd 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystem.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystem.java
@@ -557,6 +557,21 @@ public class SftpFileSystem
                         "receive(id=" + id + ") delegate is not a " + RawSftpClient.class.getSimpleName());
             }
         }
+
+        @Override
+        public Buffer receive(int id, long timeout) throws IOException {
+            if (!isOpen()) {
+                throw new IOException("receive(id=" + id + ", timeout=" + timeout + ") client is closed");
+            }
+
+            if (delegate instanceof RawSftpClient) {
+                return ((RawSftpClient) delegate).receive(id, timeout);
+            } else {
+                throw new StreamCorruptedException(
+                        "receive(id=" + id + ", timeout=" + timeout + ") delegate is not a "
+                                                   + RawSftpClient.class.getSimpleName());
+            }
+        }
     }
 
     public static class DefaultUserPrincipalLookupService extends UserPrincipalLookupService {
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemProvider.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemProvider.java
index 774e17d..666658e 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemProvider.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemProvider.java
@@ -74,8 +74,11 @@ import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.client.subsystem.sftp.SftpClient;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.Attributes;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
 import org.apache.sshd.client.subsystem.sftp.SftpClientFactory;
+import org.apache.sshd.client.subsystem.sftp.SftpRemotePathChannel;
 import org.apache.sshd.client.subsystem.sftp.SftpVersionSelector;
+import org.apache.sshd.client.subsystem.sftp.extensions.CopyFileExtension;
 import org.apache.sshd.common.PropertyResolver;
 import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.SshConstants;
@@ -482,7 +485,33 @@ public class SftpFileSystemProvider extends FileSystemProvider {
             modes = EnumSet.of(SftpClient.OpenMode.Read, SftpClient.OpenMode.Write);
         }
         // TODO: process file attributes
-        return new SftpFileSystemChannel(toSftpPath(path), modes);
+        SftpPath p = toSftpPath(path);
+        return new SftpRemotePathChannel(p.toString(), p.getFileSystem().getClient(), true, modes);
+    }
+
+    @Override
+    public InputStream newInputStream(Path path, OpenOption... options) throws IOException {
+        Collection<SftpClient.OpenMode> modes = SftpClient.OpenMode.fromOpenOptions(Arrays.asList(options));
+        if (modes.isEmpty()) {
+            modes = EnumSet.of(SftpClient.OpenMode.Read);
+        }
+        SftpPath p = toSftpPath(path);
+        return p.getFileSystem().getClient().read(p.toString(), modes);
+    }
+
+    @Override
+    public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException {
+        Set<SftpClient.OpenMode> modes = SftpClient.OpenMode.fromOpenOptions(Arrays.asList(options));
+        if (modes.contains(OpenMode.Read)) {
+            throw new IllegalArgumentException("READ not allowed");
+        }
+        if (modes.isEmpty()) {
+            modes = EnumSet.of(OpenMode.Create, OpenMode.Truncate, OpenMode.Write);
+        } else {
+            modes.add(OpenMode.Write);
+        }
+        SftpPath p = toSftpPath(path);
+        return p.getFileSystem().getClient().write(p.toString(), modes);
     }
 
     @Override
@@ -591,9 +620,14 @@ public class SftpFileSystemProvider extends FileSystemProvider {
         if (attrs.isDirectory()) {
             createDirectory(target);
         } else {
-            try (InputStream in = newInputStream(source);
-                 OutputStream os = newOutputStream(target)) {
-                IoUtils.copy(in, os);
+            CopyFileExtension copyFile = src.getFileSystem().getClient().getExtension(CopyFileExtension.class);
+            if (copyFile.isSupported()) {
+                copyFile.copyFile(source.toString(), target.toString(), false);
+            } else {
+                try (InputStream in = newInputStream(source);
+                     OutputStream os = newOutputStream(target)) {
+                    IoUtils.copy(in, os);
+                }
             }
         }
 
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
index 577ecdb..b96a24a 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
@@ -19,6 +19,8 @@
 package org.apache.sshd.client.subsystem.sftp.impl;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.nio.file.attribute.FileTime;
 import java.util.ArrayList;
@@ -786,7 +788,6 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
         if (eofSignalled != null) {
             eofSignalled.set(null);
         }
-
         if (!isOpen()) {
             throw new IOException("read(" + handle + "/" + fileOffset + ")[" + dstOffset + "/" + len + "] client is closed");
         }
@@ -1278,4 +1279,46 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
         buffer.putLong(length);
         checkCommandStatus(SftpConstants.SSH_FXP_UNBLOCK, buffer);
     }
+
+    @Override
+    public InputStream read(String path, int bufferSize, Collection<OpenMode> mode) throws IOException {
+        // validate against the read-side minimum - the same constant the error message reports
+        if (bufferSize < MIN_READ_BUFFER_SIZE) {
+            throw new IllegalArgumentException(
+                    "Insufficient read buffer size: " + bufferSize + ", min.=" + MIN_READ_BUFFER_SIZE);
+        }
+
+        if (!isOpen()) {
+            throw new IOException("read(" + path + ")[" + mode + "] size=" + bufferSize + ": client is closed");
+        }
+
+        return new SftpInputStreamAsync(this, bufferSize, path, mode);
+    }
+
+    @Override
+    public InputStream read(String path, Collection<OpenMode> mode) throws IOException {
+        int packetSize = (int) getChannel().getRemoteWindow().getPacketSize();
+        return read(path, packetSize, mode);
+    }
+
+    @Override
+    public OutputStream write(String path, int bufferSize, Collection<OpenMode> mode) throws IOException {
+        if (bufferSize < MIN_WRITE_BUFFER_SIZE) {
+            throw new IllegalArgumentException(
+                    "Insufficient write buffer size: " + bufferSize + ", min.="
+                                               + MIN_WRITE_BUFFER_SIZE);
+        }
+
+        if (!isOpen()) {
+            throw new IOException("write(" + path + ")[" + mode + "] size=" + bufferSize + ": client is closed");
+        }
+
+        return new SftpOutputStreamAsync(this, bufferSize, path, mode);
+    }
+
+    @Override
+    public OutputStream write(String path, Collection<OpenMode> mode) throws IOException {
+        int packetSize = (int) getChannel().getRemoteWindow().getPacketSize();
+        return write(path, packetSize, mode);
+    }
 }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
index 28e6b4f..5ef0e18 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
@@ -21,7 +21,6 @@ package org.apache.sshd.client.subsystem.sftp.impl;
 import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.io.StreamCorruptedException;
@@ -48,6 +47,10 @@ import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.common.subsystem.sftp.extensions.ParserUtils;
@@ -55,7 +58,6 @@ import org.apache.sshd.common.subsystem.sftp.extensions.VersionsParser.Versions;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
-import org.apache.sshd.common.util.buffer.BufferUtils;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 
 /**
@@ -67,7 +69,6 @@ public class DefaultSftpClient extends AbstractSftpClient {
     private final Map<Integer, Buffer> messages = new HashMap<>();
     private final AtomicInteger cmdId = new AtomicInteger(100);
     private final Buffer receiveBuffer = new ByteArrayBuffer();
-    private final byte[] workBuf = new byte[Integer.BYTES];
     private final AtomicInteger versionHolder = new AtomicInteger(0);
     private final AtomicBoolean closing = new AtomicBoolean(false);
     private final NavigableMap<String, byte[]> extensions = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
@@ -78,24 +79,8 @@ public class DefaultSftpClient extends AbstractSftpClient {
         this.nameDecodingCharset = PropertyResolverUtils.getCharset(
                 clientSession, NAME_DECODING_CHARSET, DEFAULT_NAME_DECODING_CHARSET);
         this.clientSession = Objects.requireNonNull(clientSession, "No client session");
-        this.channel = clientSession.createSubsystemChannel(SftpConstants.SFTP_SUBSYSTEM_NAME);
-        this.channel.setOut(new OutputStream() {
-            private final byte[] singleByte = new byte[1];
-
-            @Override
-            public void write(int b) throws IOException {
-                synchronized (singleByte) {
-                    singleByte[0] = (byte) b;
-                    write(singleByte);
-                }
-            }
-
-            @Override
-            public void write(byte[] b, int off, int len) throws IOException {
-                data(b, off, len);
-            }
-        });
-        this.channel.setErr(new ByteArrayOutputStream(Byte.MAX_VALUE));
+        this.channel = new SftpChannelSubsystem();
+        clientSession.getService(ConnectionService.class).registerChannel(channel);
 
         long initializationTimeout = clientSession.getLongProperty(
                 SFTP_CHANNEL_OPEN_TIMEOUT, DEFAULT_CHANNEL_OPEN_TIMEOUT);
@@ -274,12 +259,26 @@ public class DefaultSftpClient extends AbstractSftpClient {
                     getClientChannel(), SftpConstants.getCommandMessageName(cmd), len, id);
         }
 
-        OutputStream dos = channel.getInvertedIn();
-        BufferUtils.writeInt(dos, 1 /* cmd */ + Integer.BYTES /* id */ + len, workBuf);
-        dos.write(cmd & 0xFF);
-        BufferUtils.writeInt(dos, id, workBuf);
-        dos.write(buffer.array(), buffer.rpos(), len);
-        dos.flush();
+        Buffer buf;
+        int hdr = Integer.BYTES /* length */ + 1 /* cmd */ + Integer.BYTES /* id */;
+        if (buffer.rpos() >= hdr) {
+            int wpos = buffer.wpos();
+            int s = buffer.rpos() - hdr;
+            buffer.rpos(s);
+            buffer.wpos(s);
+            buffer.putInt(1 /* cmd */ + Integer.BYTES /* id */ + len); // length
+            buffer.putByte((byte) (cmd & 0xFF)); // cmd
+            buffer.putInt(id); // id
+            buffer.wpos(wpos);
+            buf = buffer;
+        } else {
+            buf = new ByteArrayBuffer(hdr + len);
+            buf.putInt(1 /* cmd */ + Integer.BYTES /* id */ + len);
+            buf.putByte((byte) (cmd & 0xFF));
+            buf.putInt(id);
+            buf.putBuffer(buffer);
+        }
+        channel.getAsyncIn().writePacket(buf).verify();
         return id;
     }
 
@@ -292,66 +291,50 @@ public class DefaultSftpClient extends AbstractSftpClient {
             idleTimeout = FactoryManager.DEFAULT_IDLE_TIMEOUT;
         }
 
-        Integer reqId = id;
         boolean traceEnabled = log.isTraceEnabled();
         for (int count = 1;; count++) {
             if (isClosing() || (!isOpen())) {
                 throw new SshException("Channel is being closed");
             }
 
-            synchronized (messages) {
-                Buffer buffer = messages.remove(reqId);
-                if (buffer != null) {
-                    return buffer;
-                }
-
-                try {
-                    messages.wait(idleTimeout);
-                } catch (InterruptedException e) {
-                    throw (IOException) new InterruptedIOException(
-                            "Interrupted while waiting for messages at iteration #" + count).initCause(e);
-                }
+            Buffer buffer = receive(id, idleTimeout);
+            if (buffer != null) {
+                return buffer;
             }
 
             if (traceEnabled) {
-                log.trace("receive({}) check iteration #{} for id={}", this, count, reqId);
+                log.trace("receive({}) check iteration #{} for id={}", this, count, id);
             }
         }
     }
 
-    protected Buffer read() throws IOException {
-        InputStream dis = channel.getInvertedOut();
-        int length = BufferUtils.readInt(dis, workBuf);
-        // must have at least command + length
-        if (length < (1 + Integer.BYTES)) {
-            throw new IllegalArgumentException("Bad length: " + length);
-        }
-
-        Buffer buffer = new ByteArrayBuffer(length + Integer.BYTES, false);
-        buffer.putInt(length);
-        int nb = length;
-        while (nb > 0) {
-            int readLen = dis.read(buffer.array(), buffer.wpos(), nb);
-            if (readLen < 0) {
-                throw new IllegalArgumentException("Premature EOF while read " + length + " bytes - remaining=" + nb);
+    /**
+     * Fetches the response queued for a request, waiting at most once for new messages.
+     *
+     * @param  id          identifier of the request whose response is wanted
+     * @param  idleTimeout maximum time handed to {@code messages.wait(...)}; when not positive the
+     *                     call only polls and never blocks
+     * @return             the queued response, or {@code null} if none was queued when checked - the
+     *                     queue is not re-examined after the wait, so the caller has to call again
+     * @throws IOException an {@code InterruptedIOException} if interrupted while waiting
+     */
+    @Override
+    public Buffer receive(int id, long idleTimeout) throws IOException {
+        synchronized (messages) {
+            Buffer buffer = messages.remove(id);
+            if (buffer != null) {
+                return buffer;
+            }
+            // a non-positive timeout means "poll only"
+            if (idleTimeout > 0) {
+                try {
+                    messages.wait(idleTimeout);
+                } catch (InterruptedException e) {
+                    throw (IOException) new InterruptedIOException("Interrupted while waiting for messages").initCause(e);
+                }
             }
-            buffer.wpos(buffer.wpos() + readLen);
-            nb -= readLen;
         }
-
-        return buffer;
+        return null;
     }
 
     protected void init(long initializationTimeout) throws IOException {
         ValidateUtils.checkTrue(initializationTimeout > 0L, "Invalid initialization timeout: %d", initializationTimeout);
 
         // Send init packet
-        OutputStream dos = channel.getInvertedIn();
-        BufferUtils.writeInt(dos, 5 /* total length */, workBuf);
-        dos.write(SftpConstants.SSH_FXP_INIT);
-        // Ask for the highest we support and see what the server says
-        BufferUtils.writeInt(dos, SftpConstants.SFTP_V6, workBuf);
-        dos.flush();
+        Buffer buf = new ByteArrayBuffer(9);
+        buf.putInt(5);
+        buf.putByte((byte) SftpConstants.SSH_FXP_INIT);
+        buf.putInt(SftpConstants.SFTP_V6);
+        channel.getAsyncIn().writePacket(buf).verify();
 
         Buffer buffer;
         Integer reqId;
@@ -419,7 +402,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
                 String name = buffer.getString();
                 byte[] data = buffer.getBytes();
                 if (traceEnabled) {
-                    log.trace("init({}) added extension=", getClientChannel(), name);
+                    log.trace("init({}) added extension={}", getClientChannel(), name);
                 }
                 extensions.put(name, data);
             }
@@ -501,4 +484,74 @@ public class DefaultSftpClient extends AbstractSftpClient {
         versionHolder.set(selected);
         return selected;
     }
+
+    /**
+     * Subsystem channel used by this client. On open it requests the SFTP subsystem, installs an
+     * asynchronous stream for outgoing packets ({@code asyncIn}) and an output stream ({@code out})
+     * that hands every received chunk to {@code data(...)}.
+     */
+    private class SftpChannelSubsystem extends ChannelSubsystem {
+
+        SftpChannelSubsystem() {
+            super(SftpConstants.SFTP_SUBSYSTEM_NAME);
+        }
+
+        /**
+         * Sends the subsystem request and sets up the channel streams.
+         *
+         * @throws IOException if the request packet cannot be written
+         */
+        @Override
+        protected void doOpen() throws IOException {
+            String systemName = getSubsystem();
+            Session session = getSession();
+            boolean wantReply = this.getBooleanProperty(
+                    REQUEST_SUBSYSTEM_REPLY, DEFAULT_REQUEST_SUBSYSTEM_REPLY);
+            // Integer.SIZE is only used as slack in the size hint for the request buffer
+            Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_REQUEST,
+                    Channel.CHANNEL_SUBSYSTEM.length() + systemName.length() + Integer.SIZE);
+            buffer.putInt(getRecipient());
+            buffer.putString(Channel.CHANNEL_SUBSYSTEM);
+            buffer.putBoolean(wantReply);
+            buffer.putString(systemName);
+            addPendingRequest(Channel.CHANNEL_SUBSYSTEM, wantReply);
+            writePacket(buffer);
+
+            asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
+                // Sends EOF before delegating to the regular graceful close; a failure to send it
+                // is reported to the session instead of aborting the close
+                @SuppressWarnings("synthetic-access")
+                @Override
+                protected CloseFuture doCloseGracefully() {
+                    try {
+                        sendEof();
+                    } catch (IOException e) {
+                        Session session = getSession();
+                        session.exceptionCaught(e);
+                    }
+                    return super.doCloseGracefully();
+                }
+
+                // Reuses the caller's buffer when it has at least 9 bytes of headroom in front of
+                // the payload and the whole payload is being sent: the recipient and the length are
+                // written into the 8 bytes right before the payload, so no copy is needed.
+                // NOTE(review): the byte at rpos - 9 is presumably left for the command byte - confirm
+                @Override
+                protected Buffer createSendBuffer(Buffer buffer, Channel channel, long length) {
+                    if (buffer.rpos() >= 9 && length == buffer.available()) {
+                        int rpos = buffer.rpos();
+                        int wpos = buffer.wpos();
+                        buffer.rpos(rpos - 9);
+                        buffer.wpos(rpos - 8);
+                        buffer.putInt(channel.getRecipient());
+                        buffer.putInt(length);
+                        // restore the write position to the end of the payload
+                        buffer.wpos(wpos);
+                        return buffer;
+                    } else {
+                        return super.createSendBuffer(buffer, channel, length);
+                    }
+                }
+            };
+            // every chunk written to "out" is forwarded to data(...)
+            out = new OutputStream() {
+                private final byte[] singleByte = new byte[1];
+
+                @Override
+                public void write(int b) throws IOException {
+                    // the shared one-byte array is guarded by its own monitor
+                    synchronized (singleByte) {
+                        singleByte[0] = (byte) b;
+                        write(singleByte);
+                    }
+                }
+
+                @Override
+                public void write(byte[] b, int off, int len) throws IOException {
+                    data(b, off, len);
+                }
+            };
+            // error output is simply accumulated in memory
+            err = new ByteArrayOutputStream();
+        }
+    }
 }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
new file mode 100644
index 0000000..ec3e593
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpHelper;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.InputStreamWithChannel;
+
+public class SftpInputStreamAsync extends InputStreamWithChannel {
+
+    /**
+     * Book-keeping record for one read request that was sent and whose response is still expected.
+     */
+    static class Ack {
+        // request id returned by the client when the read request was sent
+        int id;
+        // file offset the request asked for
+        long offset;
+        // number of bytes that were requested
+        int length;
+
+        Ack(int id, long offset, int length) {
+            this.id = id;
+            this.offset = offset;
+            this.length = length;
+        }
+    }
+
+    private final AbstractSftpClient client;
+    private final String path;
+    private final byte[] bb = new byte[1];
+    private final int bufferSize;
+    private final long fileSize;
+    private Buffer buffer;
+    private CloseableHandle handle;
+    private long requestOffset;
+    private long clientOffset;
+    private final Deque<Ack> pendingReads = new LinkedList<>();
+    private boolean eofIndicator;
+
+    /**
+     * Opens the remote file and queries its size so that it can be streamed.
+     *
+     * @param  client      the SFTP client used for all requests - must not be {@code null}
+     * @param  bufferSize  number of bytes asked for in each read request
+     * @param  path        remote file path
+     * @param  mode        modes used to open the file
+     * @throws IOException if the file cannot be opened or its attributes cannot be retrieved; in the
+     *                     latter case the handle that was just opened is closed again
+     */
+    public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize,
+                                String path, Collection<OpenMode> mode) throws IOException {
+        this.client = Objects.requireNonNull(client, "No SFTP client instance");
+        this.path = path;
+        this.bufferSize = bufferSize;
+
+        CloseableHandle opened = client.open(path, mode);
+        long size;
+        try {
+            size = client.stat(opened).getSize();
+        } catch (IOException | RuntimeException e) {
+            // do not leak the freshly opened remote handle when its size cannot be determined
+            try {
+                opened.close();
+            } catch (IOException | RuntimeException closeError) {
+                e.addSuppressed(closeError);
+            }
+            throw e;
+        }
+        this.handle = opened;
+        this.fileSize = size;
+    }
+
+    /**
+     * Creates a stream over an already opened remote file, starting at a given offset.
+     *
+     * @param client       the SFTP client used for all requests - must not be {@code null}
+     * @param bufferSize   number of bytes asked for in each read request
+     * @param clientOffset file offset of the first byte this stream delivers
+     * @param fileSize     size of the remote file, used to limit how far ahead reads are requested
+     * @param path         remote file path (used in error messages)
+     * @param handle       the open remote file handle; closed when this stream is closed
+     */
+    public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize, long clientOffset, long fileSize,
+                                String path, CloseableHandle handle) {
+        this.client = Objects.requireNonNull(client, "No SFTP client instance");
+        this.path = path;
+        this.handle = handle;
+        this.bufferSize = bufferSize;
+        // read requests must start where the consumer starts; otherwise bytes fetched from
+        // offset 0 would be delivered as if they were located at clientOffset
+        this.requestOffset = clientOffset;
+        this.clientOffset = clientOffset;
+        this.fileSize = fileSize;
+    }
+
+    /**
+     * The client instance
+     *
+     * @return {@link SftpClient} instance used to access the remote file
+     */
+    public final AbstractSftpClient getClient() {
+        return client;
+    }
+
+    /**
+     * The remotely accessed file path
+     *
+     * @return Remote file path
+     */
+    public final String getPath() {
+        return path;
+    }
+
+    /**
+     * Check if the stream is at EOF
+     *
+     * @return <code>true</code> if all the data has been consumed
+     */
+    public boolean isEof() {
+        return eofIndicator && hasNoData();
+    }
+
+    /**
+     * @return {@code true} while a remote handle is held and that handle reports itself open
+     */
+    @Override
+    public boolean isOpen() {
+        CloseableHandle current = handle;
+        if (current == null) {
+            return false;
+        }
+        return current.isOpen();
+    }
+
+    /**
+     * Reads one byte through {@link #read(byte[], int, int)} using the shared one-byte array.
+     *
+     * @return the byte as an unsigned value, or whatever non-positive count the bulk read returned
+     */
+    @Override
+    public int read() throws IOException {
+        int count = read(bb, 0, 1);
+        return (count > 0) ? (bb[0] & 0xFF) : count;
+    }
+
+    /**
+     * Reads up to {@code len} bytes, fetching responses of pending read requests and issuing new
+     * requests as needed.
+     *
+     * @param  b           destination array
+     * @param  off         index in {@code b} of the first byte to store
+     * @param  len         maximum number of bytes to read
+     * @return             number of bytes stored, or {@code -1} if nothing was stored because the
+     *                     end of the file has been reached and no buffered data is left
+     * @throws IOException if the stream is closed or a request fails
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (!isOpen()) {
+            throw new IOException("read(" + getPath() + ") stream closed");
+        }
+        int idx = off;
+        // EOF may be signalled together with the last data: keep going until that data is consumed
+        while (len > 0 && !(eofIndicator && hasNoData())) {
+            if (hasNoData()) {
+                fillData();
+                if (eofIndicator && hasNoData()) {
+                    break;
+                }
+                sendRequests();
+            } else {
+                int nb = Math.min(buffer.available(), len);
+                // store at the running index so that successive chunks do not overwrite each other
+                buffer.getRawBytes(b, idx, nb);
+                idx += nb;
+                len -= nb;
+                clientOffset += nb;
+            }
+        }
+        int res = idx - off;
+        if (res == 0 && eofIndicator && hasNoData()) {
+            res = -1;
+        }
+        return res;
+    }
+
+    /**
+     * Copies at most {@code max} bytes from the remote file to the given channel.
+     *
+     * @param  max         maximum number of bytes to transfer
+     * @param  out         channel receiving the data
+     * @return             number of bytes actually transferred
+     * @throws IOException if the stream is closed, a request fails or the channel cannot be written
+     */
+    public long transferTo(long max, WritableByteChannel out) throws IOException {
+        if (!isOpen()) {
+            throw new IOException("transferTo(" + getPath() + ") stream closed");
+        }
+        long orgOffset = clientOffset;
+        // EOF may be signalled together with the last data: keep going until that data is written
+        while (max > 0 && !(eofIndicator && hasNoData())) {
+            if (hasNoData()) {
+                fillData();
+                if (eofIndicator && hasNoData()) {
+                    break;
+                }
+                sendRequests();
+            } else {
+                int nb = buffer.available();
+                int toRead = (int) Math.min(nb, max);
+                // wrap the buffered bytes directly instead of copying them
+                ByteBuffer chunk = ByteBuffer.wrap(buffer.array(), buffer.rpos(), toRead);
+                while (chunk.hasRemaining()) {
+                    out.write(chunk);
+                }
+                buffer.rpos(buffer.rpos() + toRead);
+                clientOffset += toRead;
+                max -= toRead;
+            }
+        }
+        return clientOffset - orgOffset;
+    }
+
+    /**
+     * Copies the rest of the remote file to the given stream.
+     *
+     * @param  out         stream receiving the data
+     * @return             number of bytes transferred
+     * @throws IOException if the stream is closed, a request fails or the output cannot be written
+     */
+    @SuppressWarnings("PMD.MissingOverride")
+    public long transferTo(OutputStream out) throws IOException {
+        if (!isOpen()) {
+            throw new IOException("transferTo(" + getPath() + ") stream closed");
+        }
+        long orgOffset = clientOffset;
+        // EOF may be signalled together with the last data: keep going until that data is written
+        while (!(eofIndicator && hasNoData())) {
+            if (hasNoData()) {
+                fillData();
+                if (eofIndicator && hasNoData()) {
+                    break;
+                }
+                sendRequests();
+            } else {
+                int nb = buffer.available();
+                out.write(buffer.array(), buffer.rpos(), nb);
+                buffer.rpos(buffer.rpos() + nb);
+                clientOffset += nb;
+            }
+        }
+        return clientOffset - orgOffset;
+    }
+
+    /**
+     * Skips over {@code n} bytes. As long as nothing has been consumed, requested or buffered yet
+     * the skip is done by simply moving the offsets; otherwise the inherited implementation is used.
+     *
+     * @param  n           number of bytes to skip; non-positive values skip nothing
+     * @return             number of bytes skipped
+     * @throws IOException if the stream is closed or reading the skipped data fails
+     */
+    @Override
+    public long skip(long n) throws IOException {
+        if (!isOpen()) {
+            throw new IOException("skip(" + getPath() + ") stream closed");
+        }
+        if (n <= 0) {
+            return 0;
+        }
+        if (clientOffset == 0 && pendingReads.isEmpty() && hasNoData()) {
+            clientOffset = n;
+            // the next read request has to start at the new position as well, otherwise data
+            // from the beginning of the file would be returned after the skip
+            requestOffset = n;
+            return n;
+        }
+        return super.skip(n);
+    }
+
+    /**
+     * @return {@code true} if there is no current buffer or it has no bytes left to read
+     */
+    boolean hasNoData() {
+        if (buffer == null) {
+            return true;
+        }
+        return buffer.available() == 0;
+    }
+
+    /**
+     * Queues read requests ahead of the consumer. Does nothing once EOF has been seen.
+     * Requests are added while fewer than (local window max size / bufferSize) are pending and the
+     * request offset is below {@code fileSize + bufferSize}; at least one request is always kept
+     * pending.
+     *
+     * @throws IOException if a request cannot be sent
+     */
+    void sendRequests() throws IOException {
+        if (!eofIndicator) {
+            long windowSize = client.getChannel().getLocalWindow().getMaxSize();
+            while (pendingReads.size() < (int) (windowSize / bufferSize) && requestOffset < fileSize + bufferSize
+                    || pendingReads.isEmpty()) {
+                // NOTE(review): the 23 bytes of headroom presumably leave room for the SSH and SFTP
+                // headers that are later written in front of the payload - confirm
+                Buffer buf = client.getSession().createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA,
+                        23 /* sftp packet */ + 16 + handle.getIdentifier().length);
+                buf.rpos(23);
+                buf.wpos(23);
+                // request payload: handle, file offset, number of bytes wanted
+                buf.putBytes(handle.getIdentifier());
+                buf.putLong(requestOffset);
+                buf.putInt(bufferSize);
+                int reqId = client.send(SftpConstants.SSH_FXP_READ, buf);
+                // remember what was asked for so the response can be matched later
+                pendingReads.add(new Ack(reqId, requestOffset, bufferSize));
+                requestOffset += bufferSize;
+            }
+        }
+    }
+
+    /**
+     * Takes the oldest pending read request, fetches its response and makes it the current buffer.
+     * If the response starts beyond the current client offset, the bytes in between are read
+     * synchronously and placed in front of the response data.
+     *
+     * @throws IOException if a response cannot be retrieved or reports an error
+     */
+    void fillData() throws IOException {
+        Ack ack = pendingReads.pollFirst();
+        if (ack == null) {
+            return;
+        }
+        pollBuffer(ack);
+        if (eofIndicator || clientOffset >= ack.offset) {
+            return;
+        }
+
+        // we are actually missing some data (presumably an earlier response was shorter than
+        // requested), so request it synchronously
+        int nb = (int) (ack.offset - clientOffset);
+        byte[] data = new byte[nb + buffer.available()];
+        int cur = 0;
+        AtomicReference<Boolean> eof = new AtomicReference<>();
+        while (cur < nb) {
+            // continue after the bytes already fetched instead of re-reading the same position
+            int dlen = client.read(handle, clientOffset + cur, data, cur, nb - cur, eof);
+            if (dlen < 0) {
+                // end of file: a negative count must not be added to the cursor
+                eofIndicator = true;
+                break;
+            }
+            Boolean flag = eof.get();
+            eofIndicator = (flag != null) && flag;
+            cur += dlen;
+        }
+
+        if (cur < nb) {
+            // the file ended inside the gap: only what was actually read can be delivered
+            buffer = new ByteArrayBuffer(data, 0, cur);
+        } else {
+            // append the asynchronously received data after the gap
+            buffer.getRawBytes(data, nb, buffer.available());
+            buffer = new ByteArrayBuffer(data);
+        }
+    }
+
+    /**
+     * Receives and decodes the response of one read request.
+     * A data response becomes the current buffer (limited to exactly the data bytes) and updates the
+     * EOF indicator from the optional flag that follows the data; a status response of
+     * {@code SSH_FX_EOF} sets the EOF indicator, any other status is handed to
+     * {@code checkResponseStatus}; every other packet type goes to {@code handleUnexpectedPacket}.
+     *
+     * @param  ack         the pending request whose response is wanted
+     * @throws IOException if receiving fails or the response handling reports an error
+     */
+    void pollBuffer(Ack ack) throws IOException {
+        Buffer buf = client.receive(ack.id);
+        // packet header: length, type, request id
+        int length = buf.getInt();
+        int type = buf.getUByte();
+        int id = buf.getInt();
+        client.validateIncomingResponse(SshConstants.SSH_MSG_CHANNEL_DATA, id, type, length, buf);
+        if (type == SftpConstants.SSH_FXP_DATA) {
+            int dlen = buf.getInt();
+            int rpos = buf.rpos();
+            // jump past the data to look at the optional end-of-file flag ...
+            buf.rpos(rpos + dlen);
+            Boolean b = SftpHelper.getEndOfFileIndicatorValue(buf, client.getVersion());
+            eofIndicator = b != null && b;
+            // ... then restrict the buffer to the data bytes only
+            buf.rpos(rpos);
+            buf.wpos(rpos + dlen);
+            this.buffer = buf;
+        } else if (type == SftpConstants.SSH_FXP_STATUS) {
+            int substatus = buf.getInt();
+            String msg = buf.getString();
+            String lang = buf.getString();
+            if (substatus == SftpConstants.SSH_FX_EOF) {
+                eofIndicator = true;
+            } else {
+                // NOTE(review): presumably throws for an error status - confirm
+                client.checkResponseStatus(SshConstants.SSH_MSG_CHANNEL_DATA, id, substatus, msg, lang);
+            }
+        } else {
+            IOException err = client.handleUnexpectedPacket(SshConstants.SSH_MSG_CHANNEL_DATA,
+                    SftpConstants.SSH_FXP_STATUS, id, type, length, buf);
+            if (err != null) {
+                throw err;
+            }
+        }
+    }
+
+    /**
+     * Closes the stream if it is open: the responses of all pending read requests are collected
+     * first, then the remote handle is closed and forgotten - even if collecting a response fails.
+     *
+     * @throws IOException if a pending response or closing the handle fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (!isOpen()) {
+            return;
+        }
+        try {
+            // consume what is still in flight so no response is left behind
+            for (Ack ack = pendingReads.pollFirst(); ack != null; ack = pendingReads.pollFirst()) {
+                pollBuffer(ack);
+            }
+        } finally {
+            try {
+                handle.close();
+            } finally {
+                handle = null;
+            }
+        }
+    }
+}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
new file mode 100644
index 0000000..d8f1974
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Objects;
+
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.OutputStreamWithChannel;
+
+/**
+ * Implements an output stream for a given remote file
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class SftpOutputStreamAsync extends OutputStreamWithChannel {
+
+    /**
+     * Book-keeping record for one write request that was sent and whose response is still expected.
+     */
+    static class Ack {
+        // request id returned by the client when the write request was sent
+        int id;
+        // file offset the data was written at
+        long offset;
+        // number of data bytes in the request
+        int length;
+
+        Ack(int id, long offset, int length) {
+            this.id = id;
+            this.offset = offset;
+            this.length = length;
+        }
+    }
+
+    private final AbstractSftpClient client;
+    private final String path;
+    private final byte[] bb = new byte[1];
+    private final int bufferSize;
+    private Buffer buffer;
+    private CloseableHandle handle;
+    private long offset;
+    private final Deque<Ack> pendingWrites = new LinkedList<>();
+
+    /**
+     * Opens the remote file for writing.
+     *
+     * @param  client      the SFTP client used for all requests - must not be {@code null}
+     * @param  bufferSize  size used when allocating the buffers that collect written data
+     * @param  path        remote file path
+     * @param  mode        modes used to open the file
+     * @throws IOException if the file cannot be opened
+     */
+    public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
+                                 String path, Collection<OpenMode> mode) throws IOException {
+        this.client = Objects.requireNonNull(client, "No SFTP client instance");
+        this.path = path;
+        this.handle = client.open(path, mode);
+        this.bufferSize = bufferSize;
+    }
+
+    /**
+     * Creates a stream over an already opened remote file.
+     *
+     * @param  client      the SFTP client used for all requests - must not be {@code null}
+     * @param  bufferSize  size used when allocating the buffers that collect written data
+     * @param  path        remote file path (used in error messages)
+     * @param  handle      the open remote file handle; closed when this stream is closed
+     * @throws IOException declared for callers; nothing in this constructor performs I/O
+     */
+    public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
+                                 String path, CloseableHandle handle) throws IOException {
+        this.client = Objects.requireNonNull(client, "No SFTP client instance");
+        this.path = path;
+        this.handle = handle;
+        this.bufferSize = bufferSize;
+    }
+
+    /**
+     * The client instance
+     *
+     * @return {@link SftpClient} instance used to access the remote file
+     */
+    public final AbstractSftpClient getClient() {
+        return client;
+    }
+
+    /**
+     * Sets the remote file offset that the next write request will use.
+     *
+     * @param offset the new file offset
+     */
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
+    /**
+     * The remotely accessed file path
+     *
+     * @return Remote file path
+     */
+    public final String getPath() {
+        return path;
+    }
+
+    /**
+     * @return {@code true} while a remote handle is held and that handle reports itself open
+     */
+    @Override
+    public boolean isOpen() {
+        CloseableHandle current = handle;
+        if (current == null) {
+            return false;
+        }
+        return current.isOpen();
+    }
+
+    /**
+     * Writes a single byte by delegating to {@link #write(byte[], int, int)}.
+     *
+     * @param  b           the byte to write (only the low 8 bits are used)
+     * @throws IOException if the bulk write fails
+     */
+    @Override
+    public void write(int b) throws IOException {
+        // the one-byte array is a shared instance field, reused for every call
+        bb[0] = (byte) b;
+        write(bb, 0, 1);
+    }
+
+    /**
+     * Collects the given bytes in the current buffer and sends a write request every time the
+     * buffer has reached its payload capacity.
+     *
+     * @param  b           source array
+     * @param  off         index of the first byte to write
+     * @param  len         number of bytes to write
+     * @throws IOException if the stream is closed, the configured buffer size leaves no room for
+     *                     payload, or sending a request fails
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        if (!isOpen()) {
+            // fail like flush() does instead of hitting the null handle
+            throw new IOException("write(" + getPath() + ") stream is closed");
+        }
+
+        byte[] id = handle.getIdentifier();
+        // payload capacity: the buffer size minus what is kept aside for the headers
+        int max = bufferSize - (9 + 16 + id.length + 72);
+        if (max <= 0) {
+            // without this the loop below could never make progress
+            throw new IOException("write(" + getPath() + ") buffer size too small: " + bufferSize);
+        }
+
+        do {
+            if (buffer == null) {
+                buffer = client.getSession().createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize);
+                // leave headroom in front of the data so that headers can be written in place later
+                int hdr = (9 + 16 + 8 + id.length) + buffer.wpos();
+                buffer.rpos(hdr);
+                buffer.wpos(hdr);
+            }
+            int nb = Math.min(len, max - (buffer.wpos() - buffer.rpos()));
+            buffer.putRawBytes(b, off, nb);
+            if (buffer.available() == max) {
+                flush();
+            }
+            off += nb;
+            len -= nb;
+        } while (len > 0);
+    }
+
+    /**
+     * Sends the buffered bytes as one {@code SSH_FXP_WRITE} request without waiting for its reply.
+     * Replies to earlier requests that are already available are checked first. When nothing is
+     * buffered, no request is sent.
+     *
+     * @throws IOException if the stream is closed, an earlier write was rejected or sending fails
+     */
+    @Override
+    public void flush() throws IOException {
+        if (!isOpen()) {
+            throw new IOException("flush(" + getPath() + ") stream is closed");
+        }
+
+        // check the replies that are already there, oldest first; the zero timeout is meant to
+        // make this a poll so that flushing does not wait for outstanding replies
+        for (;;) {
+            Ack ack = pendingWrites.peek();
+            if (ack == null) {
+                break;
+            }
+            Buffer response = client.receive(ack.id, 0);
+            if (response == null) {
+                break;
+            }
+            pendingWrites.removeFirst();
+            client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
+        }
+
+        // nothing written since the last flush: there is no buffer (or no data) to send
+        if (buffer == null || buffer.available() <= 0) {
+            return;
+        }
+
+        byte[] id = handle.getIdentifier();
+        int avail = buffer.available();
+        Buffer buf;
+        if (buffer.rpos() >= 16 + id.length) {
+            // enough headroom: write handle, offset and data length right in front of the data
+            int wpos = buffer.wpos();
+            buffer.rpos(buffer.rpos() - 16 - id.length);
+            buffer.wpos(buffer.rpos());
+            buffer.putBytes(id);
+            buffer.putLong(offset);
+            buffer.putInt(avail);
+            buffer.wpos(wpos);
+            buf = buffer;
+        } else {
+            // no headroom: build the request in a new buffer
+            buf = new ByteArrayBuffer(id.length + avail + Long.SIZE /* some extra fields */, false);
+            buf.putBytes(id);
+            buf.putLong(offset);
+            buf.putBytes(buffer.array(), buffer.rpos(), avail);
+        }
+
+        int reqId = client.send(SftpConstants.SSH_FXP_WRITE, buf);
+        pendingWrites.add(new Ack(reqId, offset, avail));
+
+        offset += avail;
+        // the buffer now belongs to the request; the next write allocates a new one
+        buffer = null;
+    }
+
+    /**
+     * Closes the stream if it is open: buffered data is flushed, the replies of all pending write
+     * requests are awaited and checked, then the remote handle is closed and forgotten - even if
+     * one of the earlier steps fails.
+     *
+     * @throws IOException if flushing, a write reply or closing the handle fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (!isOpen()) {
+            return;
+        }
+        try {
+            if (buffer != null && buffer.available() > 0) {
+                flush();
+            }
+            // wait for every outstanding reply, oldest first
+            Ack ack;
+            while ((ack = pendingWrites.pollFirst()) != null) {
+                Buffer response = client.receive(ack.id);
+                client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
+            }
+        } finally {
+            try {
+                handle.close();
+            } finally {
+                handle = null;
+            }
+        }
+    }
+}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpInputStreamWithChannel.java b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpInputStreamWithChannel.java
similarity index 100%
rename from sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpInputStreamWithChannel.java
rename to sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpInputStreamWithChannel.java
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpOutputStreamWithChannel.java b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpOutputStreamWithChannel.java
similarity index 100%
rename from sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpOutputStreamWithChannel.java
rename to sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpOutputStreamWithChannel.java
diff --git a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpPerformanceTest.java b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpPerformanceTest.java
new file mode 100644
index 0000000..d24b7c8
--- /dev/null
+++ b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpPerformanceTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+
+import eu.rekawek.toxiproxy.model.ToxicDirection;
+import eu.rekawek.toxiproxy.model.toxic.Latency;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.config.hosts.HostConfigEntryResolver;
+import org.apache.sshd.client.keyverifier.AcceptAllServerKeyVerifier;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
+import org.apache.sshd.client.subsystem.sftp.fs.SftpFileSystem;
+import org.apache.sshd.common.keyprovider.KeyIdentityProvider;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.ToxiproxyContainer;
+import org.testcontainers.containers.ToxiproxyContainer.ContainerProxy;
+
+/**
+ * Development-only benchmark that times SFTP uploads and downloads through a
+ * Toxiproxy instance with an injected downstream latency. Each run compares
+ * the channel based streams ({@link SftpOutputStreamWithChannel} /
+ * {@link SftpInputStreamWithChannel}) with the streams obtained through an
+ * {@link SftpFileSystem} and prints the relative gain.
+ */
+@Ignore("Special class used for development only - not really a test just useful to run as such")
+public class SftpPerformanceTest {
+
+    public static final String USERNAME = "foo";
+    public static final String PASSWORD = "pass";
+
+    /** Latencies (in milliseconds) injected by the proxy, one benchmark series per value. */
+    private static final int[] LATENCIES_MS = {0, 1, 5, 10, 50, 100, 500};
+    /** Transfer sizes (in megabytes) measured for every latency. */
+    private static final int[] SIZES_MB = {1, 5, 10, 50, 100};
+
+    /** Buffer size used for the buffering wrappers and for the channel based streams. */
+    private static final int STREAM_BUFFER_SIZE = 32768;
+    /** Size of the array handed to each {@code read()} call while downloading. */
+    private static final int READ_CHUNK_SIZE = 8192;
+    /** Name of the transferred file as seen through SFTP. */
+    private static final String REMOTE_FILE = "out.txt";
+    /** The repeated payload line; the charset is explicit so it does not depend on the platform default. */
+    private static final byte[] LINE = "123456789abcdef\n".getBytes(java.nio.charset.StandardCharsets.US_ASCII);
+    private static final long NANOS_PER_MILLI = 1000000L;
+
+    // Create a common docker network so that containers can communicate
+    @Rule
+    public Network network = Network.newNetwork();
+
+    // the target container - this could be anything
+    @Rule
+    public GenericContainer<?> sftp = new GenericContainer<>("atmoz/sftp")
+            .withEnv("SFTP_USERS", USERNAME + ":" + PASSWORD)
+            .withNetwork(network)
+            .withFileSystemBind("target", "/home/foo")
+            .withExposedPorts(22);
+
+    // Toxiproxy container, which will be used as a TCP proxy
+    @Rule
+    public ToxiproxyContainer toxiproxy = new ToxiproxyContainer()
+            .withNetwork(network);
+
+    public SftpPerformanceTest() {
+        super();
+    }
+
+    /**
+     * Measures uploads for every latency / size combination and prints the
+     * comparison between the channel based and the file system based stream.
+     *
+     * @throws IOException if the proxy cannot be configured or a transfer fails
+     */
+    @Test
+    public void testUploadLatency() throws IOException {
+        final ContainerProxy proxy = toxiproxy.getProxy(sftp, 22);
+        for (int latency : LATENCIES_MS) {
+            Latency toxic = proxy.toxics().latency("latency", ToxicDirection.DOWNSTREAM, latency);
+            try {
+                for (int megabytes : SIZES_MB) {
+                    try (SshClient client = createSshClient()) {
+                        long orgTime;
+                        long newTime;
+                        try (ClientSession session = createClientSession(client, proxy)) {
+                            orgTime = uploadPrevious(session, megabytes);
+                        }
+                        try (ClientSession session = createClientSession(client, proxy)) {
+                            newTime = uploadOptimized(session, megabytes);
+                        }
+                        report(megabytes, latency, orgTime, newTime);
+                    }
+                }
+            } finally {
+                // always remove the toxic so a failed run does not leave the "latency" toxic registered
+                toxic.remove();
+            }
+        }
+    }
+
+    /**
+     * Measures downloads for every latency / size combination and prints the
+     * comparison between the channel based and the file system based stream.
+     *
+     * @throws IOException if the proxy cannot be configured or a transfer fails
+     */
+    @Test
+    public void testDownloadLatency() throws IOException {
+        final ContainerProxy proxy = toxiproxy.getProxy(sftp, 22);
+        for (int latency : LATENCIES_MS) {
+            Latency toxic = proxy.toxics().latency("latency", ToxicDirection.DOWNSTREAM, latency);
+            try {
+                for (int megabytes : SIZES_MB) {
+                    try (SshClient client = createSshClient()) {
+                        long orgTime;
+                        long newTime;
+                        try (ClientSession session = createClientSession(client, proxy)) {
+                            newTime = downloadOptimized(session, megabytes);
+                        }
+                        try (ClientSession session = createClientSession(client, proxy)) {
+                            orgTime = downloadPrevious(session, megabytes);
+                        }
+                        report(megabytes, latency, orgTime, newTime);
+                    }
+                }
+            } finally {
+                // always remove the toxic so a failed run does not leave the "latency" toxic registered
+                toxic.remove();
+            }
+        }
+    }
+
+    /**
+     * Connects through the proxy and authenticates with {@link #USERNAME} /
+     * {@link #PASSWORD}.
+     *
+     * @param  client      the started client used to connect
+     * @param  proxy       the proxy whose address and port are connected to
+     * @return             the authenticated session; the caller must close it
+     * @throws IOException if connecting or authenticating fails; the session
+     *                     is closed before the exception is propagated
+     */
+    public ClientSession createClientSession(SshClient client, ContainerProxy proxy) throws IOException {
+        final String ipAddressViaToxiproxy = proxy.getContainerIpAddress();
+        final int portViaToxiproxy = proxy.getProxyPort();
+
+        ClientSession session = client.connect(USERNAME, ipAddressViaToxiproxy, portViaToxiproxy).verify().getClientSession();
+        try {
+            session.addPasswordIdentity(PASSWORD);
+            session.auth().verify();
+            return session;
+        } catch (IOException | RuntimeException e) {
+            // do not leak the connected session when authentication fails
+            try {
+                session.close();
+            } catch (IOException closeError) {
+                e.addSuppressed(closeError);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * @return a started client that accepts any server key and uses neither
+     *         host configuration entries nor key identities
+     */
+    @NotNull
+    public SshClient createSshClient() {
+        SshClient client = SshClient.setUpDefaultClient();
+        client.setServerKeyVerifier(AcceptAllServerKeyVerifier.INSTANCE);
+        client.setHostConfigEntryResolver(HostConfigEntryResolver.EMPTY);
+        client.setKeyIdentityProvider(KeyIdentityProvider.EMPTY_KEYS_PROVIDER);
+        client.start();
+        return client;
+    }
+
+    /**
+     * Uploads {@code mb} megabytes through {@link SftpOutputStreamWithChannel}.
+     *
+     * @param  session     the authenticated session to use
+     * @param  mb          the amount of data to upload, in megabytes
+     * @return             the elapsed time in milliseconds, including client creation and close
+     * @throws IOException if the transfer fails
+     */
+    public long uploadPrevious(ClientSession session, int mb) throws IOException {
+        long t0 = System.nanoTime();
+        try (SftpClient client = SftpClientFactory.instance().createSftpClient(session);
+             OutputStream os = new BufferedOutputStream(
+                     new SftpOutputStreamWithChannel(
+                             client, STREAM_BUFFER_SIZE, REMOTE_FILE,
+                             Arrays.asList(OpenMode.Write,
+                                     OpenMode.Create,
+                                     OpenMode.Truncate)),
+                     STREAM_BUFFER_SIZE)) {
+            writePayload(os, mb);
+        }
+        return elapsedMillis(t0);
+    }
+
+    /**
+     * Uploads {@code mb} megabytes through the output stream provided by an
+     * {@link SftpFileSystem} path.
+     *
+     * @param  session     the authenticated session to use
+     * @param  mb          the amount of data to upload, in megabytes
+     * @return             the elapsed time in milliseconds, including file system creation and close
+     * @throws IOException if the transfer fails
+     */
+    public long uploadOptimized(ClientSession session, int mb) throws IOException {
+        long t0 = System.nanoTime();
+        try (SftpFileSystem fs = SftpClientFactory.instance().createSftpFileSystem(session)) {
+            Path p = fs.getPath(REMOTE_FILE);
+            try (OutputStream os = new BufferedOutputStream(
+                    Files.newOutputStream(p, StandardOpenOption.CREATE,
+                            StandardOpenOption.TRUNCATE_EXISTING),
+                    STREAM_BUFFER_SIZE)) {
+                writePayload(os, mb);
+            }
+        }
+        return elapsedMillis(t0);
+    }
+
+    /**
+     * Creates the local source file (not timed) and downloads it through
+     * {@link SftpInputStreamWithChannel}.
+     *
+     * @param  session     the authenticated session to use
+     * @param  mb          the size of the file to download, in megabytes
+     * @return             the elapsed download time in milliseconds
+     * @throws IOException if creating the file or the transfer fails
+     */
+    public long downloadPrevious(ClientSession session, int mb) throws IOException {
+        createLocalFile(mb);
+        long t0 = System.nanoTime();
+        try (SftpClient client = SftpClientFactory.instance().createSftpClient(session);
+             InputStream in = new BufferedInputStream(
+                     new SftpInputStreamWithChannel(
+                             client, STREAM_BUFFER_SIZE, REMOTE_FILE,
+                             Arrays.asList(OpenMode.Read)),
+                     STREAM_BUFFER_SIZE)) {
+            drain(in);
+        }
+        return elapsedMillis(t0);
+    }
+
+    /**
+     * Creates the local source file (not timed) and downloads it through the
+     * input stream provided by an {@link SftpFileSystem} path.
+     *
+     * @param  session     the authenticated session to use
+     * @param  mb          the size of the file to download, in megabytes
+     * @return             the elapsed download time in milliseconds
+     * @throws IOException if creating the file or the transfer fails
+     */
+    public long downloadOptimized(ClientSession session, int mb) throws IOException {
+        createLocalFile(mb);
+        long t0 = System.nanoTime();
+        try (SftpFileSystem fs = SftpClientFactory.instance().createSftpFileSystem(session)) {
+            Path p = fs.getPath(REMOTE_FILE);
+            try (InputStream in = new BufferedInputStream(
+                    Files.newInputStream(p, StandardOpenOption.READ), STREAM_BUFFER_SIZE)) {
+                drain(in);
+            }
+        }
+        return elapsedMillis(t0);
+    }
+
+    /** Prints one result line; a zero baseline is reported as a gain of 0% instead of dividing by zero. */
+    private static void report(int megabytes, int latency, long orgTime, long newTime) {
+        long gain = (orgTime > 0L) ? (100 * (orgTime - newTime) / orgTime) : 0L;
+        System.out.println(String.format("%3d MB / %3d ms latency: %7d down to %5d ms, gain = %d%%",
+                megabytes, latency, orgTime, newTime, (int) gain));
+    }
+
+    /** Writes {@code mb} megabytes of the repeated payload line to the given stream. */
+    private static void writePayload(OutputStream os, int mb) throws IOException {
+        // long arithmetic so that large sizes cannot overflow the line count
+        long lines = 1024L * 1024L * mb / LINE.length;
+        for (long i = 0; i < lines; i++) {
+            os.write(LINE);
+        }
+    }
+
+    /**
+     * (Re)creates {@code target/out.txt} with {@code mb} megabytes of payload.
+     * The {@code target} folder is bound to {@code /home/foo} in the SFTP
+     * container - presumably the login directory of the test user, which is
+     * what makes the file reachable as {@link #REMOTE_FILE}.
+     */
+    private static void createLocalFile(int mb) throws IOException {
+        Path f = Paths.get("target/out.txt");
+        try (OutputStream bos = new BufferedOutputStream(
+                Files.newOutputStream(f, StandardOpenOption.CREATE,
+                        StandardOpenOption.TRUNCATE_EXISTING,
+                        StandardOpenOption.WRITE))) {
+            writePayload(bos, mb);
+        }
+    }
+
+    /**
+     * Reads the stream until end-of-file. Counting {@code read()} calls instead
+     * would under-read whenever a call returns fewer bytes than requested.
+     */
+    private static void drain(InputStream in) throws IOException {
+        byte[] data = new byte[READ_CHUNK_SIZE];
+        while (in.read(data) >= 0) {
+            // discard - only the transfer time matters
+        }
+    }
+
+    /** @return the milliseconds elapsed since the given {@link System#nanoTime()} value */
+    private static long elapsedMillis(long startNanos) {
+        return (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
+    }
+
+}
diff --git a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
index da0d96d..3078ec2 100644
--- a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
+++ b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
@@ -586,21 +586,14 @@ public class SftpTest extends AbstractSftpClientTestSupport {
 
             try (SftpClient sftp = createSftpClient(session);
                  InputStream stream = sftp.read(
-                         CommonTestSupportUtils.resolveRelativeRemotePath(parentPath, localFile), OpenMode.Read)) {
-                assertFalse("Stream reported mark supported", stream.markSupported());
-                try {
-                    stream.mark(data.length);
-                    fail("Unexpected success to mark the read limit");
-                } catch (UnsupportedOperationException e) {
-                    // expected - ignored
-                }
+                         CommonTestSupportUtils.resolveRelativeRemotePath(parentPath, localFile),
+                         OpenMode.Read)) {
 
                 byte[] expected = new byte[data.length / 4];
-                int readLen = stream.read(expected);
-                assertEquals("Failed to read fully initial data", expected.length, readLen);
+                int readLen = expected.length;
+                System.arraycopy(data, 0, expected, 0, readLen);
 
                 byte[] actual = new byte[readLen];
-                stream.reset();
                 readLen = stream.read(actual);
                 assertEquals("Failed to read fully reset data", actual.length, readLen);
                 assertArrayEquals("Mismatched re-read data contents", expected, actual);
@@ -616,12 +609,6 @@ public class SftpTest extends AbstractSftpClientTestSupport {
 
                 System.arraycopy(data, expected.length + readLen, expected, 0, expected.length);
                 assertArrayEquals("Mismatched skipped forward data contents", expected, actual);
-
-                skipped = stream.skip(0 - readLen);
-                assertEquals("Mismatched backward skip size", readLen, skipped);
-                readLen = stream.read(actual);
-                assertEquals("Failed to read fully skipped backward data", actual.length, readLen);
-                assertArrayEquals("Mismatched skipped backward data contents", expected, actual);
             }
         }
     }
diff --git a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTransferTest.java b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTransferTest.java
new file mode 100644
index 0000000..9bba81f
--- /dev/null
+++ b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTransferTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.client.subsystem.sftp.fs.SftpFileSystem;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+/**
+ * Checks that file contents are preserved when a file is copied back and
+ * forth between the local file system and an {@link SftpFileSystem}.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class SftpTransferTest extends AbstractSftpClientTestSupport {
+
+    private static final int BUFFER_SIZE = 8192;
+    /** Minimum number of characters written to the generated test file (10 MB). */
+    private static final long TEST_DATA_SIZE = 10L * 1024L * 1024L;
+
+    public SftpTransferTest() throws IOException {
+        super();
+    }
+
+    /**
+     * Generates a local file, copies it local -> remote -> local -> remote ->
+     * local and verifies that the last copy has the same content as the
+     * generated file.
+     *
+     * @throws IOException if the session, the file system or a copy fails
+     */
+    @Test
+    public void testTransferIntegrity() throws IOException {
+        try (ClientSession session = createClientSession();
+             SftpFileSystem fs = SftpClientFactory.instance().createSftpFileSystem(session)) {
+
+            Path localRoot = detectTargetFolder().resolve("sftp");
+            // the folder may not exist yet (clean build / test executed on its own)
+            Files.createDirectories(localRoot);
+            Path remoteRoot = fs.getDefaultDir().resolve("target/sftp");
+
+            Path local0 = localRoot.resolve("files-0.txt");
+            Path remote0 = remoteRoot.resolve("files-1.txt");
+            Path local1 = localRoot.resolve("files-2.txt");
+            Path remote1 = remoteRoot.resolve("files-3.txt");
+            Path local2 = localRoot.resolve("files-4.txt");
+            Files.deleteIfExists(local0);
+            Files.deleteIfExists(remote0);
+            Files.deleteIfExists(local1);
+            Files.deleteIfExists(remote1);
+            Files.deleteIfExists(local2);
+
+            String data = getClass().getName() + "#" + getCurrentTestName() + "(" + new Date() + ")\n";
+            try (BufferedWriter bos = Files.newBufferedWriter(local0)) {
+                long count = 0;
+                while (count < TEST_DATA_SIZE) {
+                    bos.append(data);
+                    count += data.length();
+                }
+            }
+
+            Files.copy(local0, remote0);
+            Files.copy(remote0, local1);
+            Files.copy(local1, remote1);
+            Files.copy(remote1, local2);
+
+            assertTrue("File integrity problem", sameContent(local0, local2));
+        }
+    }
+
+    /**
+     * Connects to the test server and authenticates using the current test
+     * name as both user name and password.
+     *
+     * @return             the authenticated session; the caller must close it
+     * @throws IOException if connecting or authenticating fails; the session
+     *                     is closed before the exception is propagated
+     */
+    private ClientSession createClientSession() throws IOException {
+        ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port)
+                .verify(7L, TimeUnit.SECONDS).getSession();
+        try {
+            session.addPasswordIdentity(getCurrentTestName());
+            session.auth().verify(5L, TimeUnit.SECONDS);
+            return session;
+        } catch (IOException | RuntimeException e) {
+            // release the session, but never let a close failure hide the original error
+            try {
+                session.close();
+            } catch (IOException closeError) {
+                e.addSuppressed(closeError);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Compares two files chunk by chunk.
+     *
+     * @param  path        the first file
+     * @param  path2       the second file
+     * @return             {@code true} if both files have the same length and bytes
+     * @throws IOException if one of the files cannot be read
+     */
+    private boolean sameContent(Path path, Path path2) throws IOException {
+        byte[] buffer1 = new byte[BUFFER_SIZE];
+        byte[] buffer2 = new byte[BUFFER_SIZE];
+        try (InputStream in1 = Files.newInputStream(path);
+             InputStream in2 = Files.newInputStream(path2)) {
+            while (true) {
+                int nRead1 = readNBytes(in1, buffer1);
+                int nRead2 = readNBytes(in2, buffer2);
+                if (nRead1 != nRead2) {
+                    return false;
+                }
+                for (int i = 0; i < nRead1; i++) {
+                    if (buffer1[i] != buffer2[i]) {
+                        return false;
+                    }
+                }
+                // a short (possibly empty) chunk means both streams reached their end
+                if (nRead1 < BUFFER_SIZE) {
+                    return true;
+                }
+            }
+        }
+    }
+
+    /**
+     * Fills the array from the stream, reading repeatedly until it is full or
+     * the end of the stream is reached.
+     *
+     * @param  is          the stream to read from
+     * @param  b           the array to fill
+     * @return             the number of bytes stored in {@code b}; less than
+     *                     {@code b.length} only at the end of the stream
+     * @throws IOException if reading fails
+     */
+    private int readNBytes(InputStream is, byte[] b) throws IOException {
+        int n = 0;
+        int len = b.length;
+        while (n < len) {
+            int count = is.read(b, n, len - n);
+            if (count < 0) {
+                break;
+            }
+            n += count;
+        }
+        return n;
+    }
+
+}