You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2020/04/21 08:46:14 UTC

[mina-sshd] 01/02: [SSHD-979] Improve SFTP streaming

This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit e85b67e0dc6c5f10a7ad77b365d33905c095bff9
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Mon Apr 20 11:41:45 2020 +0200

    [SSHD-979] Improve SFTP streaming
    
    Work sponsorded by Buddy https://buddy.works/
---
 .../common/channel/ChannelAsyncOutputStream.java   | 143 ++++++----
 .../common/session/helpers/AbstractSession.java    |   4 +-
 sshd-sftp/pom.xml                                  |  22 ++
 .../sshd/client/subsystem/sftp/RawSftpClient.java  |   9 +
 .../sshd/client/subsystem/sftp/SftpClient.java     |  56 +---
 .../subsystem/sftp/SftpRemotePathChannel.java      |  49 ++--
 .../helpers/AbstractSftpClientExtension.java       |   5 +
 .../client/subsystem/sftp/fs/SftpFileSystem.java   |  15 +
 .../subsystem/sftp/fs/SftpFileSystemProvider.java  |  42 ++-
 .../subsystem/sftp/impl/AbstractSftpClient.java    |  45 ++-
 .../subsystem/sftp/impl/DefaultSftpClient.java     | 187 +++++++-----
 .../subsystem/sftp/impl/SftpInputStreamAsync.java  | 312 +++++++++++++++++++++
 .../subsystem/sftp/impl/SftpOutputStreamAsync.java | 201 +++++++++++++
 .../subsystem/sftp/SftpInputStreamWithChannel.java |   0
 .../sftp/SftpOutputStreamWithChannel.java          |   0
 .../client/subsystem/sftp/SftpPerformanceTest.java | 243 ++++++++++++++++
 .../sshd/client/subsystem/sftp/SftpTest.java       |  21 +-
 .../client/subsystem/sftp/SftpTransferTest.java    | 134 +++++++++
 18 files changed, 1276 insertions(+), 212 deletions(-)

diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index 55af809..af80185 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -25,13 +25,13 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.io.PacketWriter;
 import org.apache.sshd.common.io.WritePendingException;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.closeable.AbstractCloseable;
 
 public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOutputStream, ChannelHolder {
@@ -107,9 +107,39 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
         if (total > 0) {
             Channel channel = getChannel();
             Window remoteWindow = channel.getRemoteWindow();
-            long length = Math.min(Math.min(remoteWindow.getSize(), total), remoteWindow.getPacketSize());
-            if (log.isTraceEnabled()) {
-                log.trace("doWriteIfPossible({})[resume={}] attempting to write {} out of {}", this, resume, length, total);
+            long length;
+            if (remoteWindow.getSize() < total && total <= remoteWindow.getPacketSize()) {
+                // do not chunk when the window is smaller than the packet size
+                length = 0;
+                // do a defensive copy in case the user reuses the buffer
+                IoWriteFutureImpl f = new IoWriteFutureImpl(future.getId(), new ByteArrayBuffer(buffer.getCompactData()));
+                f.addListener(w -> future.setValue(w.getException() != null ? w.getException() : w.isWritten()));
+                pendingWrite.set(f);
+                if (log.isTraceEnabled()) {
+                    log.trace("doWriteIfPossible({})[resume={}] waiting for window space {}",
+                            this, resume, remoteWindow.getSize());
+                }
+            } else if (total > remoteWindow.getPacketSize()) {
+                if (buffer.rpos() > 0) {
+                    // do a defensive copy in case the user reuses the buffer
+                    IoWriteFutureImpl f = new IoWriteFutureImpl(future.getId(), new ByteArrayBuffer(buffer.getCompactData()));
+                    f.addListener(w -> future.setValue(w.getException() != null ? w.getException() : w.isWritten()));
+                    pendingWrite.set(f);
+                    length = remoteWindow.getPacketSize();
+                    if (log.isTraceEnabled()) {
+                        log.trace("doWriteIfPossible({})[resume={}] attempting to write {} out of {}",
+                                this, resume, length, total);
+                    }
+                    doWriteIfPossible(resume);
+                    return;
+                } else {
+                    length = remoteWindow.getPacketSize();
+                }
+            } else {
+                length = total;
+                if (log.isTraceEnabled()) {
+                    log.trace("doWriteIfPossible({})[resume={}] attempting to write {} bytes", this, resume, length);
+                }
             }
 
             if (length > 0) {
@@ -125,66 +155,12 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
                                                        + ") exceeds int boundaries");
                 }
 
-                Session s = channel.getSession();
-                Buffer buf = s.createBuffer(cmd, (int) length + 12);
-                buf.putInt(channel.getRecipient());
-                if (cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) {
-                    buf.putInt(SshConstants.SSH_EXTENDED_DATA_STDERR);
-                }
-                buf.putInt(length);
-                buf.putRawBytes(buffer.array(), buffer.rpos(), (int) length);
-                buffer.rpos(buffer.rpos() + (int) length);
+                Buffer buf = createSendBuffer(buffer, channel, length);
                 remoteWindow.consume(length);
 
                 try {
-                    ChannelAsyncOutputStream stream = this;
                     IoWriteFuture writeFuture = packetWriter.writePacket(buf);
-                    writeFuture.addListener(new SshFutureListener<IoWriteFuture>() {
-                        @Override
-                        public void operationComplete(IoWriteFuture f) {
-                            if (f.isWritten()) {
-                                handleOperationCompleted();
-                            } else {
-                                handleOperationFailed(f.getException());
-                            }
-                        }
-
-                        @SuppressWarnings("synthetic-access")
-                        private void handleOperationCompleted() {
-                            if (total > length) {
-                                if (log.isTraceEnabled()) {
-                                    log.trace("doWriteIfPossible({}) completed write of {} out of {}", stream, length, total);
-                                }
-                                doWriteIfPossible(false);
-                            } else {
-                                boolean nullified = pendingWrite.compareAndSet(future, null);
-                                if (log.isTraceEnabled()) {
-                                    log.trace("doWriteIfPossible({}) completed write len={}, more={}",
-                                            stream, total, !nullified);
-                                }
-                                future.setValue(Boolean.TRUE);
-                            }
-                        }
-
-                        @SuppressWarnings("synthetic-access")
-                        private void handleOperationFailed(Throwable reason) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("doWriteIfPossible({}) failed ({}) to complete write of {} out of {}: {}",
-                                        stream, reason.getClass().getSimpleName(), length, total, reason.getMessage());
-                            }
-
-                            if (log.isTraceEnabled()) {
-                                log.trace("doWriteIfPossible(" + this + ") write failure details", reason);
-                            }
-
-                            boolean nullified = pendingWrite.compareAndSet(future, null);
-                            if (log.isTraceEnabled()) {
-                                log.trace("doWriteIfPossible({}) failed write len={}, more={}",
-                                        stream, total, !nullified);
-                            }
-                            future.setValue(reason);
-                        }
-                    });
+                    writeFuture.addListener(f -> onWritten(future, total, length, f));
                 } catch (IOException e) {
                     future.setValue(e);
                 }
@@ -202,6 +178,53 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
         }
     }
 
+    protected void onWritten(IoWriteFutureImpl future, int total, long length, IoWriteFuture f) {
+        if (f.isWritten()) {
+            if (total > length) {
+                if (log.isTraceEnabled()) {
+                    log.trace("onWritten({}) completed write of {} out of {}",
+                            this, length, total);
+                }
+                doWriteIfPossible(false);
+            } else {
+                boolean nullified = pendingWrite.compareAndSet(future, null);
+                if (log.isTraceEnabled()) {
+                    log.trace("onWritten({}) completed write len={}, more={}",
+                            this, total, !nullified);
+                }
+                future.setValue(Boolean.TRUE);
+            }
+        } else {
+            Throwable reason = f.getException();
+            if (log.isDebugEnabled()) {
+                log.debug("onWritten({}) failed ({}) to complete write of {} out of {}: {}",
+                        this, reason.getClass().getSimpleName(), length, total, reason.getMessage());
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("onWritten(" + this + ") write failure details", reason);
+            }
+            boolean nullified = pendingWrite.compareAndSet(future, null);
+            if (log.isTraceEnabled()) {
+                log.trace("onWritten({}) failed write len={}, more={}",
+                        this, total, !nullified);
+            }
+            future.setValue(reason);
+        }
+    }
+
+    protected Buffer createSendBuffer(Buffer buffer, Channel channel, long length) {
+        Session s = channel.getSession();
+        Buffer buf = s.createBuffer(cmd, (int) length + 12);
+        buf.putInt(channel.getRecipient());
+        if (cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) {
+            buf.putInt(SshConstants.SSH_EXTENDED_DATA_STDERR);
+        }
+        buf.putInt(length);
+        buf.putRawBytes(buffer.array(), buffer.rpos(), (int) length);
+        buffer.rpos(buffer.rpos() + (int) length);
+        return buf;
+    }
+
     @Override
     public String toString() {
         return getClass().getSimpleName() + "[" + getChannel() + "] cmd=" + SshConstants.getCommandMessageName(cmd & 0xFF);
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index e6d7c33..e8eda37 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -954,7 +954,9 @@ public abstract class AbstractSession extends SessionHelper {
         synchronized (encodeLock) {
             Buffer packet = resolveOutputPacket(buffer);
             IoSession networkSession = getIoSession();
-            return networkSession.writePacket(packet);
+            IoWriteFuture future = networkSession.writePacket(packet);
+            buffer.rpos(buffer.wpos());
+            return future;
         }
     }
 
diff --git a/sshd-sftp/pom.xml b/sshd-sftp/pom.xml
index 9116746..4cf78e3 100644
--- a/sshd-sftp/pom.xml
+++ b/sshd-sftp/pom.xml
@@ -85,8 +85,30 @@
             <artifactId>jzlib</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>toxiproxy</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>testcontainers-bom</artifactId>
+                <type>pom</type>
+                <version>1.14.0</version>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <build>
         <resources>
             <resource>
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java
index 0cd90af..560ce55 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java
@@ -41,4 +41,13 @@ public interface RawSftpClient {
      * @throws IOException If connection closed or interrupted
      */
     Buffer receive(int id) throws IOException;
+
+    /**
+     * @param  id          The expected request id
+     * @param  timeout     The amount of time to wait for the response
+     * @return             The received response {@link Buffer} containing the request id
+     * @throws IOException If connection closed or interrupted
+     */
+    Buffer receive(int id, long timeout) throws IOException;
+
 }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java
index 593e996..e78fa00 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java
@@ -80,12 +80,14 @@ public interface SftpClient extends SubsystemClient {
         /**
          * The {@link Set} of {@link OpenOption}-s supported by {@link #fromOpenOptions(Collection)}
          */
-        public static final Set<OpenOption> SUPPORTED_OPTIONS = Collections.unmodifiableSet(
-                EnumSet.of(
-                        StandardOpenOption.READ, StandardOpenOption.APPEND,
-                        StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING,
-                        StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW,
-                        StandardOpenOption.SPARSE));
+        public static final Set<OpenOption> SUPPORTED_OPTIONS = Collections.unmodifiableSet(EnumSet.of(
+                StandardOpenOption.READ,
+                StandardOpenOption.APPEND,
+                StandardOpenOption.CREATE,
+                StandardOpenOption.TRUNCATE_EXISTING,
+                StandardOpenOption.WRITE,
+                StandardOpenOption.CREATE_NEW,
+                StandardOpenOption.SPARSE));
 
         /**
          * Converts {@link StandardOpenOption}-s into {@link OpenMode}-s
@@ -464,17 +466,9 @@ public interface SftpClient extends SubsystemClient {
 
         @Override
         public String toString() {
-            return "type=" + getType()
-                   + ";size=" + getSize()
-                   + ";uid=" + getUserId()
-                   + ";gid=" + getGroupId()
-                   + ";perms=0x" + Integer.toHexString(getPermissions())
-                   + ";flags=" + getFlags()
-                   + ";owner=" + getOwner()
-                   + ";group=" + getGroup()
-                   + ";aTime=" + getAccessTime()
-                   + ";cTime=" + getCreateTime()
-                   + ";mTime=" + getModifyTime()
+            return "type=" + getType() + ";size=" + getSize() + ";uid=" + getUserId() + ";gid=" + getGroupId() + ";perms=0x"
+                   + Integer.toHexString(getPermissions()) + ";flags=" + getFlags() + ";owner=" + getOwner() + ";group="
+                   + getGroup() + ";aTime=" + getAccessTime() + ";cTime=" + getCreateTime() + ";mTime=" + getModifyTime()
                    + ";extensions=" + getExtensions().keySet();
         }
     }
@@ -541,7 +535,7 @@ public interface SftpClient extends SubsystemClient {
     DirEntry[] EMPTY_DIR_ENTRIES = new DirEntry[0];
 
     // default values used if none specified
-    int MIN_BUFFER_SIZE = Byte.MAX_VALUE;
+    int MIN_BUFFER_SIZE = 256;
     int MIN_READ_BUFFER_SIZE = MIN_BUFFER_SIZE;
     int MIN_WRITE_BUFFER_SIZE = MIN_BUFFER_SIZE;
     int IO_BUFFER_SIZE = 32 * 1024;
@@ -954,18 +948,7 @@ public interface SftpClient extends SubsystemClient {
      * @return             An {@link InputStream} for reading the remote file data
      * @throws IOException If failed to execute
      */
-    default InputStream read(String path, int bufferSize, Collection<OpenMode> mode) throws IOException {
-        if (bufferSize < MIN_READ_BUFFER_SIZE) {
-            throw new IllegalArgumentException(
-                    "Insufficient read buffer size: " + bufferSize + ", min.=" + MIN_READ_BUFFER_SIZE);
-        }
-
-        if (!isOpen()) {
-            throw new IOException("read(" + path + ")[" + mode + "] size=" + bufferSize + ": client is closed");
-        }
-
-        return new SftpInputStreamWithChannel(this, bufferSize, path, mode);
-    }
+    InputStream read(String path, int bufferSize, Collection<OpenMode> mode) throws IOException;
 
     default OutputStream write(String path) throws IOException {
         return write(path, DEFAULT_WRITE_BUFFER_SIZE);
@@ -996,18 +979,7 @@ public interface SftpClient extends SubsystemClient {
      * @return             An {@link OutputStream} for writing the data
      * @throws IOException If failed to execute
      */
-    default OutputStream write(String path, int bufferSize, Collection<OpenMode> mode) throws IOException {
-        if (bufferSize < MIN_WRITE_BUFFER_SIZE) {
-            throw new IllegalArgumentException(
-                    "Insufficient write buffer size: " + bufferSize + ", min.=" + MIN_WRITE_BUFFER_SIZE);
-        }
-
-        if (!isOpen()) {
-            throw new IOException("write(" + path + ")[" + mode + "] size=" + bufferSize + ": client is closed");
-        }
-
-        return new SftpOutputStreamWithChannel(this, bufferSize, path, mode);
-    }
+    OutputStream write(String path, int bufferSize, Collection<OpenMode> mode) throws IOException;
 
     /**
      * @param  <E>           The generic extension type
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannel.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannel.java
index 2789cd2..e9d5f5a 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannel.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpRemotePathChannel.java
@@ -42,6 +42,9 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.Attributes;
+import org.apache.sshd.client.subsystem.sftp.impl.AbstractSftpClient;
+import org.apache.sshd.client.subsystem.sftp.impl.SftpInputStreamAsync;
+import org.apache.sshd.client.subsystem.sftp.impl.SftpOutputStreamAsync;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.common.subsystem.sftp.SftpException;
 import org.apache.sshd.common.util.GenericUtils;
@@ -230,6 +233,18 @@ public class SftpRemotePathChannel extends FileChannel {
         return doWrite(buffers, -1L);
     }
 
+    static class Ack {
+        int id;
+        long offset;
+        int length;
+
+        Ack(int id, long offset, int length) {
+            this.id = id;
+            this.offset = offset;
+            this.length = length;
+        }
+    }
+
     protected long doWrite(Collection<? extends ByteBuffer> buffers, long position) throws IOException {
         ensureOpen(WRITE_MODES);
 
@@ -346,30 +361,19 @@ public class SftpRemotePathChannel extends FileChannel {
         }
 
         boolean completed = false;
-        boolean eof = false;
-        long curPos = position;
-        int bufSize = (int) Math.min(count, copySize);
-        byte[] buffer = new byte[bufSize];
-        long totalRead = 0L;
+        boolean eof;
+        long totalRead;
 
         synchronized (lock) {
             try {
                 beginBlocking("transferTo");
 
-                while (totalRead < count && !eof) {
-                    int read = sftp.read(handle, curPos, buffer, 0,
-                            (int) Math.min(count - totalRead, buffer.length));
-                    if (read > 0) {
-                        ByteBuffer wrap = ByteBuffer.wrap(buffer, 0, read);
-                        while (wrap.remaining() > 0) {
-                            target.write(wrap);
-                        }
-                        curPos += read;
-                        totalRead += read;
-                    } else {
-                        eof = read == -1;
-                    }
-                }
+                SftpInputStreamAsync input = new SftpInputStreamAsync(
+                        (AbstractSftpClient) sftp,
+                        copySize, position, count, getRemotePath(), handle);
+                totalRead = input.transferTo(count, target);
+                // DO NOT CLOSE THE STREAM AS IT WOULD CLOSE THE HANDLE
+                eof = input.isEof();
                 completed = true;
             } finally {
                 endBlocking("transferTo", completed);
@@ -410,18 +414,23 @@ public class SftpRemotePathChannel extends FileChannel {
             try {
                 beginBlocking("transferFrom");
 
+                SftpOutputStreamAsync output = new SftpOutputStreamAsync(
+                        (AbstractSftpClient) sftp,
+                        copySize, getRemotePath(), handle);
                 while (totalRead < count) {
                     ByteBuffer wrap = ByteBuffer.wrap(
                             buffer, 0, (int) Math.min(buffer.length, count - totalRead));
                     int read = src.read(wrap);
                     if (read > 0) {
-                        sftp.write(handle, curPos, buffer, 0, read);
+                        output.write(buffer, 0, read);
                         curPos += read;
                         totalRead += read;
                     } else {
                         break;
                     }
                 }
+                output.flush();
+                // DO NOT CLOSE THE OUTPUT STREAM AS IT WOULD CLOSE THE HANDLE
                 completed = true;
             } finally {
                 endBlocking("transferFrom", completed);
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/extensions/helpers/AbstractSftpClientExtension.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/extensions/helpers/AbstractSftpClientExtension.java
index 4ad73c3..8ee2293 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/extensions/helpers/AbstractSftpClientExtension.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/extensions/helpers/AbstractSftpClientExtension.java
@@ -95,6 +95,11 @@ public abstract class AbstractSftpClientExtension extends AbstractLoggingBean im
     }
 
     @Override
+    public Buffer receive(int id, long timeout) throws IOException {
+        return raw.receive(id, timeout);
+    }
+
+    @Override
     public final boolean isSupported() {
         return supported;
     }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystem.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystem.java
index c60ae8f..ad8d6dd 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystem.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystem.java
@@ -557,6 +557,21 @@ public class SftpFileSystem
                         "receive(id=" + id + ") delegate is not a " + RawSftpClient.class.getSimpleName());
             }
         }
+
+        @Override
+        public Buffer receive(int id, long timeout) throws IOException {
+            if (!isOpen()) {
+                throw new IOException("receive(id=" + id + ", timeout=" + timeout + ") client is closed");
+            }
+
+            if (delegate instanceof RawSftpClient) {
+                return ((RawSftpClient) delegate).receive(id, timeout);
+            } else {
+                throw new StreamCorruptedException(
+                        "receive(id=" + id + ", timeout=" + timeout + ") delegate is not a "
+                                                   + RawSftpClient.class.getSimpleName());
+            }
+        }
     }
 
     public static class DefaultUserPrincipalLookupService extends UserPrincipalLookupService {
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemProvider.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemProvider.java
index 774e17d..666658e 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemProvider.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/fs/SftpFileSystemProvider.java
@@ -74,8 +74,11 @@ import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.client.subsystem.sftp.SftpClient;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.Attributes;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
 import org.apache.sshd.client.subsystem.sftp.SftpClientFactory;
+import org.apache.sshd.client.subsystem.sftp.SftpRemotePathChannel;
 import org.apache.sshd.client.subsystem.sftp.SftpVersionSelector;
+import org.apache.sshd.client.subsystem.sftp.extensions.CopyFileExtension;
 import org.apache.sshd.common.PropertyResolver;
 import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.SshConstants;
@@ -482,7 +485,33 @@ public class SftpFileSystemProvider extends FileSystemProvider {
             modes = EnumSet.of(SftpClient.OpenMode.Read, SftpClient.OpenMode.Write);
         }
         // TODO: process file attributes
-        return new SftpFileSystemChannel(toSftpPath(path), modes);
+        SftpPath p = toSftpPath(path);
+        return new SftpRemotePathChannel(p.toString(), p.getFileSystem().getClient(), true, modes);
+    }
+
+    @Override
+    public InputStream newInputStream(Path path, OpenOption... options) throws IOException {
+        Collection<SftpClient.OpenMode> modes = SftpClient.OpenMode.fromOpenOptions(Arrays.asList(options));
+        if (modes.isEmpty()) {
+            modes = EnumSet.of(SftpClient.OpenMode.Read);
+        }
+        SftpPath p = toSftpPath(path);
+        return p.getFileSystem().getClient().read(p.toString(), modes);
+    }
+
+    @Override
+    public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException {
+        Set<SftpClient.OpenMode> modes = SftpClient.OpenMode.fromOpenOptions(Arrays.asList(options));
+        if (modes.contains(OpenMode.Read)) {
+            throw new IllegalArgumentException("READ not allowed");
+        }
+        if (modes.isEmpty()) {
+            modes = EnumSet.of(OpenMode.Create, OpenMode.Truncate, OpenMode.Write);
+        } else {
+            modes.add(OpenMode.Write);
+        }
+        SftpPath p = toSftpPath(path);
+        return p.getFileSystem().getClient().write(p.toString(), modes);
     }
 
     @Override
@@ -591,9 +620,14 @@ public class SftpFileSystemProvider extends FileSystemProvider {
         if (attrs.isDirectory()) {
             createDirectory(target);
         } else {
-            try (InputStream in = newInputStream(source);
-                 OutputStream os = newOutputStream(target)) {
-                IoUtils.copy(in, os);
+            CopyFileExtension copyFile = src.getFileSystem().getClient().getExtension(CopyFileExtension.class);
+            if (copyFile.isSupported()) {
+                copyFile.copyFile(source.toString(), target.toString(), false);
+            } else {
+                try (InputStream in = newInputStream(source);
+                     OutputStream os = newOutputStream(target)) {
+                    IoUtils.copy(in, os);
+                }
             }
         }
 
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
index 577ecdb..b96a24a 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
@@ -19,6 +19,8 @@
 package org.apache.sshd.client.subsystem.sftp.impl;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.nio.file.attribute.FileTime;
 import java.util.ArrayList;
@@ -786,7 +788,6 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
         if (eofSignalled != null) {
             eofSignalled.set(null);
         }
-
         if (!isOpen()) {
             throw new IOException("read(" + handle + "/" + fileOffset + ")[" + dstOffset + "/" + len + "] client is closed");
         }
@@ -1278,4 +1279,46 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
         buffer.putLong(length);
         checkCommandStatus(SftpConstants.SSH_FXP_UNBLOCK, buffer);
     }
+
+    @Override
+    public InputStream read(String path, int bufferSize, Collection<OpenMode> mode) throws IOException {
+        if (bufferSize < MIN_WRITE_BUFFER_SIZE) {
+            throw new IllegalArgumentException(
+                    "Insufficient read buffer size: " + bufferSize + ", min.="
+                                               + MIN_READ_BUFFER_SIZE);
+        }
+
+        if (!isOpen()) {
+            throw new IOException("write(" + path + ")[" + mode + "] size=" + bufferSize + ": client is closed");
+        }
+
+        return new SftpInputStreamAsync(this, bufferSize, path, mode);
+    }
+
+    @Override
+    public InputStream read(String path, Collection<OpenMode> mode) throws IOException {
+        int packetSize = (int) getChannel().getRemoteWindow().getPacketSize();
+        return read(path, packetSize, mode);
+    }
+
+    @Override
+    public OutputStream write(String path, int bufferSize, Collection<OpenMode> mode) throws IOException {
+        if (bufferSize < MIN_WRITE_BUFFER_SIZE) {
+            throw new IllegalArgumentException(
+                    "Insufficient write buffer size: " + bufferSize + ", min.="
+                                               + MIN_WRITE_BUFFER_SIZE);
+        }
+
+        if (!isOpen()) {
+            throw new IOException("write(" + path + ")[" + mode + "] size=" + bufferSize + ": client is closed");
+        }
+
+        return new SftpOutputStreamAsync(this, bufferSize, path, mode);
+    }
+
+    @Override
+    public OutputStream write(String path, Collection<OpenMode> mode) throws IOException {
+        int packetSize = (int) getChannel().getRemoteWindow().getPacketSize();
+        return write(path, packetSize, mode);
+    }
 }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
index 28e6b4f..5ef0e18 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
@@ -21,7 +21,6 @@ package org.apache.sshd.client.subsystem.sftp.impl;
 import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.io.StreamCorruptedException;
@@ -48,6 +47,10 @@ import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.common.subsystem.sftp.extensions.ParserUtils;
@@ -55,7 +58,6 @@ import org.apache.sshd.common.subsystem.sftp.extensions.VersionsParser.Versions;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
-import org.apache.sshd.common.util.buffer.BufferUtils;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 
 /**
@@ -67,7 +69,6 @@ public class DefaultSftpClient extends AbstractSftpClient {
     private final Map<Integer, Buffer> messages = new HashMap<>();
     private final AtomicInteger cmdId = new AtomicInteger(100);
     private final Buffer receiveBuffer = new ByteArrayBuffer();
-    private final byte[] workBuf = new byte[Integer.BYTES];
     private final AtomicInteger versionHolder = new AtomicInteger(0);
     private final AtomicBoolean closing = new AtomicBoolean(false);
     private final NavigableMap<String, byte[]> extensions = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
@@ -78,24 +79,8 @@ public class DefaultSftpClient extends AbstractSftpClient {
         this.nameDecodingCharset = PropertyResolverUtils.getCharset(
                 clientSession, NAME_DECODING_CHARSET, DEFAULT_NAME_DECODING_CHARSET);
         this.clientSession = Objects.requireNonNull(clientSession, "No client session");
-        this.channel = clientSession.createSubsystemChannel(SftpConstants.SFTP_SUBSYSTEM_NAME);
-        this.channel.setOut(new OutputStream() {
-            private final byte[] singleByte = new byte[1];
-
-            @Override
-            public void write(int b) throws IOException {
-                synchronized (singleByte) {
-                    singleByte[0] = (byte) b;
-                    write(singleByte);
-                }
-            }
-
-            @Override
-            public void write(byte[] b, int off, int len) throws IOException {
-                data(b, off, len);
-            }
-        });
-        this.channel.setErr(new ByteArrayOutputStream(Byte.MAX_VALUE));
+        this.channel = new SftpChannelSubsystem();
+        clientSession.getService(ConnectionService.class).registerChannel(channel);
 
         long initializationTimeout = clientSession.getLongProperty(
                 SFTP_CHANNEL_OPEN_TIMEOUT, DEFAULT_CHANNEL_OPEN_TIMEOUT);
@@ -274,12 +259,26 @@ public class DefaultSftpClient extends AbstractSftpClient {
                     getClientChannel(), SftpConstants.getCommandMessageName(cmd), len, id);
         }
 
-        OutputStream dos = channel.getInvertedIn();
-        BufferUtils.writeInt(dos, 1 /* cmd */ + Integer.BYTES /* id */ + len, workBuf);
-        dos.write(cmd & 0xFF);
-        BufferUtils.writeInt(dos, id, workBuf);
-        dos.write(buffer.array(), buffer.rpos(), len);
-        dos.flush();
+        Buffer buf;
+        int hdr = Integer.BYTES /* length */ + 1 /* cmd */ + Integer.BYTES /* id */;
+        if (buffer.rpos() >= hdr) {
+            int wpos = buffer.wpos();
+            int s = buffer.rpos() - hdr;
+            buffer.rpos(s);
+            buffer.wpos(s);
+            buffer.putInt(1 /* cmd */ + Integer.BYTES /* id */ + len); // length
+            buffer.putByte((byte) (cmd & 0xFF)); // cmd
+            buffer.putInt(id); // id
+            buffer.wpos(wpos);
+            buf = buffer;
+        } else {
+            buf = new ByteArrayBuffer(hdr + len);
+            buf.putInt(1 /* cmd */ + Integer.BYTES /* id */ + len);
+            buf.putByte((byte) (cmd & 0xFF));
+            buf.putInt(id);
+            buf.putBuffer(buffer);
+        }
+        channel.getAsyncIn().writePacket(buf).verify();
         return id;
     }
 
@@ -292,66 +291,50 @@ public class DefaultSftpClient extends AbstractSftpClient {
             idleTimeout = FactoryManager.DEFAULT_IDLE_TIMEOUT;
         }
 
-        Integer reqId = id;
         boolean traceEnabled = log.isTraceEnabled();
         for (int count = 1;; count++) {
             if (isClosing() || (!isOpen())) {
                 throw new SshException("Channel is being closed");
             }
 
-            synchronized (messages) {
-                Buffer buffer = messages.remove(reqId);
-                if (buffer != null) {
-                    return buffer;
-                }
-
-                try {
-                    messages.wait(idleTimeout);
-                } catch (InterruptedException e) {
-                    throw (IOException) new InterruptedIOException(
-                            "Interrupted while waiting for messages at iteration #" + count).initCause(e);
-                }
+            Buffer buffer = receive(id, idleTimeout);
+            if (buffer != null) {
+                return buffer;
             }
 
             if (traceEnabled) {
-                log.trace("receive({}) check iteration #{} for id={}", this, count, reqId);
+                log.trace("receive({}) check iteration #{} for id={}", this, count, id);
             }
         }
     }
 
-    protected Buffer read() throws IOException {
-        InputStream dis = channel.getInvertedOut();
-        int length = BufferUtils.readInt(dis, workBuf);
-        // must have at least command + length
-        if (length < (1 + Integer.BYTES)) {
-            throw new IllegalArgumentException("Bad length: " + length);
-        }
-
-        Buffer buffer = new ByteArrayBuffer(length + Integer.BYTES, false);
-        buffer.putInt(length);
-        int nb = length;
-        while (nb > 0) {
-            int readLen = dis.read(buffer.array(), buffer.wpos(), nb);
-            if (readLen < 0) {
-                throw new IllegalArgumentException("Premature EOF while read " + length + " bytes - remaining=" + nb);
+    @Override
+    public Buffer receive(int id, long idleTimeout) throws IOException {
+        synchronized (messages) {
+            Buffer buffer = messages.remove(id);
+            if (buffer != null) {
+                return buffer;
+            }
+            if (idleTimeout > 0) {
+                try {
+                    messages.wait(idleTimeout);
+                } catch (InterruptedException e) {
+                    throw (IOException) new InterruptedIOException("Interrupted while waiting for messages").initCause(e);
+                }
             }
-            buffer.wpos(buffer.wpos() + readLen);
-            nb -= readLen;
         }
-
-        return buffer;
+        return null;
     }
 
     protected void init(long initializationTimeout) throws IOException {
         ValidateUtils.checkTrue(initializationTimeout > 0L, "Invalid initialization timeout: %d", initializationTimeout);
 
         // Send init packet
-        OutputStream dos = channel.getInvertedIn();
-        BufferUtils.writeInt(dos, 5 /* total length */, workBuf);
-        dos.write(SftpConstants.SSH_FXP_INIT);
-        // Ask for the highest we support and see what the server says
-        BufferUtils.writeInt(dos, SftpConstants.SFTP_V6, workBuf);
-        dos.flush();
+        Buffer buf = new ByteArrayBuffer(9);
+        buf.putInt(5);
+        buf.putByte((byte) SftpConstants.SSH_FXP_INIT);
+        buf.putInt(SftpConstants.SFTP_V6);
+        channel.getAsyncIn().writePacket(buf).verify();
 
         Buffer buffer;
         Integer reqId;
@@ -419,7 +402,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
                 String name = buffer.getString();
                 byte[] data = buffer.getBytes();
                 if (traceEnabled) {
-                    log.trace("init({}) added extension=", getClientChannel(), name);
+                    log.trace("init({}) added extension={}", getClientChannel(), name);
                 }
                 extensions.put(name, data);
             }
@@ -501,4 +484,74 @@ public class DefaultSftpClient extends AbstractSftpClient {
         versionHolder.set(selected);
         return selected;
     }
+
+    private class SftpChannelSubsystem extends ChannelSubsystem {
+
+        SftpChannelSubsystem() {
+            super(SftpConstants.SFTP_SUBSYSTEM_NAME);
+        }
+
+        @Override
+        protected void doOpen() throws IOException {
+            String systemName = getSubsystem();
+            Session session = getSession();
+            boolean wantReply = this.getBooleanProperty(
+                    REQUEST_SUBSYSTEM_REPLY, DEFAULT_REQUEST_SUBSYSTEM_REPLY);
+            Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_REQUEST,
+                    Channel.CHANNEL_SUBSYSTEM.length() + systemName.length() + Integer.SIZE);
+            buffer.putInt(getRecipient());
+            buffer.putString(Channel.CHANNEL_SUBSYSTEM);
+            buffer.putBoolean(wantReply);
+            buffer.putString(systemName);
+            addPendingRequest(Channel.CHANNEL_SUBSYSTEM, wantReply);
+            writePacket(buffer);
+
+            asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
+                @SuppressWarnings("synthetic-access")
+                @Override
+                protected CloseFuture doCloseGracefully() {
+                    try {
+                        sendEof();
+                    } catch (IOException e) {
+                        Session session = getSession();
+                        session.exceptionCaught(e);
+                    }
+                    return super.doCloseGracefully();
+                }
+
+                @Override
+                protected Buffer createSendBuffer(Buffer buffer, Channel channel, long length) {
+                    if (buffer.rpos() >= 9 && length == buffer.available()) {
+                        int rpos = buffer.rpos();
+                        int wpos = buffer.wpos();
+                        buffer.rpos(rpos - 9);
+                        buffer.wpos(rpos - 8);
+                        buffer.putInt(channel.getRecipient());
+                        buffer.putInt(length);
+                        buffer.wpos(wpos);
+                        return buffer;
+                    } else {
+                        return super.createSendBuffer(buffer, channel, length);
+                    }
+                }
+            };
+            out = new OutputStream() {
+                private final byte[] singleByte = new byte[1];
+
+                @Override
+                public void write(int b) throws IOException {
+                    synchronized (singleByte) {
+                        singleByte[0] = (byte) b;
+                        write(singleByte);
+                    }
+                }
+
+                @Override
+                public void write(byte[] b, int off, int len) throws IOException {
+                    data(b, off, len);
+                }
+            };
+            err = new ByteArrayOutputStream();
+        }
+    }
 }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
new file mode 100644
index 0000000..ec3e593
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpHelper;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.InputStreamWithChannel;
+
+public class SftpInputStreamAsync extends InputStreamWithChannel {
+
+    static class Ack {
+        int id;
+        long offset;
+        int length;
+
+        Ack(int id, long offset, int length) {
+            this.id = id;
+            this.offset = offset;
+            this.length = length;
+        }
+    }
+
+    private final AbstractSftpClient client;
+    private final String path;
+    private final byte[] bb = new byte[1];
+    private final int bufferSize;
+    private final long fileSize;
+    private Buffer buffer;
+    private CloseableHandle handle;
+    private long requestOffset;
+    private long clientOffset;
+    private final Deque<Ack> pendingReads = new LinkedList<>();
+    private boolean eofIndicator;
+
+    public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize,
+                                String path, Collection<OpenMode> mode) throws IOException {
+        this.client = Objects.requireNonNull(client, "No SFTP client instance");
+        this.path = path;
+        this.handle = client.open(path, mode);
+        this.bufferSize = bufferSize;
+        this.fileSize = client.stat(handle).getSize();
+    }
+
+    public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize, long clientOffset, long fileSize,
+                                String path, CloseableHandle handle) {
+        this.client = Objects.requireNonNull(client, "No SFTP client instance");
+        this.path = path;
+        this.handle = handle;
+        this.bufferSize = bufferSize;
+        this.clientOffset = clientOffset;
+        this.fileSize = fileSize;
+    }
+
+    /**
+     * The client instance
+     *
+     * @return {@link SftpClient} instance used to access the remote file
+     */
+    public final AbstractSftpClient getClient() {
+        return client;
+    }
+
+    /**
+     * The remotely accessed file path
+     *
+     * @return Remote file path
+     */
+    public final String getPath() {
+        return path;
+    }
+
+    /**
+     * Check if the stream is at EOF
+     *
+     * @return <code>true</code> if all the data has been consumer
+     */
+    public boolean isEof() {
+        return eofIndicator && hasNoData();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return (handle != null) && handle.isOpen();
+    }
+
+    @Override
+    public int read() throws IOException {
+        int read = read(bb, 0, 1);
+        if (read > 0) {
+            return bb[0] & 0xFF;
+        }
+        return read;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (!isOpen()) {
+            throw new IOException("read(" + getPath() + ") stream closed");
+        }
+        int idx = off;
+        while (len > 0 && !eofIndicator) {
+            if (hasNoData()) {
+                fillData();
+                if (eofIndicator && (hasNoData())) {
+                    break;
+                }
+                sendRequests();
+            } else {
+                int nb = Math.min(buffer.available(), len);
+                buffer.getRawBytes(b, off, nb);
+                idx += nb;
+                len -= nb;
+                clientOffset += nb;
+            }
+        }
+        int res = idx - off;
+        if (res == 0 && eofIndicator) {
+            res = -1;
+        }
+        return res;
+    }
+
+    public long transferTo(long max, WritableByteChannel out) throws IOException {
+        if (!isOpen()) {
+            throw new IOException("transferTo(" + getPath() + ") stream closed");
+        }
+        long orgOffset = clientOffset;
+        while (!eofIndicator && max > 0) {
+            if (hasNoData()) {
+                fillData();
+                if (eofIndicator && hasNoData()) {
+                    break;
+                }
+                sendRequests();
+            } else {
+                int nb = buffer.available();
+                int toRead = (int) Math.min(nb, max);
+                ByteBuffer bb = ByteBuffer.wrap(buffer.array(), buffer.rpos(), toRead);
+                while (bb.hasRemaining()) {
+                    out.write(bb);
+                }
+                buffer.rpos(buffer.rpos() + toRead);
+                clientOffset += toRead;
+                max -= toRead;
+            }
+        }
+        return clientOffset - orgOffset;
+    }
+
+    @SuppressWarnings("PMD.MissingOverride")
+    public long transferTo(OutputStream out) throws IOException {
+        if (!isOpen()) {
+            throw new IOException("transferTo(" + getPath() + ") stream closed");
+        }
+        long orgOffset = clientOffset;
+        while (!eofIndicator) {
+            if (hasNoData()) {
+                fillData();
+                if (eofIndicator && hasNoData()) {
+                    break;
+                }
+                sendRequests();
+            } else {
+                int nb = buffer.available();
+                out.write(buffer.array(), buffer.rpos(), nb);
+                buffer.rpos(buffer.rpos() + nb);
+                clientOffset += nb;
+            }
+        }
+        return clientOffset - orgOffset;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        if (!isOpen()) {
+            throw new IOException("skip(" + getPath() + ") stream closed");
+        }
+        if (clientOffset == 0 && pendingReads.isEmpty()) {
+            clientOffset = n;
+            return n;
+        }
+        return super.skip(n);
+    }
+
+    boolean hasNoData() {
+        return buffer == null || buffer.available() == 0;
+    }
+
+    void sendRequests() throws IOException {
+        if (!eofIndicator) {
+            long windowSize = client.getChannel().getLocalWindow().getMaxSize();
+            while (pendingReads.size() < (int) (windowSize / bufferSize) && requestOffset < fileSize + bufferSize
+                    || pendingReads.isEmpty()) {
+                Buffer buf = client.getSession().createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA,
+                        23 /* sftp packet */ + 16 + handle.getIdentifier().length);
+                buf.rpos(23);
+                buf.wpos(23);
+                buf.putBytes(handle.getIdentifier());
+                buf.putLong(requestOffset);
+                buf.putInt(bufferSize);
+                int reqId = client.send(SftpConstants.SSH_FXP_READ, buf);
+                pendingReads.add(new Ack(reqId, requestOffset, bufferSize));
+                requestOffset += bufferSize;
+            }
+        }
+    }
+
+    void fillData() throws IOException {
+        Ack ack = pendingReads.pollFirst();
+        if (ack != null) {
+            pollBuffer(ack);
+            if (!eofIndicator && clientOffset < ack.offset) {
+                // we are actually missing some data
+                // so request is synchronously
+                byte[] data = new byte[(int) (ack.offset - clientOffset + buffer.available())];
+                int cur = 0;
+                int nb = (int) (ack.offset - clientOffset);
+                AtomicReference<Boolean> eof = new AtomicReference<>();
+                while (cur < nb) {
+                    int dlen = client.read(handle, clientOffset, data, cur, nb - cur, eof);
+                    eofIndicator = dlen < 0 || eof.get() != null && eof.get();
+                    cur += dlen;
+                }
+                buffer.getRawBytes(data, nb, buffer.available());
+                buffer = new ByteArrayBuffer(data);
+            }
+        }
+    }
+
+    void pollBuffer(Ack ack) throws IOException {
+        Buffer buf = client.receive(ack.id);
+        int length = buf.getInt();
+        int type = buf.getUByte();
+        int id = buf.getInt();
+        client.validateIncomingResponse(SshConstants.SSH_MSG_CHANNEL_DATA, id, type, length, buf);
+        if (type == SftpConstants.SSH_FXP_DATA) {
+            int dlen = buf.getInt();
+            int rpos = buf.rpos();
+            buf.rpos(rpos + dlen);
+            Boolean b = SftpHelper.getEndOfFileIndicatorValue(buf, client.getVersion());
+            eofIndicator = b != null && b;
+            buf.rpos(rpos);
+            buf.wpos(rpos + dlen);
+            this.buffer = buf;
+        } else if (type == SftpConstants.SSH_FXP_STATUS) {
+            int substatus = buf.getInt();
+            String msg = buf.getString();
+            String lang = buf.getString();
+            if (substatus == SftpConstants.SSH_FX_EOF) {
+                eofIndicator = true;
+            } else {
+                client.checkResponseStatus(SshConstants.SSH_MSG_CHANNEL_DATA, id, substatus, msg, lang);
+            }
+        } else {
+            IOException err = client.handleUnexpectedPacket(SshConstants.SSH_MSG_CHANNEL_DATA,
+                    SftpConstants.SSH_FXP_STATUS, id, type, length, buf);
+            if (err != null) {
+                throw err;
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (isOpen()) {
+            try {
+                try {
+                    while (!pendingReads.isEmpty()) {
+                        Ack ack = pendingReads.removeFirst();
+                        pollBuffer(ack);
+                    }
+                } finally {
+                    handle.close();
+                }
+            } finally {
+                handle = null;
+            }
+        }
+    }
+}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
new file mode 100644
index 0000000..d8f1974
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Objects;
+
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.OutputStreamWithChannel;
+
+/**
+ * Implements an output stream for a given remote file
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class SftpOutputStreamAsync extends OutputStreamWithChannel {
+
+    static class Ack {
+        int id;
+        long offset;
+        int length;
+
+        Ack(int id, long offset, int length) {
+            this.id = id;
+            this.offset = offset;
+            this.length = length;
+        }
+    }
+
+    private final AbstractSftpClient client;
+    private final String path;
+    private final byte[] bb = new byte[1];
+    private final int bufferSize;
+    private Buffer buffer;
+    private CloseableHandle handle;
+    private long offset;
+    private final Deque<Ack> pendingWrites = new LinkedList<>();
+
+    public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
+                                 String path, Collection<OpenMode> mode) throws IOException {
+        this.client = Objects.requireNonNull(client, "No SFTP client instance");
+        this.path = path;
+        this.handle = client.open(path, mode);
+        this.bufferSize = bufferSize;
+    }
+
+    public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
+                                 String path, CloseableHandle handle) throws IOException {
+        this.client = Objects.requireNonNull(client, "No SFTP client instance");
+        this.path = path;
+        this.handle = handle;
+        this.bufferSize = bufferSize;
+    }
+
+    /**
+     * The client instance
+     *
+     * @return {@link SftpClient} instance used to access the remote file
+     */
+    public final AbstractSftpClient getClient() {
+        return client;
+    }
+
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
+    /**
+     * The remotely accessed file path
+     *
+     * @return Remote file path
+     */
+    public final String getPath() {
+        return path;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return (handle != null) && handle.isOpen();
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        bb[0] = (byte) b;
+        write(bb, 0, 1);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        do {
+            if (buffer == null) {
+                buffer = client.getSession().createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize);
+                int hdr = (9 + 16 + 8 + handle.getIdentifier().length) + buffer.wpos();
+                buffer.rpos(hdr);
+                buffer.wpos(hdr);
+            }
+            int max = bufferSize - (9 + 16 + handle.getIdentifier().length + 72);
+            int nb = Math.min(len, max - (buffer.wpos() - buffer.rpos()));
+            buffer.putRawBytes(b, off, nb);
+            if (buffer.available() == max) {
+                flush();
+            }
+            off += nb;
+            len -= nb;
+        } while (len > 0);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (!isOpen()) {
+            throw new IOException("flush(" + getPath() + ") stream is closed");
+        }
+
+        for (;;) {
+            Ack ack = pendingWrites.peek();
+            if (ack != null) {
+                Buffer response = client.receive(ack.id, 0);
+                if (response != null) {
+                    pendingWrites.removeFirst();
+                    client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
+                } else {
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+
+        byte[] id = handle.getIdentifier();
+        int avail = buffer.available();
+        Buffer buf;
+        if (buffer.rpos() >= 16 + id.length) {
+            int wpos = buffer.wpos();
+            buffer.rpos(buffer.rpos() - 16 - id.length);
+            buffer.wpos(buffer.rpos());
+            buffer.putBytes(id);
+            buffer.putLong(offset);
+            buffer.putInt(avail);
+            buffer.wpos(wpos);
+            buf = buffer;
+        } else {
+            buf = new ByteArrayBuffer(id.length + avail + Long.SIZE /* some extra fields */, false);
+            buf.putBytes(id);
+            buf.putLong(offset);
+            buf.putBytes(buffer.array(), buffer.rpos(), avail);
+        }
+
+        int reqId = client.send(SftpConstants.SSH_FXP_WRITE, buf);
+        pendingWrites.add(new Ack(reqId, offset, avail));
+
+        offset += avail;
+        buffer = null;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (isOpen()) {
+            try {
+                try {
+                    if (buffer != null && buffer.available() > 0) {
+                        flush();
+                    }
+                    while (!pendingWrites.isEmpty()) {
+                        Ack ack = pendingWrites.removeFirst();
+                        Buffer response = client.receive(ack.id);
+                        client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
+                    }
+                } finally {
+                    handle.close();
+                }
+            } finally {
+                handle = null;
+            }
+        }
+    }
+}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpInputStreamWithChannel.java b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpInputStreamWithChannel.java
similarity index 100%
rename from sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpInputStreamWithChannel.java
rename to sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpInputStreamWithChannel.java
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpOutputStreamWithChannel.java b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpOutputStreamWithChannel.java
similarity index 100%
rename from sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpOutputStreamWithChannel.java
rename to sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpOutputStreamWithChannel.java
diff --git a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpPerformanceTest.java b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpPerformanceTest.java
new file mode 100644
index 0000000..d24b7c8
--- /dev/null
+++ b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpPerformanceTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+
+import eu.rekawek.toxiproxy.model.ToxicDirection;
+import eu.rekawek.toxiproxy.model.toxic.Latency;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.config.hosts.HostConfigEntryResolver;
+import org.apache.sshd.client.keyverifier.AcceptAllServerKeyVerifier;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
+import org.apache.sshd.client.subsystem.sftp.fs.SftpFileSystem;
+import org.apache.sshd.common.keyprovider.KeyIdentityProvider;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.ToxiproxyContainer;
+import org.testcontainers.containers.ToxiproxyContainer.ContainerProxy;
+
+@Ignore("Special class used for development only - not really a test just useful to run as such")
+public class SftpPerformanceTest {
+
+    public static final String USERNAME = "foo";
+    public static final String PASSWORD = "pass";
+
+    // Create a common docker network so that containers can communicate
+    @Rule
+    public Network network = Network.newNetwork();
+
+    // the target container - this could be anything
+    @Rule
+    public GenericContainer<?> sftp = new GenericContainer<>("atmoz/sftp")
+            .withEnv("SFTP_USERS", USERNAME + ":" + PASSWORD)
+            .withNetwork(network)
+            .withFileSystemBind("target", "/home/foo")
+            .withExposedPorts(22);
+
+    // Toxiproxy container, which will be used as a TCP proxy
+    @Rule
+    public ToxiproxyContainer toxiproxy = new ToxiproxyContainer()
+            .withNetwork(network);
+
+    public SftpPerformanceTest() {
+        super();
+    }
+
+    @Test
+    public void testUploadLatency() throws IOException {
+        final ContainerProxy proxy = toxiproxy.getProxy(sftp, 22);
+        for (int latency : Arrays.asList(0, 1, 5, 10, 50, 100, 500)) {
+            Latency toxic = proxy.toxics().latency("latency", ToxicDirection.DOWNSTREAM, latency);
+            for (int megabytes : Arrays.asList(1, 5, 10, 50, 100)) {
+                try (SshClient client = createSshClient()) {
+                    long orgTime;
+                    long newTime;
+                    try (ClientSession session = createClientSession(client, proxy)) {
+                        orgTime = uploadPrevious(session, megabytes);
+                    }
+                    try (ClientSession session = createClientSession(client, proxy)) {
+                        newTime = uploadOptimized(session, megabytes);
+                    }
+                    System.out.println(String.format("%3d MB / %3d ms latency: %7d down to %5d ms, gain = %d%%",
+                            megabytes, latency, orgTime, newTime,
+                            (int) (100 * (orgTime - newTime) / orgTime)));
+                }
+            }
+            toxic.remove();
+        }
+    }
+
+    @Test
+    public void testDownloadLatency() throws IOException {
+        final ContainerProxy proxy = toxiproxy.getProxy(sftp, 22);
+        for (int latency : Arrays.asList(0, 1, 5, 10, 50, 100, 500)) {
+            Latency toxic = proxy.toxics().latency("latency", ToxicDirection.DOWNSTREAM, latency);
+            for (int megabytes : Arrays.asList(1, 5, 10, 50, 100)) {
+                try (SshClient client = createSshClient()) {
+                    long orgTime;
+                    long newTime;
+                    try (ClientSession session = createClientSession(client, proxy)) {
+                        newTime = downloadOptimized(session, megabytes);
+                    }
+                    try (ClientSession session = createClientSession(client, proxy)) {
+                        orgTime = downloadPrevious(session, megabytes);
+                    }
+                    System.out.println(String.format("%3d MB / %3d ms latency: %7d down to %5d ms, gain = %d%%",
+                            megabytes, latency, orgTime, newTime,
+                            (int) (100 * (orgTime - newTime) / orgTime)));
+                }
+            }
+            toxic.remove();
+        }
+    }
+
+    public ClientSession createClientSession(SshClient client, ContainerProxy proxy) throws IOException {
+        final String ipAddressViaToxiproxy = proxy.getContainerIpAddress();
+        final int portViaToxiproxy = proxy.getProxyPort();
+
+        ClientSession session = client.connect(USERNAME, ipAddressViaToxiproxy, portViaToxiproxy).verify().getClientSession();
+        session.addPasswordIdentity(PASSWORD);
+        session.auth().verify();
+        return session;
+    }
+
+    @NotNull
+    public SshClient createSshClient() {
+        SshClient client = SshClient.setUpDefaultClient();
+        client.setServerKeyVerifier(AcceptAllServerKeyVerifier.INSTANCE);
+        client.setHostConfigEntryResolver(HostConfigEntryResolver.EMPTY);
+        client.setKeyIdentityProvider(KeyIdentityProvider.EMPTY_KEYS_PROVIDER);
+        client.start();
+        return client;
+    }
+
+    public long uploadPrevious(ClientSession session, int mb) throws IOException {
+        long t0 = System.currentTimeMillis();
+        try (SftpClient client = SftpClientFactory.instance().createSftpClient(session)) {
+            try (OutputStream os = new BufferedOutputStream(
+                    new SftpOutputStreamWithChannel(
+                            client, 32768, "out.txt",
+                            Arrays.asList(OpenMode.Write,
+                                    OpenMode.Create,
+                                    OpenMode.Truncate)),
+                    32768)) {
+                byte[] bytes = "123456789abcdef\n".getBytes();
+                for (int i = 0; i < 1024 * 1024 * mb / bytes.length; i++) {
+                    os.write(bytes);
+                }
+            }
+        }
+        long t1 = System.currentTimeMillis();
+        return t1 - t0;
+    }
+
+    public long uploadOptimized(ClientSession session, int mb) throws IOException {
+        long t0 = System.currentTimeMillis();
+        try (SftpFileSystem fs = SftpClientFactory.instance().createSftpFileSystem(session)) {
+            Path p = fs.getPath("out.txt");
+            try (OutputStream os = new BufferedOutputStream(
+                    Files.newOutputStream(p, StandardOpenOption.CREATE,
+                            StandardOpenOption.TRUNCATE_EXISTING),
+                    32768)) {
+                byte[] bytes = "123456789abcdef\n".getBytes();
+                for (int i = 0; i < 1024 * 1024 * mb / bytes.length; i++) {
+                    os.write(bytes);
+                }
+            }
+        }
+        long t1 = System.currentTimeMillis();
+        return t1 - t0;
+    }
+
+    public long downloadPrevious(ClientSession session, int mb) throws IOException {
+        Path f = Paths.get("target/out.txt");
+        byte[] bytes = "123456789abcdef\n".getBytes();
+        try (BufferedOutputStream bos = new BufferedOutputStream(
+                Files.newOutputStream(f, StandardOpenOption.CREATE,
+                        StandardOpenOption.TRUNCATE_EXISTING,
+                        StandardOpenOption.WRITE))) {
+            for (int i = 0; i < 1024 * 1024 * mb / bytes.length; i++) {
+                bos.write(bytes);
+            }
+        }
+        long t0 = System.currentTimeMillis();
+        try (SftpClient client = SftpClientFactory.instance().createSftpClient(session)) {
+            try (InputStream os = new BufferedInputStream(
+                    new SftpInputStreamWithChannel(
+                            client, 32768, "out.txt",
+                            Arrays.asList(OpenMode.Read)),
+                    32768)) {
+                byte[] data = new byte[8192];
+                for (int i = 0; i < 1024 * 1024 * mb / data.length; i++) {
+                    int l = os.read(data);
+                    if (l < 0) {
+                        break;
+                    }
+                }
+            }
+        }
+        long t1 = System.currentTimeMillis();
+        return t1 - t0;
+    }
+
+    public long downloadOptimized(ClientSession session, int mb) throws IOException {
+        Path f = Paths.get("target/out.txt");
+        byte[] bytes = "123456789abcdef\n".getBytes();
+        try (BufferedOutputStream bos = new BufferedOutputStream(
+                Files.newOutputStream(f, StandardOpenOption.CREATE,
+                        StandardOpenOption.TRUNCATE_EXISTING,
+                        StandardOpenOption.WRITE))) {
+            for (int i = 0; i < 1024 * 1024 * mb / bytes.length; i++) {
+                bos.write(bytes);
+            }
+        }
+        long t0 = System.currentTimeMillis();
+        try (SftpFileSystem fs = SftpClientFactory.instance().createSftpFileSystem(session)) {
+            Path p = fs.getPath("out.txt");
+            try (InputStream os = new BufferedInputStream(
+                    Files.newInputStream(p, StandardOpenOption.READ), 32768)) {
+                byte[] data = new byte[8192];
+                for (int i = 0; i < 1024 * 1024 * mb / data.length; i++) {
+                    int l = os.read(data);
+                    if (l < 0) {
+                        break;
+                    }
+                }
+            }
+        }
+        long t1 = System.currentTimeMillis();
+        return t1 - t0;
+    }
+
+}
diff --git a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
index da0d96d..3078ec2 100644
--- a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
+++ b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
@@ -586,21 +586,14 @@ public class SftpTest extends AbstractSftpClientTestSupport {
 
             try (SftpClient sftp = createSftpClient(session);
                  InputStream stream = sftp.read(
-                         CommonTestSupportUtils.resolveRelativeRemotePath(parentPath, localFile), OpenMode.Read)) {
-                assertFalse("Stream reported mark supported", stream.markSupported());
-                try {
-                    stream.mark(data.length);
-                    fail("Unexpected success to mark the read limit");
-                } catch (UnsupportedOperationException e) {
-                    // expected - ignored
-                }
+                         CommonTestSupportUtils.resolveRelativeRemotePath(parentPath, localFile),
+                         OpenMode.Read)) {
 
                 byte[] expected = new byte[data.length / 4];
-                int readLen = stream.read(expected);
-                assertEquals("Failed to read fully initial data", expected.length, readLen);
+                int readLen = expected.length;
+                System.arraycopy(data, 0, expected, 0, readLen);
 
                 byte[] actual = new byte[readLen];
-                stream.reset();
                 readLen = stream.read(actual);
                 assertEquals("Failed to read fully reset data", actual.length, readLen);
                 assertArrayEquals("Mismatched re-read data contents", expected, actual);
@@ -616,12 +609,6 @@ public class SftpTest extends AbstractSftpClientTestSupport {
 
                 System.arraycopy(data, expected.length + readLen, expected, 0, expected.length);
                 assertArrayEquals("Mismatched skipped forward data contents", expected, actual);
-
-                skipped = stream.skip(0 - readLen);
-                assertEquals("Mismatched backward skip size", readLen, skipped);
-                readLen = stream.read(actual);
-                assertEquals("Failed to read fully skipped backward data", actual.length, readLen);
-                assertArrayEquals("Mismatched skipped backward data contents", expected, actual);
             }
         }
     }
diff --git a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTransferTest.java b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTransferTest.java
new file mode 100644
index 0000000..9bba81f
--- /dev/null
+++ b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTransferTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.client.subsystem.sftp.fs.SftpFileSystem;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class SftpTransferTest extends AbstractSftpClientTestSupport {
+
+    private static final int BUFFER_SIZE = 8192;
+
+    public SftpTransferTest() throws IOException {
+        super();
+    }
+
+    @Test
+    public void testTransferIntegrity() throws IOException {
+        try (ClientSession session = createClientSession();
+             SftpFileSystem fs = SftpClientFactory.instance().createSftpFileSystem(session)) {
+
+            Path localRoot = detectTargetFolder().resolve("sftp");
+            Path remoteRoot = fs.getDefaultDir().resolve("target/sftp");
+
+            Path local0 = localRoot.resolve("files-0.txt");
+            Path remote0 = remoteRoot.resolve("files-1.txt");
+            Path local1 = localRoot.resolve("files-2.txt");
+            Path remote1 = remoteRoot.resolve("files-3.txt");
+            Path local2 = localRoot.resolve("files-4.txt");
+            Files.deleteIfExists(local0);
+            Files.deleteIfExists(remote0);
+            Files.deleteIfExists(local1);
+            Files.deleteIfExists(remote1);
+            Files.deleteIfExists(local2);
+
+            String data = getClass().getName() + "#" + getCurrentTestName() + "(" + new Date() + ")\n";
+            try (BufferedWriter bos = Files.newBufferedWriter(local0)) {
+                long count = 0;
+                while (count < 1024 * 1024 * 10) { // 10 MB
+                    bos.append(data);
+                    count += data.length();
+                }
+            }
+
+            Files.copy(local0, remote0);
+            Files.copy(remote0, local1);
+            Files.copy(local1, remote1);
+            Files.copy(remote1, local2);
+
+            assertTrue("File integrity problem", sameContent(local0, local2));
+        }
+    }
+
+    private ClientSession createClientSession() throws IOException {
+        ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port)
+                .verify(7L, TimeUnit.SECONDS).getSession();
+        try {
+            session.addPasswordIdentity(getCurrentTestName());
+            session.auth().verify(5L, TimeUnit.SECONDS);
+            return session;
+        } catch (IOException e) {
+            session.close();
+            throw e;
+        }
+    }
+
+    private boolean sameContent(Path path, Path path2) throws IOException {
+        byte[] buffer1 = new byte[BUFFER_SIZE];
+        byte[] buffer2 = new byte[BUFFER_SIZE];
+        try (InputStream in1 = Files.newInputStream(path);
+             InputStream in2 = Files.newInputStream(path2)) {
+            while (true) {
+                int nRead1 = readNBytes(in1, buffer1);
+                int nRead2 = readNBytes(in2, buffer2);
+                if (nRead1 != nRead2) {
+                    return false;
+                } else if (nRead1 == BUFFER_SIZE) {
+                    if (!Arrays.equals(buffer1, buffer2)) {
+                        return false;
+                    }
+                } else {
+                    for (int i = 0; i < nRead1; i++) {
+                        if (buffer1[i] != buffer2[i]) {
+                            return false;
+                        }
+                    }
+                    return true;
+                }
+            }
+        }
+    }
+
+    private int readNBytes(InputStream is, byte[] b) throws IOException {
+        int n = 0;
+        int len = b.length;
+        while (n < len) {
+            int count = is.read(b, n, len - n);
+            if (count < 0) {
+                break;
+            }
+            n += count;
+        }
+        return n;
+    }
+
+}