You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2021/03/20 08:03:49 UTC

[mina-sshd] branch master updated (898939d -> f58f006)

This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git.


    from 898939d  [SSHD-1146] Add missing Import-Package header in sshd-osgi
     new 0987602  [SSHD-1123] Add option to chunk data in ChannelAsyncOutputStream if window size is smaller than packet size
     new f58f006  [SSHD-1123] Provide configurable behavior for ChannelAsyncOutputStream chunking behavior

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.md                                         |   1 +
 .../common/channel/ChannelAsyncOutputStream.java   |  70 +++++++++--
 .../sshd/common/channel/ChannelOutputStream.java   |  24 ++--
 .../org/apache/sshd/core/CoreModuleProperties.java |  18 +++
 .../apache/sshd/server/channel/ChannelSession.java |  30 ++++-
 .../channel/ChannelAsyncOutputStreamTest.java      | 131 +++++++++++++++++++++
 6 files changed, 254 insertions(+), 20 deletions(-)
 create mode 100644 sshd-core/src/test/java/org/apache/sshd/common/channel/ChannelAsyncOutputStreamTest.java

[mina-sshd] 01/02: [SSHD-1123] Add option to chunk data in ChannelAsyncOutputStream if window size is smaller than packet size

Posted by lg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit 098760248c37e628723529c2c49c5a353606ff23
Author: =?UTF-8?q?Achim=20H=C3=BCgen?= <ac...@deutschepost.de>
AuthorDate: Sat Mar 20 09:07:58 2021 +0200

    [SSHD-1123] Add option to chunk data in ChannelAsyncOutputStream if window size is smaller than packet size
---
 .../common/channel/ChannelAsyncOutputStream.java   |  48 ++++++--
 .../apache/sshd/server/channel/ChannelSession.java |  19 ++-
 .../channel/ChannelAsyncOutputStreamTest.java      | 131 +++++++++++++++++++++
 3 files changed, 186 insertions(+), 12 deletions(-)

diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index 8d1701f..685d79e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -40,9 +40,22 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
     private final byte cmd;
     private final AtomicReference<IoWriteFutureImpl> pendingWrite = new AtomicReference<>();
     private final Object packetWriteId;
+    private boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize;
 
     public ChannelAsyncOutputStream(Channel channel, byte cmd) {
+        this(channel, cmd, false);
+    }
+
+    /**
+     * @param sendChunkIfRemoteWindowIsSmallerThanPacketSize Determines the chunking behaviour, if the remote window
+     *                                                       size is smaller than the packet size. Can be use to
+     *                                                       establish compatibility with certain clients, that wait
+     *                                                       until the window size is 0 before adjusting it (see
+     *                                                       SSHD-1123). Default is false;
+     */
+    public ChannelAsyncOutputStream(Channel channel, byte cmd, boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize) {
         this.channelInstance = Objects.requireNonNull(channel, "No channel");
+        this.sendChunkIfRemoteWindowIsSmallerThanPacketSize = sendChunkIfRemoteWindowIsSmallerThanPacketSize;
         this.packetWriter = channelInstance.resolveChannelStreamWriter(channel, cmd);
         this.cmd = cmd;
         this.packetWriteId = channel.toString() + "[" + SshConstants.getCommandMessageName(cmd) + "]";
@@ -113,15 +126,21 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
                     // send the first chunk as we have enough space in the window
                     length = packetSize;
                 } else {
-                    // do not chunk when the window is smaller than the packet size
-                    length = 0;
-                    // do a defensive copy in case the user reuses the buffer
-                    IoWriteFutureImpl f = new IoWriteFutureImpl(future.getId(), new ByteArrayBuffer(buffer.getCompactData()));
-                    f.addListener(w -> future.setValue(w.getException() != null ? w.getException() : w.isWritten()));
-                    pendingWrite.set(f);
-                    if (log.isTraceEnabled()) {
-                        log.trace("doWriteIfPossible({})[resume={}] waiting for window space {}",
-                                this, resume, remoteWindowSize);
+                    // Window size is even smaller than packet size. Determine how to handle this.
+                    if (isSendChunkIfRemoteWindowIsSmallerThanPacketSize()) {
+                        length = remoteWindowSize;
+                    } else {
+                        // do not chunk when the window is smaller than the packet size
+                        length = 0L;
+                        // do a defensive copy in case the user reuses the buffer
+                        IoWriteFutureImpl f
+                                = new IoWriteFutureImpl(future.getId(), new ByteArrayBuffer(buffer.getCompactData()));
+                        f.addListener(w -> future.setValue(w.getException() != null ? w.getException() : w.isWritten()));
+                        pendingWrite.set(f);
+                        if (log.isTraceEnabled()) {
+                            log.trace("doWriteIfPossible({})[resume={}] waiting for window space {}",
+                                    this, resume, remoteWindowSize);
+                        }
                     }
                 }
             } else if (total > packetSize) {
@@ -147,7 +166,7 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
                 }
             }
 
-            if (length > 0) {
+            if (length > 0L) {
                 if (resume) {
                     if (log.isDebugEnabled()) {
                         log.debug("Resuming {} write due to more space ({}) available in the remote window", this, length);
@@ -229,4 +248,13 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
     public String toString() {
         return getClass().getSimpleName() + "[" + getChannel() + "] cmd=" + SshConstants.getCommandMessageName(cmd & 0xFF);
     }
+
+    public boolean isSendChunkIfRemoteWindowIsSmallerThanPacketSize() {
+        return sendChunkIfRemoteWindowIsSmallerThanPacketSize;
+    }
+
+    public void setSendChunkIfRemoteWindowIsSmallerThanPacketSize(boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize) {
+        this.sendChunkIfRemoteWindowIsSmallerThanPacketSize = sendChunkIfRemoteWindowIsSmallerThanPacketSize;
+    }
+
 }
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index adae173..93821eb 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -720,8 +720,12 @@ public class ChannelSession extends AbstractServerChannel {
         }
         // If the shell wants to use non-blocking io
         if (command instanceof AsyncCommandStreamsAware) {
-            asyncOut = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA);
-            asyncErr = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA);
+            asyncOut = new ChannelAsyncOutputStream(
+                    this, SshConstants.SSH_MSG_CHANNEL_DATA,
+                    isSendChunkIfRemoteWindowIsSmallerThanPacketSize());
+            asyncErr = new ChannelAsyncOutputStream(
+                    this, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA,
+                    isSendChunkIfRemoteWindowIsSmallerThanPacketSize());
             ((AsyncCommandStreamsAware) command).setIoOutputStream(asyncOut);
             ((AsyncCommandStreamsAware) command).setIoErrorStream(asyncErr);
         } else {
@@ -914,4 +918,15 @@ public class ChannelSession extends AbstractServerChannel {
             commandExitFuture.setClosed();
         }
     }
+
+    /**
+     * Chance for specializations to vary chunking behaviour depending on the SFTP client version.
+     *
+     * @return {@code true} if chunk data sent via {@link ChannelAsyncOutputStream} when reported remote window size is
+     *         less than its packet size
+     * @see    ChannelAsyncOutputStream#ChannelAsyncOutputStream(Channel, byte, boolean)
+     */
+    protected boolean isSendChunkIfRemoteWindowIsSmallerThanPacketSize() {
+        return false;
+    }
 }
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/channel/ChannelAsyncOutputStreamTest.java b/sshd-core/src/test/java/org/apache/sshd/common/channel/ChannelAsyncOutputStreamTest.java
new file mode 100644
index 0000000..34228d8
--- /dev/null
+++ b/sshd-core/src/test/java/org/apache/sshd/common/channel/ChannelAsyncOutputStreamTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.sshd.common.PropertyResolver;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriter;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.apache.sshd.util.test.NoIoTestCase;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.MethodSorters;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+/**
+ * Tests the behaviour of {@link ChannelAsyncOutputStream} regarding the chunking of the data to sent.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@Category({ NoIoTestCase.class })
+public class ChannelAsyncOutputStreamTest extends BaseTestSupport {
+
+    private static final String CLIENT_WITH_COMPATIBILITY_ISSUE = "specialClient";
+    private Window remoteWindow;
+    private ChannelStreamWriter channelStreamWriter;
+    private AbstractChannel channel;
+    private Session session;
+    private IoWriteFuture ioWriteFuture;
+
+    public ChannelAsyncOutputStreamTest() {
+        super();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        channel = Mockito.mock(AbstractChannel.class);
+        channelStreamWriter = Mockito.mock(ChannelStreamWriter.class);
+        remoteWindow = new Window(channel, null, true, true);
+        ioWriteFuture = Mockito.mock(IoWriteFuture.class);
+        session = Mockito.mock(Session.class);
+
+        Mockito.when(channel.getRemoteWindow()).thenReturn(remoteWindow);
+        Mockito.when(channel.getSession()).thenReturn(session);
+
+        Mockito.when(channel.resolveChannelStreamWriter(ArgumentMatchers.any(Channel.class), ArgumentMatchers.anyByte()))
+                .thenReturn(channelStreamWriter);
+        Mockito.when(channelStreamWriter.writeData(ArgumentMatchers.any())).thenReturn(ioWriteFuture);
+
+        Mockito.when(session.createBuffer(ArgumentMatchers.anyByte(), ArgumentMatchers.anyInt()))
+                .thenReturn(new ByteArrayBuffer());
+
+        Mockito.when(session.getClientVersion()).thenReturn(CLIENT_WITH_COMPATIBILITY_ISSUE);
+
+    }
+
+    @Test
+    public void testCompleteDataSentIfDataFitsIntoPacketAndPacketFitsInRemoteWindow() throws IOException {
+        ChannelAsyncOutputStream channelAsyncOutputStream = new ChannelAsyncOutputStream(channel, (byte) 0);
+        checkChangeOfRemoteWindowSizeOnBufferWrite(channelAsyncOutputStream, 40000, 32000, 30000, 40000 - 30000);
+    }
+
+    /*
+     * Only partial Data of packet size should be sent if data is larger than packet size and packet size fits into
+     * remote window
+     */
+    @Test
+    public void testChunkOfPacketSizeSentIfDataLargerThanPacketSizeAndPacketFitsInRemoteWindow() throws IOException {
+        ChannelAsyncOutputStream channelAsyncOutputStream = new ChannelAsyncOutputStream(channel, (byte) 0);
+        checkChangeOfRemoteWindowSizeOnBufferWrite(channelAsyncOutputStream, 40000, 32000, 35000, 40000 - 32000);
+    }
+
+    @Test
+    public void testChunkOfPacketSizeSentIfDataLargerThanRemoteWindowAndPacketFitsInRemoteWindow() throws IOException {
+        ChannelAsyncOutputStream channelAsyncOutputStream = new ChannelAsyncOutputStream(channel, (byte) 0);
+        checkChangeOfRemoteWindowSizeOnBufferWrite(channelAsyncOutputStream, 40000, 32000, 50000, 40000 - 32000);
+    }
+
+    @Test
+    public void testNoChunkingIfRemoteWindowSmallerThanPacketSize() throws IOException {
+        ChannelAsyncOutputStream channelAsyncOutputStream = new ChannelAsyncOutputStream(channel, (byte) 0);
+        checkChangeOfRemoteWindowSizeOnBufferWrite(channelAsyncOutputStream, 30000, 32000, 50000, 30000);
+    }
+
+    @Test
+    public void testChunkingIfRemoteWindowSmallerThanPacketSize() throws IOException {
+        ChannelAsyncOutputStream channelAsyncOutputStream = new ChannelAsyncOutputStream(channel, (byte) 0, true);
+        checkChangeOfRemoteWindowSizeOnBufferWrite(channelAsyncOutputStream, 30000, 32000, 50000, 0);
+    }
+
+    private void checkChangeOfRemoteWindowSizeOnBufferWrite(
+            ChannelAsyncOutputStream channelAsyncOutputStream, int initialWindowSize, int packetSize, int totalDataToSent,
+            int expectedWindowSize)
+            throws IOException {
+
+        remoteWindow.init(initialWindowSize, packetSize, PropertyResolver.EMPTY);
+        Buffer buffer = createBuffer(totalDataToSent);
+        channelAsyncOutputStream.writeBuffer(buffer);
+
+        assertEquals(expectedWindowSize, remoteWindow.getSize());
+    }
+
+    private ByteArrayBuffer createBuffer(int size) {
+        byte[] randomBytes = new byte[size];
+        new Random().nextBytes(randomBytes);
+        return new ByteArrayBuffer(randomBytes);
+    }
+}

[mina-sshd] 02/02: [SSHD-1123] Provide configurable behavior for ChannelAsyncOutputStream chunking behavior

Posted by lg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit f58f006bc416963893c42ca034f2a9b1dfd1fe19
Author: Lyor Goldstein <lg...@apache.org>
AuthorDate: Sat Mar 20 10:01:42 2021 +0200

    [SSHD-1123] Provide configurable behavior for ChannelAsyncOutputStream chunking behavior
---
 CHANGES.md                                         |  1 +
 .../common/channel/ChannelAsyncOutputStream.java   | 28 +++++++++++++++++++---
 .../sshd/common/channel/ChannelOutputStream.java   | 24 ++++++++++++-------
 .../org/apache/sshd/core/CoreModuleProperties.java | 18 ++++++++++++++
 .../apache/sshd/server/channel/ChannelSession.java | 25 +++++++++++++------
 5 files changed, 78 insertions(+), 18 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a909dfd..8db2755 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -44,6 +44,7 @@
 * [SSHD-1114](https://issues.apache.org/jira/browse/SSHD-1114) Added callbacks for client-side host-based authentication progress
 * [SSHD-1114](https://issues.apache.org/jira/browse/SSHD-1114) Added capability for interactive password authentication participation via UserInteraction
 * [SSHD-1114](https://issues.apache.org/jira/browse/SSHD-1114) Added capability for interactive key based authentication participation via UserInteraction
+* [SSHD-1123](https://issues.apache.org/jira/browse/SSHD-1123) Add option to chunk data in ChannelAsyncOutputStream if window size is smaller than packet size
 * [SSHD-1125](https://issues.apache.org/jira/browse/SSHD-1125) Added mechanism to throttle pending write requests in BufferedIoOutputStream
 * [SSHD-1127](https://issues.apache.org/jira/browse/SSHD-1127) Added capability to register a custom receiver for SFTP STDERR channel raw or stream data
 * [SSHD-1133](https://issues.apache.org/jira/browse/SSHD-1133) Added capability to specify a custom charset for parsing incoming commands to the `ScpShell`
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index 685d79e..eb2d5c5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -42,16 +42,30 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
     private final Object packetWriteId;
     private boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize;
 
+    /**
+     * @param channel The {@link Channel} through which the stream is communicating
+     * @param cmd     Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or
+     *                {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the
+     *                output stream type
+     */
     public ChannelAsyncOutputStream(Channel channel, byte cmd) {
         this(channel, cmd, false);
     }
 
     /**
+     * @param channel                                        The {@link Channel} through which the stream is
+     *                                                       communicating
+     * @param cmd                                            Either {@link SshConstants#SSH_MSG_CHANNEL_DATA
+     *                                                       SSH_MSG_CHANNEL_DATA} or
+     *                                                       {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA
+     *                                                       SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output stream
+     *                                                       type
      * @param sendChunkIfRemoteWindowIsSmallerThanPacketSize Determines the chunking behaviour, if the remote window
      *                                                       size is smaller than the packet size. Can be use to
      *                                                       establish compatibility with certain clients, that wait
-     *                                                       until the window size is 0 before adjusting it (see
-     *                                                       SSHD-1123). Default is false;
+     *                                                       until the window size is 0 before adjusting it.
+     * @see                                                  <A HREF=
+     *                                                       "https://issues.apache.org/jira/browse/SSHD-1123">SSHD-1123</A>
      */
     public ChannelAsyncOutputStream(Channel channel, byte cmd, boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize) {
         this.channelInstance = Objects.requireNonNull(channel, "No channel");
@@ -66,6 +80,15 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
         return channelInstance;
     }
 
+    /**
+     * @return Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or
+     *         {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output
+     *         stream type
+     */
+    public byte getCommandType() {
+        return cmd;
+    }
+
     public void onWindowExpanded() throws IOException {
         doWriteIfPossible(true);
     }
@@ -256,5 +279,4 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
     public void setSendChunkIfRemoteWindowIsSmallerThanPacketSize(boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize) {
         this.sendChunkIfRemoteWindowIsSmallerThanPacketSize = sendChunkIfRemoteWindowIsSmallerThanPacketSize;
     }
-
 }
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
index 42f4e28..2937c7d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
@@ -42,12 +42,12 @@ import org.slf4j.Logger;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public class ChannelOutputStream extends OutputStream implements java.nio.channels.Channel, ChannelHolder {
+    protected final Logger log;
 
     private final AbstractChannel channelInstance;
     private final ChannelStreamWriter packetWriter;
     private final Window remoteWindow;
     private final Duration maxWaitTimeout;
-    private final Logger log;
     private final byte cmd;
     private final boolean eofOnClose;
     private final byte[] b = new byte[1];
@@ -79,8 +79,7 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe
         this.packetWriter = channelInstance.resolveChannelStreamWriter(channel, cmd);
         this.remoteWindow = Objects.requireNonNull(remoteWindow, "No remote window");
         Objects.requireNonNull(maxWaitTimeout, "No maxWaitTimeout");
-        ValidateUtils.checkTrue(GenericUtils.isPositive(maxWaitTimeout), "Non-positive max. wait time: %s",
-                maxWaitTimeout.toString());
+        ValidateUtils.checkTrue(GenericUtils.isPositive(maxWaitTimeout), "Non-positive max. wait time: %s", maxWaitTimeout);
         this.maxWaitTimeout = maxWaitTimeout;
         this.log = Objects.requireNonNull(log, "No logger");
         this.cmd = cmd;
@@ -93,18 +92,27 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe
         return channelInstance;
     }
 
-    public boolean isEofOnClose() {
-        return eofOnClose;
+    /**
+     * @return Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or
+     *         {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output
+     *         stream type
+     */
+    public byte getCommandType() {
+        return cmd;
     }
 
-    public void setNoDelay(boolean noDelay) {
-        this.noDelay = noDelay;
+    public boolean isEofOnClose() {
+        return eofOnClose;
     }
 
     public boolean isNoDelay() {
         return noDelay;
     }
 
+    public void setNoDelay(boolean noDelay) {
+        this.noDelay = noDelay;
+    }
+
     @Override
     public boolean isOpen() {
         return !closedState.get();
@@ -185,7 +193,7 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe
 
     @Override
     public synchronized void flush() throws IOException {
-        AbstractChannel channel = getChannel();
+        Channel channel = getChannel();
         if (!isOpen()) {
             throw new SshChannelClosedException(
                     channel.getId(),
diff --git a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
index 88d9724..062166d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
+++ b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
@@ -222,6 +222,24 @@ public final class CoreModuleProperties {
     public static final Property<Boolean> REQUEST_SUBSYSTEM_REPLY
             = Property.bool("channel-subsystem-want-reply", true);
 
+    /**
+     * If should chunk data sent via {@code ChannelAsyncOutputStream} when reported remote STDOUT stream window size is
+     * less than its packet size
+     *
+     * @see <A HREF="https://issues.apache.org/jira/browse/SSHD-1123">SSHD-1123</A>
+     */
+    public static final Property<Boolean> ASYNC_SERVER_STDOUT_CHUNK_BELOW_WINDOW_SIZE
+            = Property.bool("server-async-stdout-chunk-below-window-size", false);
+
+    /**
+     * If should chunk data sent via {@code ChannelAsyncOutputStream} when reported remote STDERR stream window size is
+     * less than its packet size
+     *
+     * @see <A HREF="https://issues.apache.org/jira/browse/SSHD-1123">SSHD-1123</A>
+     */
+    public static final Property<Boolean> ASYNC_SERVER_STDERR_CHUNK_BELOW_WINDOW_SIZE
+            = Property.bool("server-async-stderr-chunk-below-window-size", false);
+
     public static final Property<Integer> PROP_DHGEX_CLIENT_MIN_KEY
             = Property.integer("dhgex-client-min");
 
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index 93821eb..963a14b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -722,10 +722,10 @@ public class ChannelSession extends AbstractServerChannel {
         if (command instanceof AsyncCommandStreamsAware) {
             asyncOut = new ChannelAsyncOutputStream(
                     this, SshConstants.SSH_MSG_CHANNEL_DATA,
-                    isSendChunkIfRemoteWindowIsSmallerThanPacketSize());
+                    isSendChunkIfRemoteWindowIsSmallerThanPacketSize(SshConstants.SSH_MSG_CHANNEL_DATA));
             asyncErr = new ChannelAsyncOutputStream(
                     this, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA,
-                    isSendChunkIfRemoteWindowIsSmallerThanPacketSize());
+                    isSendChunkIfRemoteWindowIsSmallerThanPacketSize(SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA));
             ((AsyncCommandStreamsAware) command).setIoOutputStream(asyncOut);
             ((AsyncCommandStreamsAware) command).setIoErrorStream(asyncErr);
         } else {
@@ -922,11 +922,22 @@ public class ChannelSession extends AbstractServerChannel {
     /**
      * Chance for specializations to vary chunking behaviour depending on the SFTP client version.
      *
-     * @return {@code true} if chunk data sent via {@link ChannelAsyncOutputStream} when reported remote window size is
-     *         less than its packet size
-     * @see    ChannelAsyncOutputStream#ChannelAsyncOutputStream(Channel, byte, boolean)
+     * @param  cmd                           Either {@link SshConstants#SSH_MSG_CHANNEL_DATA SSH_MSG_CHANNEL_DATA} or
+     *                                       {@link SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA
+     *                                       SSH_MSG_CHANNEL_EXTENDED_DATA} indicating the output stream type
+     * @return                               {@code true} if should chunk data sent via {@link ChannelAsyncOutputStream}
+     *                                       when reported remote window size is less than its packet size
+     * @see                                  ChannelAsyncOutputStream#ChannelAsyncOutputStream(Channel, byte, boolean)
+     * @throws UnsupportedOperationException if the command is neither of the supported ones
      */
-    protected boolean isSendChunkIfRemoteWindowIsSmallerThanPacketSize() {
-        return false;
+    protected boolean isSendChunkIfRemoteWindowIsSmallerThanPacketSize(byte cmd) {
+        if (cmd == SshConstants.SSH_MSG_CHANNEL_DATA) {
+            return CoreModuleProperties.ASYNC_SERVER_STDOUT_CHUNK_BELOW_WINDOW_SIZE.getRequired(this);
+        } else if (cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) {
+            return CoreModuleProperties.ASYNC_SERVER_STDERR_CHUNK_BELOW_WINDOW_SIZE.getRequired(this);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported channel data stream command: " + SshConstants.getCommandMessageName(cmd & 0xFF));
+        }
     }
 }