You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2021/03/20 08:03:50 UTC
[mina-sshd] 01/02: [SSHD-1123] Add option to chunk data in
ChannelAsyncOutputStream if window size is smaller than packet size
This is an automated email from the ASF dual-hosted git repository.
lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit 098760248c37e628723529c2c49c5a353606ff23
Author: =?UTF-8?q?Achim=20H=C3=BCgen?= <ac...@deutschepost.de>
AuthorDate: Sat Mar 20 09:07:58 2021 +0200
[SSHD-1123] Add option to chunk data in ChannelAsyncOutputStream if window size is smaller than packet size
---
.../common/channel/ChannelAsyncOutputStream.java | 48 ++++++--
.../apache/sshd/server/channel/ChannelSession.java | 19 ++-
.../channel/ChannelAsyncOutputStreamTest.java | 131 +++++++++++++++++++++
3 files changed, 186 insertions(+), 12 deletions(-)
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index 8d1701f..685d79e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -40,9 +40,22 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
private final byte cmd;
private final AtomicReference<IoWriteFutureImpl> pendingWrite = new AtomicReference<>();
private final Object packetWriteId;
+ private boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize;
public ChannelAsyncOutputStream(Channel channel, byte cmd) {
+ this(channel, cmd, false);
+ }
+
+ /**
+ * @param sendChunkIfRemoteWindowIsSmallerThanPacketSize Determines the chunking behaviour, if the remote window
+ * size is smaller than the packet size. Can be use to
+ * establish compatibility with certain clients, that wait
+ * until the window size is 0 before adjusting it (see
+ * SSHD-1123). Default is false;
+ */
+ public ChannelAsyncOutputStream(Channel channel, byte cmd, boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize) {
this.channelInstance = Objects.requireNonNull(channel, "No channel");
+ this.sendChunkIfRemoteWindowIsSmallerThanPacketSize = sendChunkIfRemoteWindowIsSmallerThanPacketSize;
this.packetWriter = channelInstance.resolveChannelStreamWriter(channel, cmd);
this.cmd = cmd;
this.packetWriteId = channel.toString() + "[" + SshConstants.getCommandMessageName(cmd) + "]";
@@ -113,15 +126,21 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
// send the first chunk as we have enough space in the window
length = packetSize;
} else {
- // do not chunk when the window is smaller than the packet size
- length = 0;
- // do a defensive copy in case the user reuses the buffer
- IoWriteFutureImpl f = new IoWriteFutureImpl(future.getId(), new ByteArrayBuffer(buffer.getCompactData()));
- f.addListener(w -> future.setValue(w.getException() != null ? w.getException() : w.isWritten()));
- pendingWrite.set(f);
- if (log.isTraceEnabled()) {
- log.trace("doWriteIfPossible({})[resume={}] waiting for window space {}",
- this, resume, remoteWindowSize);
+ // Window size is even smaller than packet size. Determine how to handle this.
+ if (isSendChunkIfRemoteWindowIsSmallerThanPacketSize()) {
+ length = remoteWindowSize;
+ } else {
+ // do not chunk when the window is smaller than the packet size
+ length = 0L;
+ // do a defensive copy in case the user reuses the buffer
+ IoWriteFutureImpl f
+ = new IoWriteFutureImpl(future.getId(), new ByteArrayBuffer(buffer.getCompactData()));
+ f.addListener(w -> future.setValue(w.getException() != null ? w.getException() : w.isWritten()));
+ pendingWrite.set(f);
+ if (log.isTraceEnabled()) {
+ log.trace("doWriteIfPossible({})[resume={}] waiting for window space {}",
+ this, resume, remoteWindowSize);
+ }
}
}
} else if (total > packetSize) {
@@ -147,7 +166,7 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
}
}
- if (length > 0) {
+ if (length > 0L) {
if (resume) {
if (log.isDebugEnabled()) {
log.debug("Resuming {} write due to more space ({}) available in the remote window", this, length);
@@ -229,4 +248,13 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
public String toString() {
return getClass().getSimpleName() + "[" + getChannel() + "] cmd=" + SshConstants.getCommandMessageName(cmd & 0xFF);
}
+
+ public boolean isSendChunkIfRemoteWindowIsSmallerThanPacketSize() {
+ return sendChunkIfRemoteWindowIsSmallerThanPacketSize;
+ }
+
+ public void setSendChunkIfRemoteWindowIsSmallerThanPacketSize(boolean sendChunkIfRemoteWindowIsSmallerThanPacketSize) {
+ this.sendChunkIfRemoteWindowIsSmallerThanPacketSize = sendChunkIfRemoteWindowIsSmallerThanPacketSize;
+ }
+
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index adae173..93821eb 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -720,8 +720,12 @@ public class ChannelSession extends AbstractServerChannel {
}
// If the shell wants to use non-blocking io
if (command instanceof AsyncCommandStreamsAware) {
- asyncOut = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA);
- asyncErr = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA);
+ asyncOut = new ChannelAsyncOutputStream(
+ this, SshConstants.SSH_MSG_CHANNEL_DATA,
+ isSendChunkIfRemoteWindowIsSmallerThanPacketSize());
+ asyncErr = new ChannelAsyncOutputStream(
+ this, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA,
+ isSendChunkIfRemoteWindowIsSmallerThanPacketSize());
((AsyncCommandStreamsAware) command).setIoOutputStream(asyncOut);
((AsyncCommandStreamsAware) command).setIoErrorStream(asyncErr);
} else {
@@ -914,4 +918,15 @@ public class ChannelSession extends AbstractServerChannel {
commandExitFuture.setClosed();
}
}
+
+ /**
+ * Chance for specializations to vary chunking behaviour depending on the SFTP client version.
+ *
+ * @return {@code true} if chunk data sent via {@link ChannelAsyncOutputStream} when reported remote window size is
+ * less than its packet size
+ * @see ChannelAsyncOutputStream#ChannelAsyncOutputStream(Channel, byte, boolean)
+ */
+ protected boolean isSendChunkIfRemoteWindowIsSmallerThanPacketSize() {
+ return false;
+ }
}
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/channel/ChannelAsyncOutputStreamTest.java b/sshd-core/src/test/java/org/apache/sshd/common/channel/ChannelAsyncOutputStreamTest.java
new file mode 100644
index 0000000..34228d8
--- /dev/null
+++ b/sshd-core/src/test/java/org/apache/sshd/common/channel/ChannelAsyncOutputStreamTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.sshd.common.PropertyResolver;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriter;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.apache.sshd.util.test.NoIoTestCase;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.MethodSorters;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+/**
+ * Tests the behaviour of {@link ChannelAsyncOutputStream} regarding the chunking of the data to sent.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@Category({ NoIoTestCase.class })
+public class ChannelAsyncOutputStreamTest extends BaseTestSupport {
+
+ private static final String CLIENT_WITH_COMPATIBILITY_ISSUE = "specialClient";
+ private Window remoteWindow;
+ private ChannelStreamWriter channelStreamWriter;
+ private AbstractChannel channel;
+ private Session session;
+ private IoWriteFuture ioWriteFuture;
+
+ public ChannelAsyncOutputStreamTest() {
+ super();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ channel = Mockito.mock(AbstractChannel.class);
+ channelStreamWriter = Mockito.mock(ChannelStreamWriter.class);
+ remoteWindow = new Window(channel, null, true, true);
+ ioWriteFuture = Mockito.mock(IoWriteFuture.class);
+ session = Mockito.mock(Session.class);
+
+ Mockito.when(channel.getRemoteWindow()).thenReturn(remoteWindow);
+ Mockito.when(channel.getSession()).thenReturn(session);
+
+ Mockito.when(channel.resolveChannelStreamWriter(ArgumentMatchers.any(Channel.class), ArgumentMatchers.anyByte()))
+ .thenReturn(channelStreamWriter);
+ Mockito.when(channelStreamWriter.writeData(ArgumentMatchers.any())).thenReturn(ioWriteFuture);
+
+ Mockito.when(session.createBuffer(ArgumentMatchers.anyByte(), ArgumentMatchers.anyInt()))
+ .thenReturn(new ByteArrayBuffer());
+
+ Mockito.when(session.getClientVersion()).thenReturn(CLIENT_WITH_COMPATIBILITY_ISSUE);
+
+ }
+
+ @Test
+ public void testCompleteDataSentIfDataFitsIntoPacketAndPacketFitsInRemoteWindow() throws IOException {
+ ChannelAsyncOutputStream channelAsyncOutputStream = new ChannelAsyncOutputStream(channel, (byte) 0);
+ checkChangeOfRemoteWindowSizeOnBufferWrite(channelAsyncOutputStream, 40000, 32000, 30000, 40000 - 30000);
+ }
+
+ /*
+ * Only partial Data of packet size should be sent if data is larger than packet size and packet size fits into
+ * remote window
+ */
+ @Test
+ public void testChunkOfPacketSizeSentIfDataLargerThanPacketSizeAndPacketFitsInRemoteWindow() throws IOException {
+ ChannelAsyncOutputStream channelAsyncOutputStream = new ChannelAsyncOutputStream(channel, (byte) 0);
+ checkChangeOfRemoteWindowSizeOnBufferWrite(channelAsyncOutputStream, 40000, 32000, 35000, 40000 - 32000);
+ }
+
+ @Test
+ public void testChunkOfPacketSizeSentIfDataLargerThanRemoteWindowAndPacketFitsInRemoteWindow() throws IOException {
+ ChannelAsyncOutputStream channelAsyncOutputStream = new ChannelAsyncOutputStream(channel, (byte) 0);
+ checkChangeOfRemoteWindowSizeOnBufferWrite(channelAsyncOutputStream, 40000, 32000, 50000, 40000 - 32000);
+ }
+
+ @Test
+ public void testNoChunkingIfRemoteWindowSmallerThanPacketSize() throws IOException {
+ ChannelAsyncOutputStream channelAsyncOutputStream = new ChannelAsyncOutputStream(channel, (byte) 0);
+ checkChangeOfRemoteWindowSizeOnBufferWrite(channelAsyncOutputStream, 30000, 32000, 50000, 30000);
+ }
+
+ @Test
+ public void testChunkingIfRemoteWindowSmallerThanPacketSize() throws IOException {
+ ChannelAsyncOutputStream channelAsyncOutputStream = new ChannelAsyncOutputStream(channel, (byte) 0, true);
+ checkChangeOfRemoteWindowSizeOnBufferWrite(channelAsyncOutputStream, 30000, 32000, 50000, 0);
+ }
+
+ private void checkChangeOfRemoteWindowSizeOnBufferWrite(
+ ChannelAsyncOutputStream channelAsyncOutputStream, int initialWindowSize, int packetSize, int totalDataToSent,
+ int expectedWindowSize)
+ throws IOException {
+
+ remoteWindow.init(initialWindowSize, packetSize, PropertyResolver.EMPTY);
+ Buffer buffer = createBuffer(totalDataToSent);
+ channelAsyncOutputStream.writeBuffer(buffer);
+
+ assertEquals(expectedWindowSize, remoteWindow.getSize());
+ }
+
+ private ByteArrayBuffer createBuffer(int size) {
+ byte[] randomBytes = new byte[size];
+ new Random().nextBytes(randomBytes);
+ return new ByteArrayBuffer(randomBytes);
+ }
+}