You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2020/04/28 14:07:30 UTC

[mina-sshd] 01/02: [SSHD-979] Allow more flexible extension of improved SFTP API implementations

This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit 15d6c73538e0477d4a57783b4ff5aebdbad405c0
Author: Lyor Goldstein <lg...@apache.org>
AuthorDate: Thu Apr 23 21:10:27 2020 +0300

    [SSHD-979] Allow more flexible extension of improved SFTP API implementations
---
 .../sshd/client/subsystem/sftp/RawSftpClient.java  |   1 -
 .../subsystem/sftp/impl/AbstractSftpClient.java    |  34 +----
 .../subsystem/sftp/impl/DefaultSftpClient.java     | 163 +++++++++++++--------
 .../client/subsystem/sftp/impl/SftpAckData.java    |  45 ++++++
 .../subsystem/sftp/impl/SftpInputStreamAsync.java  |  78 +++++-----
 .../subsystem/sftp/impl/SftpOutputStreamAsync.java |  46 +++---
 .../subsystem/sftp/impl/SftpRemotePathChannel.java |  24 +--
 .../sshd/server/subsystem/sftp/FileHandle.java     |   1 +
 8 files changed, 214 insertions(+), 178 deletions(-)

diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java
index 560ce55..0f1afd7 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java
@@ -49,5 +49,4 @@ public interface RawSftpClient {
      * @throws IOException If connection closed or interrupted
      */
     Buffer receive(int id, long timeout) throws IOException;
-
 }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
index 211f3ee..338f666 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
@@ -58,6 +58,8 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public abstract class AbstractSftpClient extends AbstractSubsystemClient implements SftpClient, RawSftpClient {
+    public static final int INIT_COMMAND_SIZE = Byte.BYTES /* command */ + Integer.BYTES /* version */;
+
     /**
      * Property used to avoid large buffers when {@link #write(Handle, long, byte[], int, int)} is invoked with a large
      * buffer size.
@@ -158,7 +160,7 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
 
     /**
      * Sends the specified command, waits for the response and then invokes {@link #checkResponseStatus(int, Buffer)}
-     * 
+     *
      * @param  cmd         The command to send
      * @param  request     The request {@link Buffer}
      * @throws IOException If failed to send, receive or check the returned status
@@ -872,7 +874,7 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
     @Override
     public void write(Handle handle, long fileOffset, byte[] src, int srcOffset, int len) throws IOException {
         // do some bounds checking first
-        if ((fileOffset < 0) || (srcOffset < 0) || (len < 0)) {
+        if ((fileOffset < 0L) || (srcOffset < 0) || (len < 0)) {
             throw new IllegalArgumentException(
                     "write(" + handle + ") please ensure all parameters "
                                                + " are non-negative values: file-offset=" + fileOffset
@@ -1281,13 +1283,6 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
         checkCommandStatus(SftpConstants.SSH_FXP_UNBLOCK, buffer);
     }
 
-    /**
-     * @param  path        The remote directory path
-     * @return             An {@link Iterable} that can be used to iterate over all the directory entries (unlike
-     *                     {@link #readDir(Handle)})
-     * @throws IOException If failed to access the remote site
-     * @see                #readDir(Handle)
-     */
     @Override
     public Iterable<DirEntry> readDir(String path) throws IOException {
         if (!isOpen()) {
@@ -1297,13 +1292,6 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
         return new SftpIterableDirEntry(this, path);
     }
 
-    /**
-     * @param  handle      A directory {@link Handle}
-     * @return             An {@link Iterable} that can be used to iterate over all the directory entries (like
-     *                     {@link #readDir(String)}). <B>Note:</B> the iterable instance is not re-usable - i.e., files
-     *                     can be iterated only <U>once</U>
-     * @throws IOException If failed to access the directory
-     */
     @Override
     public Iterable<DirEntry> listDir(Handle handle) throws IOException {
         if (!isOpen()) {
@@ -1313,20 +1301,8 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
         return new StfpIterableDirHandle(this, handle);
     }
 
-    /**
-     * Opens an {@link FileChannel} on the specified remote path
-     *
-     * @param  path        The remote path
-     * @param  modes       The access mode(s) - if {@code null}/empty then the {@link #DEFAULT_CHANNEL_MODES} are used
-     * @return             The open {@link FileChannel} - <B>Note:</B> do not close this owner client instance until the
-     *                     channel is no longer needed since it uses the client for providing the channel's
-     *                     functionality.
-     * @throws IOException If failed to open the channel
-     * @see                java.nio.channels.Channels#newInputStream(java.nio.channels.ReadableByteChannel)
-     * @see                java.nio.channels.Channels#newOutputStream(java.nio.channels.WritableByteChannel)
-     */
     @Override
-    public SftpRemotePathChannel openRemoteFileChannel(String path, Collection<OpenMode> modes) throws IOException {
+    public FileChannel openRemoteFileChannel(String path, Collection<OpenMode> modes) throws IOException {
         return new SftpRemotePathChannel(path, this, false, GenericUtils.isEmpty(modes) ? DEFAULT_CHANNEL_MODES : modes);
     }
 
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
index 2c13f5b..cf00605 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
@@ -50,6 +50,8 @@ import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
@@ -79,7 +81,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
         this.nameDecodingCharset = PropertyResolverUtils.getCharset(
                 clientSession, NAME_DECODING_CHARSET, DEFAULT_NAME_DECODING_CHARSET);
         this.clientSession = Objects.requireNonNull(clientSession, "No client session");
-        this.channel = new SftpChannelSubsystem();
+        this.channel = createSftpChannelSubsystem(clientSession);
         clientSession.getService(ConnectionService.class).registerChannel(channel);
 
         long initializationTimeout = clientSession.getLongProperty(
@@ -153,7 +155,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
 
     /**
      * Receive binary data
-     * 
+     *
      * @param  buf         The buffer for the incoming data
      * @param  start       Offset in buffer to place the data
      * @param  len         Available space in buffer for the data
@@ -278,7 +280,10 @@ public class DefaultSftpClient extends AbstractSftpClient {
             buf.putInt(id);
             buf.putBuffer(buffer);
         }
-        channel.getAsyncIn().writePacket(buf).verify();
+
+        IoOutputStream asyncIn = channel.getAsyncIn();
+        IoWriteFuture writeFuture = asyncIn.writePacket(buf);
+        writeFuture.verify();
         return id;
     }
 
@@ -315,7 +320,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
             if (buffer != null) {
                 return buffer;
             }
-            if (idleTimeout > 0) {
+            if (idleTimeout > 0L) {
                 try {
                     messages.wait(idleTimeout);
                 } catch (InterruptedException e) {
@@ -327,65 +332,37 @@ public class DefaultSftpClient extends AbstractSftpClient {
     }
 
     protected void init(long initializationTimeout) throws IOException {
-        ValidateUtils.checkTrue(initializationTimeout > 0L, "Invalid initialization timeout: %d", initializationTimeout);
-
         // Send init packet
-        Buffer buf = new ByteArrayBuffer(9);
-        buf.putInt(5);
+        Buffer buf = new ByteArrayBuffer(INIT_COMMAND_SIZE + SshConstants.SSH_PACKET_HEADER_LEN);
+        buf.putInt(INIT_COMMAND_SIZE);
         buf.putByte((byte) SftpConstants.SSH_FXP_INIT);
         buf.putInt(SftpConstants.SFTP_V6);
-        channel.getAsyncIn().writePacket(buf).verify();
-
-        Buffer buffer;
-        Integer reqId;
-        synchronized (messages) {
-            /*
-             * We need to use a timeout since if the remote server does not support SFTP, we will not know it
-             * immediately. This is due to the fact that the request for the subsystem does not contain a reply as to
-             * its success or failure. Thus, the SFTP channel is created by the client, but there is no one on the other
-             * side to reply - thus the need for the timeout
-             */
-            for (long remainingTimeout = initializationTimeout;
-                 (remainingTimeout > 0L) && messages.isEmpty() && (!isClosing()) && isOpen();) {
-                try {
-                    long sleepStart = System.nanoTime();
-                    messages.wait(remainingTimeout);
-                    long sleepEnd = System.nanoTime();
-                    long sleepDuration = sleepEnd - sleepStart;
-                    long sleepMillis = TimeUnit.NANOSECONDS.toMillis(sleepDuration);
-                    if (sleepMillis < 1L) {
-                        remainingTimeout--;
-                    } else {
-                        remainingTimeout -= sleepMillis;
-                    }
-                } catch (InterruptedException e) {
-                    throw (IOException) new InterruptedIOException(
-                            "Interrupted init() while " + remainingTimeout + " msec. remaining").initCause(e);
-                }
-            }
 
-            if (isClosing() || (!isOpen())) {
-                throw new EOFException("Closing while await init message");
-            }
-
-            if (messages.isEmpty()) {
-                throw new SocketTimeoutException(
-                        "No incoming initialization response received within " + initializationTimeout + " msec.");
-            }
+        boolean traceEnabled = log.isTraceEnabled();
+        IoOutputStream asyncIn = channel.getAsyncIn();
+        ClientChannel clientChannel = getClientChannel();
+        if (traceEnabled) {
+            log.trace("init({}) send SSH_FXP_INIT", clientChannel);
+        }
+        IoWriteFuture writeFuture = asyncIn.writePacket(buf);
+        writeFuture.verify();
 
-            Collection<Integer> ids = messages.keySet();
-            Iterator<Integer> iter = ids.iterator();
-            reqId = iter.next();
-            buffer = messages.remove(reqId);
+        if (traceEnabled) {
+            log.trace("init({}) wait for SSH_FXP_INIT response (timeout={})", clientChannel, initializationTimeout);
         }
+        Buffer buffer = waitForInitResponse(initializationTimeout);
+        handleInitResponse(buffer);
+    }
 
+    protected void handleInitResponse(Buffer buffer) throws IOException {
+        boolean traceEnabled = log.isTraceEnabled();
+        ClientChannel clientChannel = getClientChannel();
         int length = buffer.getInt();
         int type = buffer.getUByte();
         int id = buffer.getInt();
-        boolean traceEnabled = log.isTraceEnabled();
         if (traceEnabled) {
-            log.trace("init({}) id={} type={} len={}",
-                    getClientChannel(), id, SftpConstants.getCommandMessageName(type), length);
+            log.trace("handleInitResponse({}) id={} type={} len={}",
+                    clientChannel, id, SftpConstants.getCommandMessageName(type), length);
         }
 
         if (type == SftpConstants.SSH_FXP_VERSION) {
@@ -395,14 +372,14 @@ public class DefaultSftpClient extends AbstractSftpClient {
             versionHolder.set(id);
 
             if (traceEnabled) {
-                log.trace("init({}) version={}", getClientChannel(), versionHolder);
+                log.trace("handleInitResponse({}) version={}", clientChannel, versionHolder);
             }
 
             while (buffer.available() > 0) {
                 String name = buffer.getString();
                 byte[] data = buffer.getBytes();
                 if (traceEnabled) {
-                    log.trace("init({}) added extension={}", getClientChannel(), name);
+                    log.trace("handleInitResponse({}) added extension={}", clientChannel, name);
                 }
                 extensions.put(name, data);
             }
@@ -411,8 +388,8 @@ public class DefaultSftpClient extends AbstractSftpClient {
             String msg = buffer.getString();
             String lang = buffer.getString();
             if (traceEnabled) {
-                log.trace("init({})[id={}] - status: {} [{}] {}",
-                        getClientChannel(), id, SftpConstants.getStatusName(substatus), lang, msg);
+                log.trace("handleInitResponse({})[id={}] - status: {} [{}] {}",
+                        clientChannel, id, SftpConstants.getStatusName(substatus), lang, msg);
             }
 
             throwStatusException(SftpConstants.SSH_FXP_INIT, id, substatus, msg, lang);
@@ -426,6 +403,51 @@ public class DefaultSftpClient extends AbstractSftpClient {
         }
     }
 
+    protected Buffer waitForInitResponse(long initializationTimeout) throws IOException {
+        ValidateUtils.checkTrue(initializationTimeout > 0L, "Invalid initialization timeout: %d", initializationTimeout);
+
+        synchronized (messages) {
+            /*
+             * We need to use a timeout since if the remote server does not support SFTP, we will not know it
+             * immediately. This is due to the fact that the request for the subsystem does not contain a reply as to
+             * its success or failure. Thus, the SFTP channel is created by the client, but there is no one on the other
+             * side to reply - thus the need for the timeout
+             */
+            for (long remainingTimeout = initializationTimeout;
+                 (remainingTimeout > 0L) && messages.isEmpty() && (!isClosing()) && isOpen();) {
+                try {
+                    long sleepStart = System.nanoTime();
+                    messages.wait(remainingTimeout);
+                    long sleepEnd = System.nanoTime();
+                    long sleepDuration = sleepEnd - sleepStart;
+                    long sleepMillis = TimeUnit.NANOSECONDS.toMillis(sleepDuration);
+                    if (sleepMillis < 1L) {
+                        remainingTimeout--;
+                    } else {
+                        remainingTimeout -= sleepMillis;
+                    }
+                } catch (InterruptedException e) {
+                    throw (IOException) new InterruptedIOException(
+                            "Interrupted init() while " + remainingTimeout + " msec. remaining").initCause(e);
+                }
+            }
+
+            if (isClosing() || (!isOpen())) {
+                throw new EOFException("Closing while await init message");
+            }
+
+            if (messages.isEmpty()) {
+                throw new SocketTimeoutException(
+                        "No incoming initialization response received within " + initializationTimeout + " msec.");
+            }
+
+            Collection<Integer> ids = messages.keySet();
+            Iterator<Integer> iter = ids.iterator();
+            Integer reqId = iter.next();
+            return messages.remove(reqId);
+        }
+    }
+
     /**
      * @param  selector    The {@link SftpVersionSelector} to use - ignored if {@code null}
      * @return             The selected version (may be same as current)
@@ -485,9 +507,12 @@ public class DefaultSftpClient extends AbstractSftpClient {
         return selected;
     }
 
-    private class SftpChannelSubsystem extends ChannelSubsystem {
+    protected ChannelSubsystem createSftpChannelSubsystem(ClientSession clientSession) {
+        return new SftpChannelSubsystem();
+    }
 
-        SftpChannelSubsystem() {
+    protected class SftpChannelSubsystem extends ChannelSubsystem {
+        protected SftpChannelSubsystem() {
             super(SftpConstants.SFTP_SUBSYSTEM_NAME);
         }
 
@@ -506,14 +531,19 @@ public class DefaultSftpClient extends AbstractSftpClient {
             addPendingRequest(Channel.CHANNEL_SUBSYSTEM, wantReply);
             writePacket(buffer);
 
-            asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
+            asyncIn = createAsyncInput(session);
+            setOut(createStdOutputStream(session));
+            setErr(createErrOutputStream(session));
+        }
+
+        protected ChannelAsyncOutputStream createAsyncInput(Session session) {
+            return new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
                 @SuppressWarnings("synthetic-access")
                 @Override
                 protected CloseFuture doCloseGracefully() {
                     try {
                         sendEof();
                     } catch (IOException e) {
-                        Session session = getSession();
                         session.exceptionCaught(e);
                     }
                     return super.doCloseGracefully();
@@ -521,7 +551,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
 
                 @Override
                 protected Buffer createSendBuffer(Buffer buffer, Channel channel, long length) {
-                    if (buffer.rpos() >= 9 && length == buffer.available()) {
+                    if ((buffer.rpos() >= 9) && (length == buffer.available())) {
                         int rpos = buffer.rpos();
                         int wpos = buffer.wpos();
                         buffer.rpos(rpos - 9);
@@ -537,7 +567,10 @@ public class DefaultSftpClient extends AbstractSftpClient {
                     }
                 }
             };
-            out = new OutputStream() {
+        }
+
+        protected OutputStream createStdOutputStream(Session session) {
+            return new OutputStream() {
                 private final byte[] singleByte = new byte[1];
 
                 @Override
@@ -553,7 +586,11 @@ public class DefaultSftpClient extends AbstractSftpClient {
                     data(b, off, len);
                 }
             };
-            err = new ByteArrayOutputStream();
+        }
+
+        protected OutputStream createErrOutputStream(Session session) {
+            // TODO use some limit in case some data is constantly written to this stream
+            return new ByteArrayOutputStream();
         }
     }
 }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpAckData.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpAckData.java
new file mode 100644
index 0000000..e21d48e
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpAckData.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.client.subsystem.sftp.impl;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class SftpAckData {
+    public final int id;
+    public final long offset;
+    public final int length;
+
+    public SftpAckData(int id, long offset, int length) {
+        this.id = id;
+        this.offset = offset;
+        this.length = length;
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName()
+               + "[id=" + id
+               + ", offset=" + offset
+               + ", length=" + length
+               + "]";
+    }
+}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
index ec3e593..c1a73d2 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
@@ -32,6 +32,9 @@ import org.apache.sshd.client.subsystem.sftp.SftpClient;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
 import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.Window;
+import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.common.subsystem.sftp.SftpHelper;
 import org.apache.sshd.common.util.buffer.Buffer;
@@ -39,30 +42,18 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.io.InputStreamWithChannel;
 
 public class SftpInputStreamAsync extends InputStreamWithChannel {
-
-    static class Ack {
-        int id;
-        long offset;
-        int length;
-
-        Ack(int id, long offset, int length) {
-            this.id = id;
-            this.offset = offset;
-            this.length = length;
-        }
-    }
+    protected final byte[] bb = new byte[1];
+    protected final int bufferSize;
+    protected final long fileSize;
+    protected Buffer buffer;
+    protected CloseableHandle handle;
+    protected long requestOffset;
+    protected long clientOffset;
+    protected final Deque<SftpAckData> pendingReads = new LinkedList<>();
+    protected boolean eofIndicator;
 
     private final AbstractSftpClient client;
     private final String path;
-    private final byte[] bb = new byte[1];
-    private final int bufferSize;
-    private final long fileSize;
-    private Buffer buffer;
-    private CloseableHandle handle;
-    private long requestOffset;
-    private long clientOffset;
-    private final Deque<Ack> pendingReads = new LinkedList<>();
-    private boolean eofIndicator;
 
     public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize,
                                 String path, Collection<OpenMode> mode) throws IOException {
@@ -156,8 +147,9 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
         if (!isOpen()) {
             throw new IOException("transferTo(" + getPath() + ") stream closed");
         }
+
         long orgOffset = clientOffset;
-        while (!eofIndicator && max > 0) {
+        while ((!eofIndicator) && (max > 0L)) {
             if (hasNoData()) {
                 fillData();
                 if (eofIndicator && hasNoData()) {
@@ -179,11 +171,11 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
         return clientOffset - orgOffset;
     }
 
-    @SuppressWarnings("PMD.MissingOverride")
     public long transferTo(OutputStream out) throws IOException {
         if (!isOpen()) {
             throw new IOException("transferTo(" + getPath() + ") stream closed");
         }
+
         long orgOffset = clientOffset;
         while (!eofIndicator) {
             if (hasNoData()) {
@@ -207,41 +199,46 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
         if (!isOpen()) {
             throw new IOException("skip(" + getPath() + ") stream closed");
         }
-        if (clientOffset == 0 && pendingReads.isEmpty()) {
+        if ((clientOffset == 0L) && pendingReads.isEmpty()) {
             clientOffset = n;
             return n;
         }
         return super.skip(n);
     }
 
-    boolean hasNoData() {
-        return buffer == null || buffer.available() == 0;
+    protected boolean hasNoData() {
+        return (buffer == null) || (buffer.available() == 0);
     }
 
-    void sendRequests() throws IOException {
+    protected void sendRequests() throws IOException {
         if (!eofIndicator) {
-            long windowSize = client.getChannel().getLocalWindow().getMaxSize();
-            while (pendingReads.size() < (int) (windowSize / bufferSize) && requestOffset < fileSize + bufferSize
+            Channel channel = client.getChannel();
+            Window localWindow = channel.getLocalWindow();
+            long windowSize = localWindow.getMaxSize();
+            Session session = client.getSession();
+            byte[] id = handle.getIdentifier();
+
+            while ((pendingReads.size() < (int) (windowSize / bufferSize)) && (requestOffset < (fileSize + bufferSize))
                     || pendingReads.isEmpty()) {
-                Buffer buf = client.getSession().createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA,
-                        23 /* sftp packet */ + 16 + handle.getIdentifier().length);
+                Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA,
+                        23 /* sftp packet */ + 16 + id.length);
                 buf.rpos(23);
                 buf.wpos(23);
-                buf.putBytes(handle.getIdentifier());
+                buf.putBytes(id);
                 buf.putLong(requestOffset);
                 buf.putInt(bufferSize);
                 int reqId = client.send(SftpConstants.SSH_FXP_READ, buf);
-                pendingReads.add(new Ack(reqId, requestOffset, bufferSize));
+                pendingReads.add(new SftpAckData(reqId, requestOffset, bufferSize));
                 requestOffset += bufferSize;
             }
         }
     }
 
-    void fillData() throws IOException {
-        Ack ack = pendingReads.pollFirst();
+    protected void fillData() throws IOException {
+        SftpAckData ack = pendingReads.pollFirst();
         if (ack != null) {
             pollBuffer(ack);
-            if (!eofIndicator && clientOffset < ack.offset) {
+            if ((!eofIndicator) && (clientOffset < ack.offset)) {
                 // we are actually missing some data
                 // so request is synchronously
                 byte[] data = new byte[(int) (ack.offset - clientOffset + buffer.available())];
@@ -250,7 +247,8 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
                 AtomicReference<Boolean> eof = new AtomicReference<>();
                 while (cur < nb) {
                     int dlen = client.read(handle, clientOffset, data, cur, nb - cur, eof);
-                    eofIndicator = dlen < 0 || eof.get() != null && eof.get();
+                    Boolean eofSignal = eof.getAndSet(null);
+                    eofIndicator = (dlen < 0) || ((eofSignal != null) && eofSignal.booleanValue());
                     cur += dlen;
                 }
                 buffer.getRawBytes(data, nb, buffer.available());
@@ -259,7 +257,7 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
         }
     }
 
-    void pollBuffer(Ack ack) throws IOException {
+    protected void pollBuffer(SftpAckData ack) throws IOException {
         Buffer buf = client.receive(ack.id);
         int length = buf.getInt();
         int type = buf.getUByte();
@@ -270,7 +268,7 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
             int rpos = buf.rpos();
             buf.rpos(rpos + dlen);
             Boolean b = SftpHelper.getEndOfFileIndicatorValue(buf, client.getVersion());
-            eofIndicator = b != null && b;
+            eofIndicator = (b != null) && b.booleanValue();
             buf.rpos(rpos);
             buf.wpos(rpos + dlen);
             this.buffer = buf;
@@ -298,7 +296,7 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
             try {
                 try {
                     while (!pendingReads.isEmpty()) {
-                        Ack ack = pendingReads.removeFirst();
+                        SftpAckData ack = pendingReads.removeFirst();
                         pollBuffer(ack);
                     }
                 } finally {
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
index d8f1974..b5b809c 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
@@ -28,6 +28,7 @@ import org.apache.sshd.client.subsystem.sftp.SftpClient;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
 import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
@@ -39,27 +40,15 @@ import org.apache.sshd.common.util.io.OutputStreamWithChannel;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public class SftpOutputStreamAsync extends OutputStreamWithChannel {
-
-    static class Ack {
-        int id;
-        long offset;
-        int length;
-
-        Ack(int id, long offset, int length) {
-            this.id = id;
-            this.offset = offset;
-            this.length = length;
-        }
-    }
+    protected final byte[] bb = new byte[1];
+    protected final int bufferSize;
+    protected Buffer buffer;
+    protected CloseableHandle handle;
+    protected long offset;
+    protected final Deque<SftpAckData> pendingWrites = new LinkedList<>();
 
     private final AbstractSftpClient client;
     private final String path;
-    private final byte[] bb = new byte[1];
-    private final int bufferSize;
-    private Buffer buffer;
-    private CloseableHandle handle;
-    private long offset;
-    private final Deque<Ack> pendingWrites = new LinkedList<>();
 
     public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
                                  String path, Collection<OpenMode> mode) throws IOException {
@@ -112,14 +101,17 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel {
 
     @Override
     public void write(byte[] b, int off, int len) throws IOException {
+        byte[] id = handle.getIdentifier();
+        Session session = client.getSession();
+
         do {
             if (buffer == null) {
-                buffer = client.getSession().createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize);
-                int hdr = (9 + 16 + 8 + handle.getIdentifier().length) + buffer.wpos();
+                buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize);
+                int hdr = 9 + 16 + 8 + id.length + buffer.wpos();
                 buffer.rpos(hdr);
                 buffer.wpos(hdr);
             }
-            int max = bufferSize - (9 + 16 + handle.getIdentifier().length + 72);
+            int max = bufferSize - (9 + 16 + id.length + 72);
             int nb = Math.min(len, max - (buffer.wpos() - buffer.rpos()));
             buffer.putRawBytes(b, off, nb);
             if (buffer.available() == max) {
@@ -137,9 +129,9 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel {
         }
 
         for (;;) {
-            Ack ack = pendingWrites.peek();
+            SftpAckData ack = pendingWrites.peek();
             if (ack != null) {
-                Buffer response = client.receive(ack.id, 0);
+                Buffer response = client.receive(ack.id, 0L);
                 if (response != null) {
                     pendingWrites.removeFirst();
                     client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
@@ -154,7 +146,7 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel {
         byte[] id = handle.getIdentifier();
         int avail = buffer.available();
         Buffer buf;
-        if (buffer.rpos() >= 16 + id.length) {
+        if (buffer.rpos() >= (16 + id.length)) {
             int wpos = buffer.wpos();
             buffer.rpos(buffer.rpos() - 16 - id.length);
             buffer.wpos(buffer.rpos());
@@ -171,7 +163,7 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel {
         }
 
         int reqId = client.send(SftpConstants.SSH_FXP_WRITE, buf);
-        pendingWrites.add(new Ack(reqId, offset, avail));
+        pendingWrites.add(new SftpAckData(reqId, offset, avail));
 
         offset += avail;
         buffer = null;
@@ -182,11 +174,11 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel {
         if (isOpen()) {
             try {
                 try {
-                    if (buffer != null && buffer.available() > 0) {
+                    if ((buffer != null) && (buffer.available() > 0)) {
                         flush();
                     }
                     while (!pendingWrites.isEmpty()) {
-                        Ack ack = pendingWrites.removeFirst();
+                        SftpAckData ack = pendingWrites.removeFirst();
                         Buffer response = client.receive(ack.id);
                         client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
                     }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
index b6275d1..b698c9f 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
@@ -227,18 +227,6 @@ public class SftpRemotePathChannel extends FileChannel {
         return doWrite(buffers, -1L);
     }
 
-    static class Ack {
-        int id;
-        long offset;
-        int length;
-
-        Ack(int id, long offset, int length) {
-            this.id = id;
-            this.offset = offset;
-            this.length = length;
-        }
-    }
-
     protected long doWrite(Collection<? extends ByteBuffer> buffers, long position) throws IOException {
         ensureOpen(WRITE_MODES);
 
@@ -362,11 +350,12 @@ public class SftpRemotePathChannel extends FileChannel {
             try {
                 beginBlocking("transferTo");
 
+                // DO NOT CLOSE THE STREAM AS IT WOULD CLOSE THE HANDLE
+                @SuppressWarnings("resource")
                 SftpInputStreamAsync input = new SftpInputStreamAsync(
                         (AbstractSftpClient) sftp,
                         copySize, position, count, getRemotePath(), handle);
                 totalRead = input.transferTo(count, target);
-                // DO NOT CLOSE THE STREAM AS IT WOULD CLOSE THE HANDLE
                 eof = input.isEof();
                 completed = true;
             } finally {
@@ -379,7 +368,7 @@ public class SftpRemotePathChannel extends FileChannel {
                     this, position, count, copySize, totalRead, eof, target);
         }
 
-        return totalRead > 0L ? totalRead : eof ? -1L : 0L;
+        return (totalRead > 0L) ? totalRead : eof ? -1L : 0L;
     }
 
     @Override
@@ -400,7 +389,6 @@ public class SftpRemotePathChannel extends FileChannel {
         }
 
         boolean completed = false;
-        long curPos = (position >= 0L) ? position : posTracker.get();
         long totalRead = 0L;
         byte[] buffer = new byte[(int) Math.min(copySize, count)];
 
@@ -408,6 +396,8 @@ public class SftpRemotePathChannel extends FileChannel {
             try {
                 beginBlocking("transferFrom");
 
+                // DO NOT CLOSE THE OUTPUT STREAM AS IT WOULD CLOSE THE HANDLE
+                @SuppressWarnings("resource")
                 SftpOutputStreamAsync output = new SftpOutputStreamAsync(
                         (AbstractSftpClient) sftp,
                         copySize, getRemotePath(), handle);
@@ -417,7 +407,6 @@ public class SftpRemotePathChannel extends FileChannel {
                     int read = src.read(wrap);
                     if (read > 0) {
                         output.write(buffer, 0, read);
-                        curPos += read;
                         totalRead += read;
                     } else {
                         break;
@@ -441,8 +430,7 @@ public class SftpRemotePathChannel extends FileChannel {
     @Override
     public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
         throw new UnsupportedOperationException(
-                "map(" + getRemotePath() + ")"
-                                                + "[" + mode + "," + position + "," + size + "] N/A");
+                "map(" + getRemotePath() + ")[" + mode + "," + position + "," + size + "] N/A");
     }
 
     @Override
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java
index 6676eb4..9e4237d 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java
@@ -110,6 +110,7 @@ public class FileHandle extends Handle {
         return read(data, 0, data.length, offset);
     }
 
+    @SuppressWarnings("resource")
     public int read(byte[] data, int doff, int length, long offset) throws IOException {
         SeekableByteChannel channel = getFileChannel();
         channel = channel.position(offset);