You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2020/04/28 14:07:29 UTC

[mina-sshd] branch master updated (5e89f97 -> 0da9e23)

This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git.


    from 5e89f97  [SSHD-982] Fix race condition when loading known hosts file
     new 15d6c73  [SSHD-979] Allow more flexible extension of improved SFTP API implementations
     new 0da9e23  [SSHD-979] Ignore by default any data written to error stream of SFTP subsystem client channel

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../sshd/client/subsystem/sftp/RawSftpClient.java  |   1 -
 .../subsystem/sftp/impl/AbstractSftpClient.java    |  34 +----
 .../subsystem/sftp/impl/DefaultSftpClient.java     | 168 +++++++++++++--------
 .../client/subsystem/sftp/impl/SftpAckData.java    |  22 ++-
 .../subsystem/sftp/impl/SftpInputStreamAsync.java  |  78 +++++-----
 .../subsystem/sftp/impl/SftpOutputStreamAsync.java |  46 +++---
 .../subsystem/sftp/impl/SftpRemotePathChannel.java |  24 +--
 .../sshd/server/subsystem/sftp/FileHandle.java     |   1 +
 8 files changed, 187 insertions(+), 187 deletions(-)
 copy sshd-core/src/main/java/org/apache/sshd/client/subsystem/AbstractSubsystemClient.java => sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpAckData.java (68%)


[mina-sshd] 01/02: [SSHD-979] Allow more flexible extension of improved SFTP API implementations

Posted by lg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit 15d6c73538e0477d4a57783b4ff5aebdbad405c0
Author: Lyor Goldstein <lg...@apache.org>
AuthorDate: Thu Apr 23 21:10:27 2020 +0300

    [SSHD-979] Allow more flexible extension of improved SFTP API implementations
---
 .../sshd/client/subsystem/sftp/RawSftpClient.java  |   1 -
 .../subsystem/sftp/impl/AbstractSftpClient.java    |  34 +----
 .../subsystem/sftp/impl/DefaultSftpClient.java     | 163 +++++++++++++--------
 .../client/subsystem/sftp/impl/SftpAckData.java    |  45 ++++++
 .../subsystem/sftp/impl/SftpInputStreamAsync.java  |  78 +++++-----
 .../subsystem/sftp/impl/SftpOutputStreamAsync.java |  46 +++---
 .../subsystem/sftp/impl/SftpRemotePathChannel.java |  24 +--
 .../sshd/server/subsystem/sftp/FileHandle.java     |   1 +
 8 files changed, 214 insertions(+), 178 deletions(-)

diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java
index 560ce55..0f1afd7 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java
@@ -49,5 +49,4 @@ public interface RawSftpClient {
      * @throws IOException If connection closed or interrupted
      */
     Buffer receive(int id, long timeout) throws IOException;
-
 }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
index 211f3ee..338f666 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
@@ -58,6 +58,8 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public abstract class AbstractSftpClient extends AbstractSubsystemClient implements SftpClient, RawSftpClient {
+    public static final int INIT_COMMAND_SIZE = Byte.BYTES /* command */ + Integer.BYTES /* version */;
+
     /**
      * Property used to avoid large buffers when {@link #write(Handle, long, byte[], int, int)} is invoked with a large
      * buffer size.
@@ -158,7 +160,7 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
 
     /**
      * Sends the specified command, waits for the response and then invokes {@link #checkResponseStatus(int, Buffer)}
-     * 
+     *
      * @param  cmd         The command to send
      * @param  request     The request {@link Buffer}
      * @throws IOException If failed to send, receive or check the returned status
@@ -872,7 +874,7 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
     @Override
     public void write(Handle handle, long fileOffset, byte[] src, int srcOffset, int len) throws IOException {
         // do some bounds checking first
-        if ((fileOffset < 0) || (srcOffset < 0) || (len < 0)) {
+        if ((fileOffset < 0L) || (srcOffset < 0) || (len < 0)) {
             throw new IllegalArgumentException(
                     "write(" + handle + ") please ensure all parameters "
                                                + " are non-negative values: file-offset=" + fileOffset
@@ -1281,13 +1283,6 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
         checkCommandStatus(SftpConstants.SSH_FXP_UNBLOCK, buffer);
     }
 
-    /**
-     * @param  path        The remote directory path
-     * @return             An {@link Iterable} that can be used to iterate over all the directory entries (unlike
-     *                     {@link #readDir(Handle)})
-     * @throws IOException If failed to access the remote site
-     * @see                #readDir(Handle)
-     */
     @Override
     public Iterable<DirEntry> readDir(String path) throws IOException {
         if (!isOpen()) {
@@ -1297,13 +1292,6 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
         return new SftpIterableDirEntry(this, path);
     }
 
-    /**
-     * @param  handle      A directory {@link Handle}
-     * @return             An {@link Iterable} that can be used to iterate over all the directory entries (like
-     *                     {@link #readDir(String)}). <B>Note:</B> the iterable instance is not re-usable - i.e., files
-     *                     can be iterated only <U>once</U>
-     * @throws IOException If failed to access the directory
-     */
     @Override
     public Iterable<DirEntry> listDir(Handle handle) throws IOException {
         if (!isOpen()) {
@@ -1313,20 +1301,8 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
         return new StfpIterableDirHandle(this, handle);
     }
 
-    /**
-     * Opens an {@link FileChannel} on the specified remote path
-     *
-     * @param  path        The remote path
-     * @param  modes       The access mode(s) - if {@code null}/empty then the {@link #DEFAULT_CHANNEL_MODES} are used
-     * @return             The open {@link FileChannel} - <B>Note:</B> do not close this owner client instance until the
-     *                     channel is no longer needed since it uses the client for providing the channel's
-     *                     functionality.
-     * @throws IOException If failed to open the channel
-     * @see                java.nio.channels.Channels#newInputStream(java.nio.channels.ReadableByteChannel)
-     * @see                java.nio.channels.Channels#newOutputStream(java.nio.channels.WritableByteChannel)
-     */
     @Override
-    public SftpRemotePathChannel openRemoteFileChannel(String path, Collection<OpenMode> modes) throws IOException {
+    public FileChannel openRemoteFileChannel(String path, Collection<OpenMode> modes) throws IOException {
         return new SftpRemotePathChannel(path, this, false, GenericUtils.isEmpty(modes) ? DEFAULT_CHANNEL_MODES : modes);
     }
 
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
index 2c13f5b..cf00605 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
@@ -50,6 +50,8 @@ import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
@@ -79,7 +81,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
         this.nameDecodingCharset = PropertyResolverUtils.getCharset(
                 clientSession, NAME_DECODING_CHARSET, DEFAULT_NAME_DECODING_CHARSET);
         this.clientSession = Objects.requireNonNull(clientSession, "No client session");
-        this.channel = new SftpChannelSubsystem();
+        this.channel = createSftpChannelSubsystem(clientSession);
         clientSession.getService(ConnectionService.class).registerChannel(channel);
 
         long initializationTimeout = clientSession.getLongProperty(
@@ -153,7 +155,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
 
     /**
      * Receive binary data
-     * 
+     *
      * @param  buf         The buffer for the incoming data
      * @param  start       Offset in buffer to place the data
      * @param  len         Available space in buffer for the data
@@ -278,7 +280,10 @@ public class DefaultSftpClient extends AbstractSftpClient {
             buf.putInt(id);
             buf.putBuffer(buffer);
         }
-        channel.getAsyncIn().writePacket(buf).verify();
+
+        IoOutputStream asyncIn = channel.getAsyncIn();
+        IoWriteFuture writeFuture = asyncIn.writePacket(buf);
+        writeFuture.verify();
         return id;
     }
 
@@ -315,7 +320,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
             if (buffer != null) {
                 return buffer;
             }
-            if (idleTimeout > 0) {
+            if (idleTimeout > 0L) {
                 try {
                     messages.wait(idleTimeout);
                 } catch (InterruptedException e) {
@@ -327,65 +332,37 @@ public class DefaultSftpClient extends AbstractSftpClient {
     }
 
     protected void init(long initializationTimeout) throws IOException {
-        ValidateUtils.checkTrue(initializationTimeout > 0L, "Invalid initialization timeout: %d", initializationTimeout);
-
         // Send init packet
-        Buffer buf = new ByteArrayBuffer(9);
-        buf.putInt(5);
+        Buffer buf = new ByteArrayBuffer(INIT_COMMAND_SIZE + SshConstants.SSH_PACKET_HEADER_LEN);
+        buf.putInt(INIT_COMMAND_SIZE);
         buf.putByte((byte) SftpConstants.SSH_FXP_INIT);
         buf.putInt(SftpConstants.SFTP_V6);
-        channel.getAsyncIn().writePacket(buf).verify();
-
-        Buffer buffer;
-        Integer reqId;
-        synchronized (messages) {
-            /*
-             * We need to use a timeout since if the remote server does not support SFTP, we will not know it
-             * immediately. This is due to the fact that the request for the subsystem does not contain a reply as to
-             * its success or failure. Thus, the SFTP channel is created by the client, but there is no one on the other
-             * side to reply - thus the need for the timeout
-             */
-            for (long remainingTimeout = initializationTimeout;
-                 (remainingTimeout > 0L) && messages.isEmpty() && (!isClosing()) && isOpen();) {
-                try {
-                    long sleepStart = System.nanoTime();
-                    messages.wait(remainingTimeout);
-                    long sleepEnd = System.nanoTime();
-                    long sleepDuration = sleepEnd - sleepStart;
-                    long sleepMillis = TimeUnit.NANOSECONDS.toMillis(sleepDuration);
-                    if (sleepMillis < 1L) {
-                        remainingTimeout--;
-                    } else {
-                        remainingTimeout -= sleepMillis;
-                    }
-                } catch (InterruptedException e) {
-                    throw (IOException) new InterruptedIOException(
-                            "Interrupted init() while " + remainingTimeout + " msec. remaining").initCause(e);
-                }
-            }
 
-            if (isClosing() || (!isOpen())) {
-                throw new EOFException("Closing while await init message");
-            }
-
-            if (messages.isEmpty()) {
-                throw new SocketTimeoutException(
-                        "No incoming initialization response received within " + initializationTimeout + " msec.");
-            }
+        boolean traceEnabled = log.isTraceEnabled();
+        IoOutputStream asyncIn = channel.getAsyncIn();
+        ClientChannel clientChannel = getClientChannel();
+        if (traceEnabled) {
+            log.trace("init({}) send SSH_FXP_INIT", clientChannel);
+        }
+        IoWriteFuture writeFuture = asyncIn.writePacket(buf);
+        writeFuture.verify();
 
-            Collection<Integer> ids = messages.keySet();
-            Iterator<Integer> iter = ids.iterator();
-            reqId = iter.next();
-            buffer = messages.remove(reqId);
+        if (traceEnabled) {
+            log.trace("init({}) wait for SSH_FXP_INIT respose (timeout={})", clientChannel, initializationTimeout);
         }
+        Buffer buffer = waitForInitResponse(initializationTimeout);
+        handleInitResponse(buffer);
+    }
 
+    protected void handleInitResponse(Buffer buffer) throws IOException {
+        boolean traceEnabled = log.isTraceEnabled();
+        ClientChannel clientChannel = getClientChannel();
         int length = buffer.getInt();
         int type = buffer.getUByte();
         int id = buffer.getInt();
-        boolean traceEnabled = log.isTraceEnabled();
         if (traceEnabled) {
-            log.trace("init({}) id={} type={} len={}",
-                    getClientChannel(), id, SftpConstants.getCommandMessageName(type), length);
+            log.trace("handleInitResponse({}) id={} type={} len={}",
+                    clientChannel, id, SftpConstants.getCommandMessageName(type), length);
         }
 
         if (type == SftpConstants.SSH_FXP_VERSION) {
@@ -395,14 +372,14 @@ public class DefaultSftpClient extends AbstractSftpClient {
             versionHolder.set(id);
 
             if (traceEnabled) {
-                log.trace("init({}) version={}", getClientChannel(), versionHolder);
+                log.trace("handleInitResponse({}) version={}", clientChannel, versionHolder);
             }
 
             while (buffer.available() > 0) {
                 String name = buffer.getString();
                 byte[] data = buffer.getBytes();
                 if (traceEnabled) {
-                    log.trace("init({}) added extension={}", getClientChannel(), name);
+                    log.trace("handleInitResponse({}) added extension={}", clientChannel, name);
                 }
                 extensions.put(name, data);
             }
@@ -411,8 +388,8 @@ public class DefaultSftpClient extends AbstractSftpClient {
             String msg = buffer.getString();
             String lang = buffer.getString();
             if (traceEnabled) {
-                log.trace("init({})[id={}] - status: {} [{}] {}",
-                        getClientChannel(), id, SftpConstants.getStatusName(substatus), lang, msg);
+                log.trace("handleInitResponse({})[id={}] - status: {} [{}] {}",
+                        clientChannel, id, SftpConstants.getStatusName(substatus), lang, msg);
             }
 
             throwStatusException(SftpConstants.SSH_FXP_INIT, id, substatus, msg, lang);
@@ -426,6 +403,51 @@ public class DefaultSftpClient extends AbstractSftpClient {
         }
     }
 
+    protected Buffer waitForInitResponse(long initializationTimeout) throws IOException {
+        ValidateUtils.checkTrue(initializationTimeout > 0L, "Invalid initialization timeout: %d", initializationTimeout);
+
+        synchronized (messages) {
+            /*
+             * We need to use a timeout since if the remote server does not support SFTP, we will not know it
+             * immediately. This is due to the fact that the request for the subsystem does not contain a reply as to
+             * its success or failure. Thus, the SFTP channel is created by the client, but there is no one on the other
+             * side to reply - thus the need for the timeout
+             */
+            for (long remainingTimeout = initializationTimeout;
+                 (remainingTimeout > 0L) && messages.isEmpty() && (!isClosing()) && isOpen();) {
+                try {
+                    long sleepStart = System.nanoTime();
+                    messages.wait(remainingTimeout);
+                    long sleepEnd = System.nanoTime();
+                    long sleepDuration = sleepEnd - sleepStart;
+                    long sleepMillis = TimeUnit.NANOSECONDS.toMillis(sleepDuration);
+                    if (sleepMillis < 1L) {
+                        remainingTimeout--;
+                    } else {
+                        remainingTimeout -= sleepMillis;
+                    }
+                } catch (InterruptedException e) {
+                    throw (IOException) new InterruptedIOException(
+                            "Interrupted init() while " + remainingTimeout + " msec. remaining").initCause(e);
+                }
+            }
+
+            if (isClosing() || (!isOpen())) {
+                throw new EOFException("Closing while await init message");
+            }
+
+            if (messages.isEmpty()) {
+                throw new SocketTimeoutException(
+                        "No incoming initialization response received within " + initializationTimeout + " msec.");
+            }
+
+            Collection<Integer> ids = messages.keySet();
+            Iterator<Integer> iter = ids.iterator();
+            Integer reqId = iter.next();
+            return messages.remove(reqId);
+        }
+    }
+
     /**
      * @param  selector    The {@link SftpVersionSelector} to use - ignored if {@code null}
      * @return             The selected version (may be same as current)
@@ -485,9 +507,12 @@ public class DefaultSftpClient extends AbstractSftpClient {
         return selected;
     }
 
-    private class SftpChannelSubsystem extends ChannelSubsystem {
+    protected ChannelSubsystem createSftpChannelSubsystem(ClientSession clientSession) {
+        return new SftpChannelSubsystem();
+    }
 
-        SftpChannelSubsystem() {
+    protected class SftpChannelSubsystem extends ChannelSubsystem {
+        protected SftpChannelSubsystem() {
             super(SftpConstants.SFTP_SUBSYSTEM_NAME);
         }
 
@@ -506,14 +531,19 @@ public class DefaultSftpClient extends AbstractSftpClient {
             addPendingRequest(Channel.CHANNEL_SUBSYSTEM, wantReply);
             writePacket(buffer);
 
-            asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
+            asyncIn = createAsyncInput(session);
+            setOut(createStdOutputStream(session));
+            setErr(createErrOutputStream(session));
+        }
+
+        protected ChannelAsyncOutputStream createAsyncInput(Session session) {
+            return new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
                 @SuppressWarnings("synthetic-access")
                 @Override
                 protected CloseFuture doCloseGracefully() {
                     try {
                         sendEof();
                     } catch (IOException e) {
-                        Session session = getSession();
                         session.exceptionCaught(e);
                     }
                     return super.doCloseGracefully();
@@ -521,7 +551,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
 
                 @Override
                 protected Buffer createSendBuffer(Buffer buffer, Channel channel, long length) {
-                    if (buffer.rpos() >= 9 && length == buffer.available()) {
+                    if ((buffer.rpos() >= 9) && (length == buffer.available())) {
                         int rpos = buffer.rpos();
                         int wpos = buffer.wpos();
                         buffer.rpos(rpos - 9);
@@ -537,7 +567,10 @@ public class DefaultSftpClient extends AbstractSftpClient {
                     }
                 }
             };
-            out = new OutputStream() {
+        }
+
+        protected OutputStream createStdOutputStream(Session session) {
+            return new OutputStream() {
                 private final byte[] singleByte = new byte[1];
 
                 @Override
@@ -553,7 +586,11 @@ public class DefaultSftpClient extends AbstractSftpClient {
                     data(b, off, len);
                 }
             };
-            err = new ByteArrayOutputStream();
+        }
+
+        protected OutputStream createErrOutputStream(Session session) {
+            // TODO use some limit in case some data is constantly written to this stream
+            return new ByteArrayOutputStream();
         }
     }
 }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpAckData.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpAckData.java
new file mode 100644
index 0000000..e21d48e
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpAckData.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.client.subsystem.sftp.impl;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class SftpAckData {
+    public final int id;
+    public final long offset;
+    public final int length;
+
+    public SftpAckData(int id, long offset, int length) {
+        this.id = id;
+        this.offset = offset;
+        this.length = length;
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName()
+               + "[id=" + id
+               + ", offset=" + offset
+               + ", length=" + length
+               + "]";
+    }
+}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
index ec3e593..c1a73d2 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
@@ -32,6 +32,9 @@ import org.apache.sshd.client.subsystem.sftp.SftpClient;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
 import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.Window;
+import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.common.subsystem.sftp.SftpHelper;
 import org.apache.sshd.common.util.buffer.Buffer;
@@ -39,30 +42,18 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.io.InputStreamWithChannel;
 
 public class SftpInputStreamAsync extends InputStreamWithChannel {
-
-    static class Ack {
-        int id;
-        long offset;
-        int length;
-
-        Ack(int id, long offset, int length) {
-            this.id = id;
-            this.offset = offset;
-            this.length = length;
-        }
-    }
+    protected final byte[] bb = new byte[1];
+    protected final int bufferSize;
+    protected final long fileSize;
+    protected Buffer buffer;
+    protected CloseableHandle handle;
+    protected long requestOffset;
+    protected long clientOffset;
+    protected final Deque<SftpAckData> pendingReads = new LinkedList<>();
+    protected boolean eofIndicator;
 
     private final AbstractSftpClient client;
     private final String path;
-    private final byte[] bb = new byte[1];
-    private final int bufferSize;
-    private final long fileSize;
-    private Buffer buffer;
-    private CloseableHandle handle;
-    private long requestOffset;
-    private long clientOffset;
-    private final Deque<Ack> pendingReads = new LinkedList<>();
-    private boolean eofIndicator;
 
     public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize,
                                 String path, Collection<OpenMode> mode) throws IOException {
@@ -156,8 +147,9 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
         if (!isOpen()) {
             throw new IOException("transferTo(" + getPath() + ") stream closed");
         }
+
         long orgOffset = clientOffset;
-        while (!eofIndicator && max > 0) {
+        while ((!eofIndicator) && (max > 0L)) {
             if (hasNoData()) {
                 fillData();
                 if (eofIndicator && hasNoData()) {
@@ -179,11 +171,11 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
         return clientOffset - orgOffset;
     }
 
-    @SuppressWarnings("PMD.MissingOverride")
     public long transferTo(OutputStream out) throws IOException {
         if (!isOpen()) {
             throw new IOException("transferTo(" + getPath() + ") stream closed");
         }
+
         long orgOffset = clientOffset;
         while (!eofIndicator) {
             if (hasNoData()) {
@@ -207,41 +199,46 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
         if (!isOpen()) {
             throw new IOException("skip(" + getPath() + ") stream closed");
         }
-        if (clientOffset == 0 && pendingReads.isEmpty()) {
+        if ((clientOffset == 0L) && pendingReads.isEmpty()) {
             clientOffset = n;
             return n;
         }
         return super.skip(n);
     }
 
-    boolean hasNoData() {
-        return buffer == null || buffer.available() == 0;
+    protected boolean hasNoData() {
+        return (buffer == null) || (buffer.available() == 0);
     }
 
-    void sendRequests() throws IOException {
+    protected void sendRequests() throws IOException {
         if (!eofIndicator) {
-            long windowSize = client.getChannel().getLocalWindow().getMaxSize();
-            while (pendingReads.size() < (int) (windowSize / bufferSize) && requestOffset < fileSize + bufferSize
+            Channel channel = client.getChannel();
+            Window localWindow = channel.getLocalWindow();
+            long windowSize = localWindow.getMaxSize();
+            Session session = client.getSession();
+            byte[] id = handle.getIdentifier();
+
+            while ((pendingReads.size() < (int) (windowSize / bufferSize)) && (requestOffset < (fileSize + bufferSize))
                     || pendingReads.isEmpty()) {
-                Buffer buf = client.getSession().createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA,
-                        23 /* sftp packet */ + 16 + handle.getIdentifier().length);
+                Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA,
+                        23 /* sftp packet */ + 16 + id.length);
                 buf.rpos(23);
                 buf.wpos(23);
-                buf.putBytes(handle.getIdentifier());
+                buf.putBytes(id);
                 buf.putLong(requestOffset);
                 buf.putInt(bufferSize);
                 int reqId = client.send(SftpConstants.SSH_FXP_READ, buf);
-                pendingReads.add(new Ack(reqId, requestOffset, bufferSize));
+                pendingReads.add(new SftpAckData(reqId, requestOffset, bufferSize));
                 requestOffset += bufferSize;
             }
         }
     }
 
-    void fillData() throws IOException {
-        Ack ack = pendingReads.pollFirst();
+    protected void fillData() throws IOException {
+        SftpAckData ack = pendingReads.pollFirst();
         if (ack != null) {
             pollBuffer(ack);
-            if (!eofIndicator && clientOffset < ack.offset) {
+            if ((!eofIndicator) && (clientOffset < ack.offset)) {
                 // we are actually missing some data
                 // so request is synchronously
                 byte[] data = new byte[(int) (ack.offset - clientOffset + buffer.available())];
@@ -250,7 +247,8 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
                 AtomicReference<Boolean> eof = new AtomicReference<>();
                 while (cur < nb) {
                     int dlen = client.read(handle, clientOffset, data, cur, nb - cur, eof);
-                    eofIndicator = dlen < 0 || eof.get() != null && eof.get();
+                    Boolean eofSignal = eof.getAndSet(null);
+                    eofIndicator = (dlen < 0) || ((eofSignal != null) && eofSignal.booleanValue());
                     cur += dlen;
                 }
                 buffer.getRawBytes(data, nb, buffer.available());
@@ -259,7 +257,7 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
         }
     }
 
-    void pollBuffer(Ack ack) throws IOException {
+    protected void pollBuffer(SftpAckData ack) throws IOException {
         Buffer buf = client.receive(ack.id);
         int length = buf.getInt();
         int type = buf.getUByte();
@@ -270,7 +268,7 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
             int rpos = buf.rpos();
             buf.rpos(rpos + dlen);
             Boolean b = SftpHelper.getEndOfFileIndicatorValue(buf, client.getVersion());
-            eofIndicator = b != null && b;
+            eofIndicator = (b != null) && b.booleanValue();
             buf.rpos(rpos);
             buf.wpos(rpos + dlen);
             this.buffer = buf;
@@ -298,7 +296,7 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
             try {
                 try {
                     while (!pendingReads.isEmpty()) {
-                        Ack ack = pendingReads.removeFirst();
+                        SftpAckData ack = pendingReads.removeFirst();
                         pollBuffer(ack);
                     }
                 } finally {
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
index d8f1974..b5b809c 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
@@ -28,6 +28,7 @@ import org.apache.sshd.client.subsystem.sftp.SftpClient;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
 import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
@@ -39,27 +40,15 @@ import org.apache.sshd.common.util.io.OutputStreamWithChannel;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public class SftpOutputStreamAsync extends OutputStreamWithChannel {
-
-    static class Ack {
-        int id;
-        long offset;
-        int length;
-
-        Ack(int id, long offset, int length) {
-            this.id = id;
-            this.offset = offset;
-            this.length = length;
-        }
-    }
+    protected final byte[] bb = new byte[1];
+    protected final int bufferSize;
+    protected Buffer buffer;
+    protected CloseableHandle handle;
+    protected long offset;
+    protected final Deque<SftpAckData> pendingWrites = new LinkedList<>();
 
     private final AbstractSftpClient client;
     private final String path;
-    private final byte[] bb = new byte[1];
-    private final int bufferSize;
-    private Buffer buffer;
-    private CloseableHandle handle;
-    private long offset;
-    private final Deque<Ack> pendingWrites = new LinkedList<>();
 
     public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
                                  String path, Collection<OpenMode> mode) throws IOException {
@@ -112,14 +101,17 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel {
 
     @Override
     public void write(byte[] b, int off, int len) throws IOException {
+        byte[] id = handle.getIdentifier();
+        Session session = client.getSession();
+
         do {
             if (buffer == null) {
-                buffer = client.getSession().createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize);
-                int hdr = (9 + 16 + 8 + handle.getIdentifier().length) + buffer.wpos();
+                buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize);
+                int hdr = 9 + 16 + 8 + id.length + buffer.wpos();
                 buffer.rpos(hdr);
                 buffer.wpos(hdr);
             }
-            int max = bufferSize - (9 + 16 + handle.getIdentifier().length + 72);
+            int max = bufferSize - (9 + 16 + id.length + 72);
             int nb = Math.min(len, max - (buffer.wpos() - buffer.rpos()));
             buffer.putRawBytes(b, off, nb);
             if (buffer.available() == max) {
@@ -137,9 +129,9 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel {
         }
 
         for (;;) {
-            Ack ack = pendingWrites.peek();
+            SftpAckData ack = pendingWrites.peek();
             if (ack != null) {
-                Buffer response = client.receive(ack.id, 0);
+                Buffer response = client.receive(ack.id, 0L);
                 if (response != null) {
                     pendingWrites.removeFirst();
                     client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
@@ -154,7 +146,7 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel {
         byte[] id = handle.getIdentifier();
         int avail = buffer.available();
         Buffer buf;
-        if (buffer.rpos() >= 16 + id.length) {
+        if (buffer.rpos() >= (16 + id.length)) {
             int wpos = buffer.wpos();
             buffer.rpos(buffer.rpos() - 16 - id.length);
             buffer.wpos(buffer.rpos());
@@ -171,7 +163,7 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel {
         }
 
         int reqId = client.send(SftpConstants.SSH_FXP_WRITE, buf);
-        pendingWrites.add(new Ack(reqId, offset, avail));
+        pendingWrites.add(new SftpAckData(reqId, offset, avail));
 
         offset += avail;
         buffer = null;
@@ -182,11 +174,11 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel {
         if (isOpen()) {
             try {
                 try {
-                    if (buffer != null && buffer.available() > 0) {
+                    if ((buffer != null) && (buffer.available() > 0)) {
                         flush();
                     }
                     while (!pendingWrites.isEmpty()) {
-                        Ack ack = pendingWrites.removeFirst();
+                        SftpAckData ack = pendingWrites.removeFirst();
                         Buffer response = client.receive(ack.id);
                         client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
                     }
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
index b6275d1..b698c9f 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
@@ -227,18 +227,6 @@ public class SftpRemotePathChannel extends FileChannel {
         return doWrite(buffers, -1L);
     }
 
-    static class Ack {
-        int id;
-        long offset;
-        int length;
-
-        Ack(int id, long offset, int length) {
-            this.id = id;
-            this.offset = offset;
-            this.length = length;
-        }
-    }
-
     protected long doWrite(Collection<? extends ByteBuffer> buffers, long position) throws IOException {
         ensureOpen(WRITE_MODES);
 
@@ -362,11 +350,12 @@ public class SftpRemotePathChannel extends FileChannel {
             try {
                 beginBlocking("transferTo");
 
+                // DO NOT CLOSE THE STREAM AS IT WOULD CLOSE THE HANDLE
+                @SuppressWarnings("resource")
                 SftpInputStreamAsync input = new SftpInputStreamAsync(
                         (AbstractSftpClient) sftp,
                         copySize, position, count, getRemotePath(), handle);
                 totalRead = input.transferTo(count, target);
-                // DO NOT CLOSE THE STREAM AS IT WOULD CLOSE THE HANDLE
                 eof = input.isEof();
                 completed = true;
             } finally {
@@ -379,7 +368,7 @@ public class SftpRemotePathChannel extends FileChannel {
                     this, position, count, copySize, totalRead, eof, target);
         }
 
-        return totalRead > 0L ? totalRead : eof ? -1L : 0L;
+        return (totalRead > 0L) ? totalRead : eof ? -1L : 0L;
     }
 
     @Override
@@ -400,7 +389,6 @@ public class SftpRemotePathChannel extends FileChannel {
         }
 
         boolean completed = false;
-        long curPos = (position >= 0L) ? position : posTracker.get();
         long totalRead = 0L;
         byte[] buffer = new byte[(int) Math.min(copySize, count)];
 
@@ -408,6 +396,8 @@ public class SftpRemotePathChannel extends FileChannel {
             try {
                 beginBlocking("transferFrom");
 
+                // DO NOT CLOSE THE OUTPUT STREAM AS IT WOULD CLOSE THE HANDLE
+                @SuppressWarnings("resource")
                 SftpOutputStreamAsync output = new SftpOutputStreamAsync(
                         (AbstractSftpClient) sftp,
                         copySize, getRemotePath(), handle);
@@ -417,7 +407,6 @@ public class SftpRemotePathChannel extends FileChannel {
                     int read = src.read(wrap);
                     if (read > 0) {
                         output.write(buffer, 0, read);
-                        curPos += read;
                         totalRead += read;
                     } else {
                         break;
@@ -441,8 +430,7 @@ public class SftpRemotePathChannel extends FileChannel {
     @Override
     public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
         throw new UnsupportedOperationException(
-                "map(" + getRemotePath() + ")"
-                                                + "[" + mode + "," + position + "," + size + "] N/A");
+                "map(" + getRemotePath() + ")[" + mode + "," + position + "," + size + "] N/A");
     }
 
     @Override
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java
index 6676eb4..9e4237d 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java
@@ -110,6 +110,7 @@ public class FileHandle extends Handle {
         return read(data, 0, data.length, offset);
     }
 
+    @SuppressWarnings("resource")
     public int read(byte[] data, int doff, int length, long offset) throws IOException {
         SeekableByteChannel channel = getFileChannel();
         channel = channel.position(offset);


[mina-sshd] 02/02: [SSHD-979] Ignore by default any data written to error stream of SFTP subsystem client channel

Posted by lg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit 0da9e23c693323fe1d4f42ee702f0c2d8a1bd534
Author: Lyor Goldstein <lg...@apache.org>
AuthorDate: Thu Apr 23 21:32:35 2020 +0300

    [SSHD-979] Ignore by default any data written to error stream of SFTP subsystem client channel
---
 .../sshd/client/subsystem/sftp/impl/DefaultSftpClient.java       | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
index cf00605..136503d 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
@@ -18,7 +18,6 @@
  */
 package org.apache.sshd.client.subsystem.sftp.impl;
 
-import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -61,6 +60,7 @@ import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.NullOutputStream;
 
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
@@ -589,8 +589,11 @@ public class DefaultSftpClient extends AbstractSftpClient {
         }
 
         protected OutputStream createErrOutputStream(Session session) {
-            // TODO use some limit in case some data is constantly written to this stream
-            return new ByteArrayOutputStream();
+            /*
+             * The protocol does not specify how to handle such data but we are lenient and ignore it - similar to
+             * /dev/null
+             */
+            return new NullOutputStream();
         }
     }
 }