You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2021/03/20 08:03:51 UTC
[mina-sshd] 02/02: [SSHD-1123] Provide configurable behavior for
ChannelAsyncOutputStream chunking behavior
This is an automated email from the ASF dual-hosted git repository.
lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit f58f006bc416963893c42ca034f2a9b1dfd1fe19
Author: Lyor Goldstein <lg...@apache.org>
AuthorDate: Sat Mar 20 10:01:42 2021 +0200
[SSHD-1123] Provide configurable behavior for ChannelAsyncOutputStream chunking behavior
---
CHANGES.md | 1 +
.../common/channel/ChannelAsyncOutputStream.java | 28 +++++++++++++++++++---
.../sshd/common/channel/ChannelOutputStream.java | 24 ++++++++++++-------
.../org/apache/sshd/core/CoreModuleProperties.java | 18 ++++++++++++++
.../apache/sshd/server/channel/ChannelSession.java | 25 +++++++++++++------
5 files changed, 78 insertions(+), 18 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index a909dfd..8db2755 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -44,6 +44,7 @@
* [SSHD-1114](https://issues.apache.org/jira/browse/SSHD-1114) Added callbacks for client-side host-based authentication progress
* [SSHD-1114](https://issues.apache.org/jira/browse/SSHD-1114) Added capability for interactive password authentication participation via UserInteraction
* [SSHD-1114](https://issues.apache.org/jira/browse/SSHD-1114) Added capability for interactive key based authentication participation via UserInteraction
+* [SSHD-1123](https://issues.apache.org/jira/browse/SSHD-1123) Add option to chunk data in ChannelAsyncOutputStream if window size is smaller than packet size
* [SSHD-1125](https://issues.apache.org/jira/browse/SSHD-1125) Added mechanism to throttle pending write requests in BufferedIoOutputStream
* [SSHD-1127](https://issues.apache.org/jira/browse/SSHD-1127) Added capability to register a custom receiver for SFTP STDERR channel raw or stream data
* [SSHD-1133](https://issues.apache.org/jira/browse/SSHD-1133) Added capability to specify a custom charset for parsing incoming commands to the `ScpShell`
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index 685d79e..eb2d5c5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -42,16 +42,30 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
private final Object packetWriteId;
private boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize;
+ /**
+ * @param channel The {@link Channel} through which the stream is communicating
+ * @param cmd Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or
+ * {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the
+ * output stream type
+ */
public ChannelAsyncOutputStream(Channel channel, byte cmd) {
this(channel, cmd, false);
}
/**
+ * @param channel The {@link Channel} through which the stream is
+ * communicating
+ * @param cmd Either {@link SshConstants#SSH_MSG_CHANNEL_DATA
+ * SSH_MSG_CHANNEL_DATA} or
+ * {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA
+ * SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output stream
+ * type
* @param sendChunkIfRemoteWindowIsSmallerThanPacketSize Determines the chunking behaviour, if the remote window
* size is smaller than the packet size. Can be use to
* establish compatibility with certain clients, that wait
- * until the window size is 0 before adjusting it (see
- * SSHD-1123). Default is false;
+ * until the window size is 0 before adjusting it.
+ * @see <A HREF=
+ * "https://issues.apache.org/jira/browse/SSHD-1123">SSHD-1123</A>
*/
public ChannelAsyncOutputStream(Channel channel, byte cmd, boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize) {
this.channelInstance = Objects.requireNonNull(channel, "No channel");
@@ -66,6 +80,15 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
return channelInstance;
}
+ /**
+ * @return Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or
+ * {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output
+ * stream type
+ */
+ public byte getCommandType() {
+ return cmd;
+ }
+
public void onWindowExpanded() throws IOException {
doWriteIfPossible(true);
}
@@ -256,5 +279,4 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
public void setSendChunkIfRemoteWindowIsSmallerThanPacketSize(boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize) {
this.sendChunkIfRemoteWindowIsSmallerThanPacketSize = sendChunkIfRemoteWindowIsSmallerThanPacketSize;
}
-
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
index 42f4e28..2937c7d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
@@ -42,12 +42,12 @@ import org.slf4j.Logger;
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public class ChannelOutputStream extends OutputStream implements java.nio.channels.Channel, ChannelHolder {
+ protected final Logger log;
private final AbstractChannel channelInstance;
private final ChannelStreamWriter packetWriter;
private final Window remoteWindow;
private final Duration maxWaitTimeout;
- private final Logger log;
private final byte cmd;
private final boolean eofOnClose;
private final byte[] b = new byte[1];
@@ -79,8 +79,7 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe
this.packetWriter = channelInstance.resolveChannelStreamWriter(channel, cmd);
this.remoteWindow = Objects.requireNonNull(remoteWindow, "No remote window");
Objects.requireNonNull(maxWaitTimeout, "No maxWaitTimeout");
- ValidateUtils.checkTrue(GenericUtils.isPositive(maxWaitTimeout), "Non-positive max. wait time: %s",
- maxWaitTimeout.toString());
+ ValidateUtils.checkTrue(GenericUtils.isPositive(maxWaitTimeout), "Non-positive max. wait time: %s", maxWaitTimeout);
this.maxWaitTimeout = maxWaitTimeout;
this.log = Objects.requireNonNull(log, "No logger");
this.cmd = cmd;
@@ -93,18 +92,27 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe
return channelInstance;
}
- public boolean isEofOnClose() {
- return eofOnClose;
+ /**
+ * @return Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or
+ * {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output
+ * stream type
+ */
+ public byte getCommandType() {
+ return cmd;
}
- public void setNoDelay(boolean noDelay) {
- this.noDelay = noDelay;
+ public boolean isEofOnClose() {
+ return eofOnClose;
}
public boolean isNoDelay() {
return noDelay;
}
+ public void setNoDelay(boolean noDelay) {
+ this.noDelay = noDelay;
+ }
+
@Override
public boolean isOpen() {
return !closedState.get();
@@ -185,7 +193,7 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe
@Override
public synchronized void flush() throws IOException {
- AbstractChannel channel = getChannel();
+ Channel channel = getChannel();
if (!isOpen()) {
throw new SshChannelClosedException(
channel.getId(),
diff --git a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
index 88d9724..062166d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
+++ b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
@@ -222,6 +222,24 @@ public final class CoreModuleProperties {
public static final Property<Boolean> REQUEST_SUBSYSTEM_REPLY
= Property.bool("channel-subsystem-want-reply", true);
+ /**
+ * If should chunk data sent via {@code ChannelAsyncOutputStream} when reported remote STDOUT stream window size is
+ * less than its packet size
+ *
+ * @see <A HREF="https://issues.apache.org/jira/browse/SSHD-1123">SSHD-1123</A>
+ */
+ public static final Property<Boolean> ASYNC_SERVER_STDOUT_CHUNK_BELOW_WINDOW_SIZE
+ = Property.bool("server-async-stdout-chunk-below-window-size", false);
+
+ /**
+ * If should chunk data sent via {@code ChannelAsyncOutputStream} when reported remote STDERR stream window size is
+ * less than its packet size
+ *
+ * @see <A HREF="https://issues.apache.org/jira/browse/SSHD-1123">SSHD-1123</A>
+ */
+ public static final Property<Boolean> ASYNC_SERVER_STDERR_CHUNK_BELOW_WINDOW_SIZE
+ = Property.bool("server-async-stderr-chunk-below-window-size", false);
+
public static final Property<Integer> PROP_DHGEX_CLIENT_MIN_KEY
= Property.integer("dhgex-client-min");
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index 93821eb..963a14b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -722,10 +722,10 @@ public class ChannelSession extends AbstractServerChannel {
if (command instanceof AsyncCommandStreamsAware) {
asyncOut = new ChannelAsyncOutputStream(
this, SshConstants.SSH_MSG_CHANNEL_DATA,
- isSendChunkIfRemoteWindowIsSmallerThanPacketSize());
+ isSendChunkIfRemoteWindowIsSmallerThanPacketSize(SshConstants.SSH_MSG_CHANNEL_DATA));
asyncErr = new ChannelAsyncOutputStream(
this, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA,
- isSendChunkIfRemoteWindowIsSmallerThanPacketSize());
+ isSendChunkIfRemoteWindowIsSmallerThanPacketSize(SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA));
((AsyncCommandStreamsAware) command).setIoOutputStream(asyncOut);
((AsyncCommandStreamsAware) command).setIoErrorStream(asyncErr);
} else {
@@ -922,11 +922,22 @@ public class ChannelSession extends AbstractServerChannel {
/**
* Chance for specializations to vary chunking behaviour depending on the SFTP client version.
*
- * @return {@code true} if chunk data sent via {@link ChannelAsyncOutputStream} when reported remote window size is
- * less than its packet size
- * @see ChannelAsyncOutputStream#ChannelAsyncOutputStream(Channel, byte, boolean)
+ * @param cmd Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or
+ * {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA
+ * SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output stream type
+ * @return {@code true} if should chunk data sent via {@link ChannelAsyncOutputStream}
+ * when reported remote window size is less than its packet size
+ * @see ChannelAsyncOutputStream#ChannelAsyncOutputStream(Channel, byte, boolean)
+ * @throws UnsupportedOperationException if the command is neither of the supported ones
*/
- protected boolean isSendChunkIfRemoteWindowIsSmallerThanPacketSize() {
- return false;
+ protected boolean isSendChunkIfRemoteWindowIsSmallerThanPacketSize(byte cmd) {
+ if (cmd == SshConstants.SSH_MSG_CHANNEL_DATA) {
+ return CoreModuleProperties.ASYNC_SERVER_STDOUT_CHUNK_BELOW_WINDOW_SIZE.getRequired(this);
+ } else if (cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) {
+ return CoreModuleProperties.ASYNC_SERVER_STDERR_CHUNK_BELOW_WINDOW_SIZE.getRequired(this);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported channel data stream command: " + SshConstants.getCommandMessageName(cmd & 0xFF));
+ }
}
}