You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2021/03/20 08:03:51 UTC

[mina-sshd] 02/02: [SSHD-1123] Provide configurable behavior for ChannelAsyncOutputStream chunking behavior

This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit f58f006bc416963893c42ca034f2a9b1dfd1fe19
Author: Lyor Goldstein <lg...@apache.org>
AuthorDate: Sat Mar 20 10:01:42 2021 +0200

    [SSHD-1123] Provide configurable behavior for ChannelAsyncOutputStream chunking behavior
---
 CHANGES.md                                         |  1 +
 .../common/channel/ChannelAsyncOutputStream.java   | 28 +++++++++++++++++++---
 .../sshd/common/channel/ChannelOutputStream.java   | 24 ++++++++++++-------
 .../org/apache/sshd/core/CoreModuleProperties.java | 18 ++++++++++++++
 .../apache/sshd/server/channel/ChannelSession.java | 25 +++++++++++++------
 5 files changed, 78 insertions(+), 18 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a909dfd..8db2755 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -44,6 +44,7 @@
 * [SSHD-1114](https://issues.apache.org/jira/browse/SSHD-1114) Added callbacks for client-side host-based authentication progress
 * [SSHD-1114](https://issues.apache.org/jira/browse/SSHD-1114) Added capability for interactive password authentication participation via UserInteraction
 * [SSHD-1114](https://issues.apache.org/jira/browse/SSHD-1114) Added capability for interactive key based authentication participation via UserInteraction
+* [SSHD-1123](https://issues.apache.org/jira/browse/SSHD-1123) Add option to chunk data in ChannelAsyncOutputStream if window size is smaller than packet size
 * [SSHD-1125](https://issues.apache.org/jira/browse/SSHD-1125) Added mechanism to throttle pending write requests in BufferedIoOutputStream
 * [SSHD-1127](https://issues.apache.org/jira/browse/SSHD-1127) Added capability to register a custom receiver for SFTP STDERR channel raw or stream data
 * [SSHD-1133](https://issues.apache.org/jira/browse/SSHD-1133) Added capability to specify a custom charset for parsing incoming commands to the `ScpShell`
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index 685d79e..eb2d5c5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -42,16 +42,30 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
     private final Object packetWriteId;
     private boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize;
 
+    /**
+     * @param channel The {@link Channel} through which the stream is communicating
+     * @param cmd     Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or
+     *                {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the
+     *                output stream type
+     */
     public ChannelAsyncOutputStream(Channel channel, byte cmd) {
         this(channel, cmd, false);
     }
 
     /**
+     * @param channel                                        The {@link Channel} through which the stream is
+     *                                                       communicating
+     * @param cmd                                            Either {@link SshConstants#SSH_MSG_CHANNEL_DATA
+     *                                                       SSH_MSG_CHANNEL_DATA} or
+     *                                                       {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA
+     *                                                       SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output stream
+     *                                                       type
      * @param sendChunkIfRemoteWindowIsSmallerThanPacketSize Determines the chunking behaviour, if the remote window
      *                                                       size is smaller than the packet size. Can be use to
      *                                                       establish compatibility with certain clients, that wait
-     *                                                       until the window size is 0 before adjusting it (see
-     *                                                       SSHD-1123). Default is false;
+     *                                                       until the window size is 0 before adjusting it.
+     * @see                                                  <A HREF=
+     *                                                       "https://issues.apache.org/jira/browse/SSHD-1123">SSHD-1123</A>
      */
     public ChannelAsyncOutputStream(Channel channel, byte cmd, boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize) {
         this.channelInstance = Objects.requireNonNull(channel, "No channel");
@@ -66,6 +80,15 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
         return channelInstance;
     }
 
+    /**
+     * @return Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or
+     *         {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output
+     *         stream type
+     */
+    public byte getCommandType() {
+        return cmd;
+    }
+
     public void onWindowExpanded() throws IOException {
         doWriteIfPossible(true);
     }
@@ -256,5 +279,4 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
     public void setSendChunkIfRemoteWindowIsSmallerThanPacketSize(boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize) {
         this.sendChunkIfRemoteWindowIsSmallerThanPacketSize = sendChunkIfRemoteWindowIsSmallerThanPacketSize;
     }
-
 }
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
index 42f4e28..2937c7d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
@@ -42,12 +42,12 @@ import org.slf4j.Logger;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public class ChannelOutputStream extends OutputStream implements java.nio.channels.Channel, ChannelHolder {
+    protected final Logger log;
 
     private final AbstractChannel channelInstance;
     private final ChannelStreamWriter packetWriter;
     private final Window remoteWindow;
     private final Duration maxWaitTimeout;
-    private final Logger log;
     private final byte cmd;
     private final boolean eofOnClose;
     private final byte[] b = new byte[1];
@@ -79,8 +79,7 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe
         this.packetWriter = channelInstance.resolveChannelStreamWriter(channel, cmd);
         this.remoteWindow = Objects.requireNonNull(remoteWindow, "No remote window");
         Objects.requireNonNull(maxWaitTimeout, "No maxWaitTimeout");
-        ValidateUtils.checkTrue(GenericUtils.isPositive(maxWaitTimeout), "Non-positive max. wait time: %s",
-                maxWaitTimeout.toString());
+        ValidateUtils.checkTrue(GenericUtils.isPositive(maxWaitTimeout), "Non-positive max. wait time: %s", maxWaitTimeout);
         this.maxWaitTimeout = maxWaitTimeout;
         this.log = Objects.requireNonNull(log, "No logger");
         this.cmd = cmd;
@@ -93,18 +92,27 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe
         return channelInstance;
     }
 
-    public boolean isEofOnClose() {
-        return eofOnClose;
+    /**
+     * @return Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or
+     *         {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output
+     *         stream type
+     */
+    public byte getCommandType() {
+        return cmd;
     }
 
-    public void setNoDelay(boolean noDelay) {
-        this.noDelay = noDelay;
+    public boolean isEofOnClose() {
+        return eofOnClose;
     }
 
     public boolean isNoDelay() {
         return noDelay;
     }
 
+    public void setNoDelay(boolean noDelay) {
+        this.noDelay = noDelay;
+    }
+
     @Override
     public boolean isOpen() {
         return !closedState.get();
@@ -185,7 +193,7 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe
 
     @Override
     public synchronized void flush() throws IOException {
-        AbstractChannel channel = getChannel();
+        Channel channel = getChannel();
         if (!isOpen()) {
             throw new SshChannelClosedException(
                     channel.getId(),
diff --git a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
index 88d9724..062166d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
+++ b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
@@ -222,6 +222,24 @@ public final class CoreModuleProperties {
     public static final Property<Boolean> REQUEST_SUBSYSTEM_REPLY
             = Property.bool("channel-subsystem-want-reply", true);
 
+    /**
+     * If should chunk data sent via {@code ChannelAsyncOutputStream} when reported remote STDOUT stream window size is
+     * less than its packet size
+     *
+     * @see <A HREF="https://issues.apache.org/jira/browse/SSHD-1123">SSHD-1123</A>
+     */
+    public static final Property<Boolean> ASYNC_SERVER_STDOUT_CHUNK_BELOW_WINDOW_SIZE
+            = Property.bool("server-async-stdout-chunk-below-window-size", false);
+
+    /**
+     * If should chunk data sent via {@code ChannelAsyncOutputStream} when reported remote STDERR stream window size is
+     * less than its packet size
+     *
+     * @see <A HREF="https://issues.apache.org/jira/browse/SSHD-1123">SSHD-1123</A>
+     */
+    public static final Property<Boolean> ASYNC_SERVER_STDERR_CHUNK_BELOW_WINDOW_SIZE
+            = Property.bool("server-async-stderr-chunk-below-window-size", false);
+
     public static final Property<Integer> PROP_DHGEX_CLIENT_MIN_KEY
             = Property.integer("dhgex-client-min");
 
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index 93821eb..963a14b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -722,10 +722,10 @@ public class ChannelSession extends AbstractServerChannel {
         if (command instanceof AsyncCommandStreamsAware) {
             asyncOut = new ChannelAsyncOutputStream(
                     this, SshConstants.SSH_MSG_CHANNEL_DATA,
-                    isSendChunkIfRemoteWindowIsSmallerThanPacketSize());
+                    isSendChunkIfRemoteWindowIsSmallerThanPacketSize(SshConstants.SSH_MSG_CHANNEL_DATA));
             asyncErr = new ChannelAsyncOutputStream(
                     this, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA,
-                    isSendChunkIfRemoteWindowIsSmallerThanPacketSize());
+                    isSendChunkIfRemoteWindowIsSmallerThanPacketSize(SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA));
             ((AsyncCommandStreamsAware) command).setIoOutputStream(asyncOut);
             ((AsyncCommandStreamsAware) command).setIoErrorStream(asyncErr);
         } else {
@@ -922,11 +922,22 @@ public class ChannelSession extends AbstractServerChannel {
     /**
      * Chance for specializations to vary chunking behaviour depending on the SFTP client version.
      *
-     * @return {@code true} if chunk data sent via {@link ChannelAsyncOutputStream} when reported remote window size is
-     *         less than its packet size
-     * @see    ChannelAsyncOutputStream#ChannelAsyncOutputStream(Channel, byte, boolean)
+     * @param  cmd                           Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or
+     *                                       {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA
+     *                                       SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output stream type
+     * @return                               {@code true} if should chunk data sent via {@link ChannelAsyncOutputStream}
+     *                                       when reported remote window size is less than its packet size
+     * @see                                  ChannelAsyncOutputStream#ChannelAsyncOutputStream(Channel, byte, boolean)
+     * @throws UnsupportedOperationException if the command is neither of the supported ones
      */
-    protected boolean isSendChunkIfRemoteWindowIsSmallerThanPacketSize() {
-        return false;
+    protected boolean isSendChunkIfRemoteWindowIsSmallerThanPacketSize(byte cmd) {
+        if (cmd == SshConstants.SSH_MSG_CHANNEL_DATA) {
+            return CoreModuleProperties.ASYNC_SERVER_STDOUT_CHUNK_BELOW_WINDOW_SIZE.getRequired(this);
+        } else if (cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) {
+            return CoreModuleProperties.ASYNC_SERVER_STDERR_CHUNK_BELOW_WINDOW_SIZE.getRequired(this);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported channel data stream command: " + SshConstants.getCommandMessageName(cmd & 0xFF));
+        }
     }
 }