You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2020/09/22 09:39:02 UTC
[mina-sshd] branch master updated: [SSHD-1080] Rework the
PacketWriter to split according to the various semantics
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push:
new 8e59075 [SSHD-1080] Rework the PacketWriter to split according to the various semantics
8e59075 is described below
commit 8e59075bc55c497e4d4056f457e4d657d3c61c1a
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Tue Sep 22 11:38:55 2020 +0200
[SSHD-1080] Rework the PacketWriter to split according to the various semantics
---
CHANGES.md | 1 +
.../org/apache/sshd/common/io/IoInputStream.java | 5 ++
.../org/apache/sshd/common/io/IoOutputStream.java | 22 +++++-
.../java/org/apache/sshd/common/io/IoSession.java | 30 ++++++--
.../org/apache/sshd/common/io/PacketWriter.java | 81 ----------------------
...ter.java => ThrottlingChannelStreamWriter.java} | 35 +++++-----
...java => ThrottlingChannelStreamWriterTest.java} | 37 +++++-----
.../java/org/apache/sshd/common/BaseBuilder.java | 8 +--
.../org/apache/sshd/common/FactoryManager.java | 4 +-
.../sshd/common/channel/AbstractChannel.java | 18 ++---
.../common/channel/BufferedIoOutputStream.java | 6 +-
.../org/apache/sshd/common/channel/Channel.java | 19 +++--
.../common/channel/ChannelAsyncOutputStream.java | 12 ++--
.../sshd/common/channel/ChannelOutputStream.java | 8 +--
.../sshd/common/channel/SimpleIoOutputStream.java | 5 +-
.../sshd/common/channel/StreamingChannel.java | 2 +-
.../channel/throttle/ChannelStreamWriter.java | 48 +++++++++++++
...olver.java => ChannelStreamWriterResolver.java} | 12 ++--
...ava => ChannelStreamWriterResolverManager.java} | 19 +++--
...anager.java => DefaultChannelStreamWriter.java} | 36 +++++++---
.../org/apache/sshd/common/forward/SocksProxy.java | 8 +--
.../sshd/common/forward/TcpipClientChannel.java | 2 +-
.../common/helpers/AbstractFactoryManager.java | 12 ++--
.../apache/sshd/common/io/nio2/Nio2Session.java | 4 +-
.../org/apache/sshd/common/session/Session.java | 34 +++++----
.../common/session/helpers/AbstractSession.java | 49 +++++++++++--
.../sshd/common/session/helpers/SessionHelper.java | 20 +++---
.../sshd/server/forward/TcpipServerChannel.java | 34 ++++-----
.../sshd/server/session/AbstractServerSession.java | 4 +-
.../sshd/server/x11/ChannelForwardedX11.java | 2 +-
.../src/test/java/org/apache/sshd/LoadTest.java | 4 --
.../java/org/apache/sshd/WindowAdjustTest.java | 2 +-
.../java/org/apache/sshd/client/ClientTest.java | 9 +--
.../org/apache/sshd/common/channel/WindowTest.java | 5 +-
.../common/forward/PortForwardingLoadTest.java | 5 +-
.../session/helpers/AbstractSessionTest.java | 2 +-
.../sshd/server/ServerProxyAcceptorTest.java | 2 +-
.../sshd/util/test/AsyncEchoShellFactory.java | 2 +-
.../org/apache/sshd/util/test/BogusChannel.java | 14 ++--
.../java/org/apache/sshd/mina/MinaSession.java | 2 +-
.../java/org/apache/sshd/netty/NettyIoSession.java | 2 +-
.../sshd/sftp/client/impl/DefaultSftpClient.java | 4 +-
.../org/apache/sshd/sftp/server/SftpSubsystem.java | 2 +-
43 files changed, 359 insertions(+), 273 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index fc2776d..23f04ef 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -17,6 +17,7 @@ or `-key-file` command line option.
* [SSHD-1034](https://issues.apache.org/jira/browse/SSHD-1034) Rename `org.apache.sshd.common.ForwardingFilter` to `Forwarder`.
* [SSHD-1035](https://issues.apache.org/jira/browse/SSHD-1035) Move property definitions to common locations.
* [SSHD-1038](https://issues.apache.org/jira/browse/SSHD-1038) Refactor packages from a module into a cleaner hierarchy.
+* [SSHD-1080](https://issues.apache.org/jira/browse/SSHD-1080) Rework the PacketWriter to split according to the various semantics
* [SSHD-1084](https://issues.apache.org/jira/browse/SSHD-1084) Revert the usage of asynchronous streams when forwarding ports.
## Minor code helpers
diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/IoInputStream.java b/sshd-common/src/main/java/org/apache/sshd/common/io/IoInputStream.java
index faa509c..8fb86e3 100644
--- a/sshd-common/src/main/java/org/apache/sshd/common/io/IoInputStream.java
+++ b/sshd-common/src/main/java/org/apache/sshd/common/io/IoInputStream.java
@@ -21,6 +21,11 @@ package org.apache.sshd.common.io;
import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.util.buffer.Buffer;
+/**
+ * Represents a stream that can be read asynchronously.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
public interface IoInputStream extends Closeable {
/**
* NOTE: the buffer must not be touched until the returned read future is completed.
diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/IoOutputStream.java b/sshd-common/src/main/java/org/apache/sshd/common/io/IoOutputStream.java
index e98e5f0..64b8876 100644
--- a/sshd-common/src/main/java/org/apache/sshd/common/io/IoOutputStream.java
+++ b/sshd-common/src/main/java/org/apache/sshd/common/io/IoOutputStream.java
@@ -18,8 +18,26 @@
*/
package org.apache.sshd.common.io;
+import java.io.IOException;
+
import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.util.buffer.Buffer;
+
+/**
+ * Represents a stream that can be written asynchronously.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface IoOutputStream extends Closeable {
+
+ /**
+ * Write the given buffer.
+ *
+ * @param buffer the data to write. <B>NOTE:</B> the buffer must not be touched until the returned write
+ * future is completed.
+ * @return An {@code IoWriteFuture} that can be used to check when the data has actually been written.
+ * @throws IOException if an error occurred when writing the data
+ */
+ IoWriteFuture writeBuffer(Buffer buffer) throws IOException;
-public interface IoOutputStream extends Closeable, PacketWriter {
- // nothing extra
}
diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java b/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java
index f8de2b4..76b0022 100644
--- a/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java
+++ b/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java
@@ -22,9 +22,11 @@ import java.io.IOException;
import java.net.SocketAddress;
import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.net.ConnectionEndpointsIndicator;
-public interface IoSession extends ConnectionEndpointsIndicator, PacketWriter, Closeable {
+public interface IoSession extends ConnectionEndpointsIndicator, Closeable {
/**
* @return a unique identifier for this session. Every session has its own ID which is different from any other.
@@ -83,6 +85,27 @@ public interface IoSession extends ConnectionEndpointsIndicator, PacketWriter, C
Object removeAttribute(Object key);
/**
+ * Write a packet on the socket. Multiple writes can be issued concurrently and will be queued.
+ *
+ * @param buffer the buffer send. <B>NOTE:</B> the buffer must not be touched until the returned write future
+ * is completed.
+ * @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent
+ * @throws IOException if an error occurred when sending the packet
+ */
+ IoWriteFuture writeBuffer(Buffer buffer) throws IOException;
+
+ /**
+ * Closes this session immediately or after all queued write requests are flushed. This operation is asynchronous.
+ * Wait for the returned {@link CloseFuture} if you want to wait for the session actually closed.
+ *
+ * @param immediately {@code true} to close this session immediately. The pending write requests will simply be
+ * discarded. {@code false} to close this session after all queued write requests are flushed.
+ * @return The generated {@link CloseFuture}
+ */
+ @Override
+ CloseFuture close(boolean immediately);
+
+ /**
* @return the {@link IoService} that created this session.
*/
IoService getService();
@@ -97,9 +120,8 @@ public interface IoSession extends ConnectionEndpointsIndicator, PacketWriter, C
/**
* Suspend read operations on this session. May do nothing if not supported by the session implementation.
*
- * If the session usage includes a graceful shutdown with messages being exchanged, the caller needs to
- * take care of resuming reading the input in order to actually be able to carry on the conversation with
- * the peer.
+ * If the session usage includes a graceful shutdown with messages being exchanged, the caller needs to take care of
+ * resuming reading the input in order to actually be able to carry on the conversation with the peer.
*/
default void suspendRead() {
// Do nothing by default, but can be overriden by implementations
diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/PacketWriter.java b/sshd-common/src/main/java/org/apache/sshd/common/io/PacketWriter.java
deleted file mode 100644
index 0862728..0000000
--- a/sshd-common/src/main/java/org/apache/sshd/common/io/PacketWriter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sshd.common.io;
-
-import java.io.IOException;
-import java.nio.channels.Channel;
-
-import org.apache.sshd.common.util.buffer.Buffer;
-
-/**
- * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
- */
-public interface PacketWriter extends Channel {
- /**
- * Encode and send the given buffer. <B>Note:</B> for session packets the buffer has to have 5 bytes free at the
- * beginning to allow the encoding to take place. Also, the write position of the buffer has to be set to the
- * position of the last byte to write.
- *
- * @param buffer the buffer to encode and send. <B>NOTE:</B> the buffer must not be touched until the returned
- * write future is completed.
- * @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent
- * @throws IOException if an error occurred when encoding sending the packet
- */
- IoWriteFuture writePacket(Buffer buffer) throws IOException;
-
- /**
- * @param len The packet payload size
- * @param blockSize The cipher block size
- * @param etmMode Whether using "encrypt-then-MAC" mode
- * @return The required padding length
- */
- static int calculatePadLength(int len, int blockSize, boolean etmMode) {
- /*
- * Note: according to RFC-4253 section 6:
- *
- * The minimum size of a packet is 16 (or the cipher block size, whichever is larger) bytes (plus 'mac').
- *
- * Since all out ciphers, MAC(s), etc. have a block size > 8 then the minimum size of the packet will be at
- * least 16 due to the padding at the very least - so even packets that contain an opcode with no arguments will
- * be above this value. This avoids an un-necessary call to Math.max(len, 16) for each and every packet
- */
-
- len++; // the pad length
- if (!etmMode) {
- len += Integer.BYTES;
- }
-
- /*
- * Note: according to RFC-4253 section 6:
- *
- * Note that the length of the concatenation of 'packet_length', 'padding_length', 'payload', and 'random
- * padding' MUST be a multiple of the cipher block size or 8, whichever is larger.
- *
- * However, we currently do not have ciphers with a block size of less than 8 so we do not take this into
- * account in order to accelerate the calculation and avoiding an un-necessary call to Math.max(blockSize, 8)
- * for each and every packet.
- */
- int pad = (-len) & (blockSize - 1);
- if (pad < blockSize) {
- pad += blockSize;
- }
-
- return pad;
- }
-}
diff --git a/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java b/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriter.java
similarity index 83%
rename from sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java
rename to sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriter.java
index a62e25c..5092b5e 100644
--- a/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java
+++ b/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriter.java
@@ -34,22 +34,22 @@ import org.apache.sshd.common.PropertyResolver;
import org.apache.sshd.common.channel.Channel;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.PacketWriter;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
/**
- * A {@link PacketWriter} delegate implementation that "throttles" the output by having a limit on the
- * outstanding packets that have not been sent yet. The {@link #writePacket(Buffer) writePacket} implementation make
- * sure that the limit has not been exceeded - if so, then it waits until pending packets have been successfully sent
- * before sending the next packet.
+ * A {@link ChannelStreamWriter} delegate implementation that "throttles" the output by having a limit on the
+ * outstanding packets that have not been sent yet. The {@link #writeData(Buffer) writePacket} implementation make sure
+ * that the limit has not been exceeded - if so, then it waits until pending packets have been successfully sent before
+ * sending the next packet.
*
* <B>Note:</B> {@link #close() closing} the throttler does not close the delegate writer
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public class ThrottlingPacketWriter extends AbstractLoggingBean implements PacketWriter, SshFutureListener<IoWriteFuture> {
+public class ThrottlingChannelStreamWriter extends AbstractLoggingBean
+ implements ChannelStreamWriter, SshFutureListener<IoWriteFuture> {
/** Timeout (seconds) for throttling packet writer to wait for pending packets send */
public static final Property<Duration> WAIT_TIME
= Property.durationSec("packet-writer-wait-time", Duration.ofSeconds(30L));
@@ -58,29 +58,30 @@ public class ThrottlingPacketWriter extends AbstractLoggingBean implements Packe
public static final Property<Integer> MAX_PEND_COUNT
= Property.integer("packet-writer-max-pend-count", 4096);
- private final PacketWriter delegate;
+ private final ChannelStreamWriter delegate;
private final int maxPendingPackets;
private final long maxWait;
private final AtomicBoolean open = new AtomicBoolean(true);
private final AtomicInteger availableCount;
- public ThrottlingPacketWriter(Channel channel) {
- this(channel, channel);
+ public ThrottlingChannelStreamWriter(Channel channel) {
+ this(new DefaultChannelStreamWriter(channel), channel);
}
- public ThrottlingPacketWriter(PacketWriter delegate, PropertyResolver resolver) {
+ public ThrottlingChannelStreamWriter(ChannelStreamWriter delegate, PropertyResolver resolver) {
this(delegate, MAX_PEND_COUNT.getRequired(resolver), WAIT_TIME.getRequired(resolver));
}
- public ThrottlingPacketWriter(PacketWriter delegate, int maxPendingPackets, TimeUnit waitUnit, long waitCount) {
+ public ThrottlingChannelStreamWriter(ChannelStreamWriter delegate, int maxPendingPackets, TimeUnit waitUnit,
+ long waitCount) {
this(delegate, maxPendingPackets, waitUnit.toMillis(waitCount));
}
- public ThrottlingPacketWriter(PacketWriter delegate, int maxPendingPackets, Duration maxWait) {
+ public ThrottlingChannelStreamWriter(ChannelStreamWriter delegate, int maxPendingPackets, Duration maxWait) {
this(delegate, maxPendingPackets, maxWait.toMillis());
}
- public ThrottlingPacketWriter(PacketWriter delegate, int maxPendingPackets, long maxWait) {
+ public ThrottlingChannelStreamWriter(ChannelStreamWriter delegate, int maxPendingPackets, long maxWait) {
this.delegate = Objects.requireNonNull(delegate, "No delegate provided");
ValidateUtils.checkTrue(maxPendingPackets > 0, "Invalid pending packets limit: %d", maxPendingPackets);
this.maxPendingPackets = maxPendingPackets;
@@ -89,7 +90,7 @@ public class ThrottlingPacketWriter extends AbstractLoggingBean implements Packe
this.maxWait = maxWait;
}
- public PacketWriter getDelegate() {
+ public ChannelStreamWriter getDelegate() {
return delegate;
}
@@ -111,7 +112,7 @@ public class ThrottlingPacketWriter extends AbstractLoggingBean implements Packe
}
@Override
- public IoWriteFuture writePacket(Buffer buffer) throws IOException {
+ public IoWriteFuture writeData(Buffer buffer) throws IOException {
if (!isOpen()) {
throw new ClosedSelectorException();
}
@@ -147,8 +148,8 @@ public class ThrottlingPacketWriter extends AbstractLoggingBean implements Packe
throw new EOFException("Negative available packets count: " + available);
}
- PacketWriter writer = getDelegate();
- return writer.writePacket(buffer).addListener(this);
+ ChannelStreamWriter writer = getDelegate();
+ return writer.writeData(buffer).addListener(this);
}
@Override
diff --git a/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java b/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriterTest.java
similarity index 76%
rename from sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java
rename to sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriterTest.java
index 7bed6dc..14eb5ca 100644
--- a/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java
+++ b/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriterTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.sshd.common.channel.IoWriteFutureImpl;
import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.PacketWriter;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.util.test.BaseTestSupport;
@@ -45,27 +44,27 @@ import org.junit.runners.MethodSorters;
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@Category({ NoIoTestCase.class })
-public class ThrottlingPacketWriterTest extends BaseTestSupport {
- public ThrottlingPacketWriterTest() {
+public class ThrottlingChannelStreamWriterTest extends BaseTestSupport {
+ public ThrottlingChannelStreamWriterTest() {
super();
}
@Test(timeout = 10_000)
public void testThrottlerWaitsUntilPacketSendSignalled() throws IOException {
- try (ThrottlingPacketWriter throttler
- = new ThrottlingPacketWriter(new MockPacketWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) {
+ try (ThrottlingChannelStreamWriter throttler
+ = new ThrottlingChannelStreamWriter(new MockChannelStreamWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) {
int maxSize = throttler.getMaxPendingPackets();
List<IoWriteFuture> pendingWrites = new ArrayList<>(maxSize);
Buffer buf = new ByteArrayBuffer(Byte.SIZE);
for (int index = maxSize; index > 0; index--) {
- IoWriteFuture future = throttler.writePacket(buf);
+ IoWriteFuture future = throttler.writeData(buf);
pendingWrites.add(future);
assertEquals("Mismatched available packets count", index - 1, throttler.getAvailablePacketsCount());
}
assertEquals("Not all available packet window size exhausted", 0, throttler.getAvailablePacketsCount());
try {
- IoWriteFuture future = throttler.writePacket(buf);
+ IoWriteFuture future = throttler.writeData(buf);
fail("Unexpected extra packet success: " + future);
} catch (InterruptedByTimeoutException e) {
// expected
@@ -79,41 +78,41 @@ public class ThrottlingPacketWriterTest extends BaseTestSupport {
}
for (int index = throttler.getAvailablePacketsCount(); index < maxSize; index++) {
- throttler.writePacket(buf);
+ throttler.writeData(buf);
}
}
}
@Test(expected = ClosedSelectorException.class, timeout = 10_000)
public void testThrottlerDoesNotSendIfClosed() throws IOException {
- try (PacketWriter throttler
- = new ThrottlingPacketWriter(new MockPacketWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) {
+ try (ChannelStreamWriter throttler
+ = new ThrottlingChannelStreamWriter(new MockChannelStreamWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) {
assertTrue("Throttler not marked as open", throttler.isOpen());
throttler.close();
assertFalse("Throttler not marked as closed", throttler.isOpen());
- IoWriteFuture future = throttler.writePacket(new ByteArrayBuffer(Byte.SIZE));
+ IoWriteFuture future = throttler.writeData(new ByteArrayBuffer(Byte.SIZE));
fail("Unexpected success: " + future);
}
}
@Test(expected = ClosedSelectorException.class, timeout = 10_000)
public void testThrottlerStopsSendingIfExceptionSignaledOnFutureOperationCompletion() throws IOException {
- try (PacketWriter throttler
- = new ThrottlingPacketWriter(new MockPacketWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) {
+ try (ChannelStreamWriter throttler
+ = new ThrottlingChannelStreamWriter(new MockChannelStreamWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) {
assertTrue("Throttler not marked as open", throttler.isOpen());
- IoWriteFutureImpl futureImpl = (IoWriteFutureImpl) throttler.writePacket(new ByteArrayBuffer(Byte.SIZE));
+ IoWriteFutureImpl futureImpl = (IoWriteFutureImpl) throttler.writeData(new ByteArrayBuffer(Byte.SIZE));
futureImpl.setValue(new StreamCorruptedException(getCurrentTestName()));
assertFalse("Throttler not marked as closed", throttler.isOpen());
- IoWriteFuture future = throttler.writePacket(new ByteArrayBuffer(Byte.SIZE));
+ IoWriteFuture future = throttler.writeData(new ByteArrayBuffer(Byte.SIZE));
fail("Unexpected success: " + future);
}
}
- private static class MockPacketWriter implements PacketWriter {
- MockPacketWriter() {
+ private static class MockChannelStreamWriter implements ChannelStreamWriter {
+ MockChannelStreamWriter() {
super();
}
@@ -128,8 +127,8 @@ public class ThrottlingPacketWriterTest extends BaseTestSupport {
}
@Override
- public IoWriteFuture writePacket(Buffer buffer) throws IOException {
- return new IoWriteFutureImpl(MockPacketWriter.class.getSimpleName(), buffer);
+ public IoWriteFuture writeData(Buffer buffer) throws IOException {
+ return new IoWriteFutureImpl(MockChannelStreamWriter.class.getSimpleName(), buffer);
}
}
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java b/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java
index 6b7aa97..6c24a5c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.sshd.common.channel.ChannelFactory;
import org.apache.sshd.common.channel.RequestHandler;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolver;
import org.apache.sshd.common.cipher.BuiltinCiphers;
import org.apache.sshd.common.cipher.Cipher;
import org.apache.sshd.common.compression.Compression;
@@ -148,7 +148,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, S extends BaseBuilder
protected ForwarderFactory forwarderFactory;
protected List<RequestHandler<ConnectionService>> globalRequestHandlers;
protected ForwardingFilter forwardingFilter;
- protected ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver;
+ protected ChannelStreamWriterResolver channelStreamPacketWriterResolver;
protected UnknownChannelReferenceHandler unknownChannelReferenceHandler;
public BaseBuilder() {
@@ -247,7 +247,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, S extends BaseBuilder
return me();
}
- public S channelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver) {
+ public S channelStreamPacketWriterResolver(ChannelStreamWriterResolver resolver) {
channelStreamPacketWriterResolver = resolver;
return me();
}
@@ -275,7 +275,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, S extends BaseBuilder
ssh.setForwardingFilter(forwardingFilter);
ssh.setForwarderFactory(forwarderFactory);
ssh.setGlobalRequestHandlers(globalRequestHandlers);
- ssh.setChannelStreamPacketWriterResolver(channelStreamPacketWriterResolver);
+ ssh.setChannelStreamWriterResolver(channelStreamPacketWriterResolver);
ssh.setUnknownChannelReferenceHandler(unknownChannelReferenceHandler);
return ssh;
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
index 47340c9..52df0a9 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
@@ -26,7 +26,7 @@ import org.apache.sshd.agent.SshAgentFactory;
import org.apache.sshd.common.channel.ChannelFactory;
import org.apache.sshd.common.channel.ChannelListenerManager;
import org.apache.sshd.common.channel.RequestHandler;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager;
import org.apache.sshd.common.file.FileSystemFactory;
import org.apache.sshd.common.forward.ForwarderFactory;
import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
@@ -56,7 +56,7 @@ public interface FactoryManager
ReservedSessionMessagesManager,
SessionDisconnectHandlerManager,
ChannelListenerManager,
- ChannelStreamPacketWriterResolverManager,
+ ChannelStreamWriterResolverManager,
UnknownChannelReferenceHandlerManager,
PortForwardingEventListenerManager,
IoServiceEventListenerManager,
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 1ce2566..8bb4d99 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -41,8 +41,8 @@ import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.PropertyResolver;
import org.apache.sshd.common.SshConstants;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolver;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.DefaultCloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
@@ -105,7 +105,7 @@ public abstract class AbstractChannel
private final Window localWindow;
private final Window remoteWindow;
- private ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver;
+ private ChannelStreamWriterResolver channelStreamPacketWriterResolver;
/**
* A {@link Map} of sent requests - key = request name, value = timestamp when request was sent.
@@ -196,24 +196,24 @@ public abstract class AbstractChannel
}
@Override
- public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() {
+ public ChannelStreamWriterResolver getChannelStreamWriterResolver() {
return channelStreamPacketWriterResolver;
}
@Override
- public void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver) {
+ public void setChannelStreamWriterResolver(ChannelStreamWriterResolver resolver) {
channelStreamPacketWriterResolver = resolver;
}
@Override
- public ChannelStreamPacketWriterResolver resolveChannelStreamPacketWriterResolver() {
- ChannelStreamPacketWriterResolver resolver = getChannelStreamPacketWriterResolver();
+ public ChannelStreamWriterResolver resolveChannelStreamWriterResolver() {
+ ChannelStreamWriterResolver resolver = getChannelStreamWriterResolver();
if (resolver != null) {
return resolver;
}
- ChannelStreamPacketWriterResolverManager manager = getSession();
- return manager.resolveChannelStreamPacketWriterResolver();
+ ChannelStreamWriterResolverManager manager = getSession();
+ return manager.resolveChannelStreamWriterResolver();
}
/**
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
index f1f64fd..3ee3ece 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
@@ -32,7 +32,7 @@ import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.closeable.AbstractInnerCloseable;
/**
- * An {@link IoOutputStream} capable of queuing write requests
+ * An {@link IoOutputStream} capable of queuing write requests.
*/
public class BufferedIoOutputStream extends AbstractInnerCloseable implements IoOutputStream {
protected final IoOutputStream out;
@@ -50,7 +50,7 @@ public class BufferedIoOutputStream extends AbstractInnerCloseable implements Io
}
@Override
- public IoWriteFuture writePacket(Buffer buffer) throws IOException {
+ public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
if (isClosing()) {
throw new EOFException("Closed - state=" + state);
}
@@ -71,7 +71,7 @@ public class BufferedIoOutputStream extends AbstractInnerCloseable implements Io
return;
}
- out.writePacket(future.getBuffer()).addListener(
+ out.writeBuffer(future.getBuffer()).addListener(
new SshFutureListener<IoWriteFuture>() {
@Override
public void operationComplete(IoWriteFuture f) {
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
index 4374e88..dae4aa1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
@@ -28,8 +28,8 @@ import org.apache.sshd.common.AttributeRepository;
import org.apache.sshd.common.AttributeStore;
import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.PropertyResolver;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager;
-import org.apache.sshd.common.io.PacketWriter;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager;
+import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.SessionHolder;
@@ -47,8 +47,7 @@ public interface Channel
ChannelListenerManager,
PropertyResolver,
AttributeStore,
- PacketWriter,
- ChannelStreamPacketWriterResolverManager,
+ ChannelStreamWriterResolverManager,
Closeable {
// Known types of channels
String CHANNEL_EXEC = "exec";
@@ -224,4 +223,16 @@ public interface Channel
T value = channel.getAttribute(key);
return (value != null) ? value : Session.resolveAttribute(channel.getSession(), key);
}
+
+ /**
+ * Encode and send the given buffer. <B>Note:</B> for session packets the buffer has to have 5 bytes free at the
+ * beginning to allow the encoding to take place. Also, the write position of the buffer has to be set to the
+ * position of the last byte to write.
+ *
+ * @param buffer the buffer to encode and send. <B>NOTE:</B> the buffer must not be touched until the returned
+ * write future is completed.
+ * @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent
+ * @throws IOException if an error occurred when encoding or sending the packet
+ */
+ IoWriteFuture writePacket(Buffer buffer) throws IOException;
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index 8b69e69..8d1701f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -24,10 +24,10 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriter;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.PacketWriter;
import org.apache.sshd.common.io.WritePendingException;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.buffer.Buffer;
@@ -36,14 +36,14 @@ import org.apache.sshd.common.util.closeable.AbstractCloseable;
public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOutputStream, ChannelHolder {
private final Channel channelInstance;
- private final PacketWriter packetWriter;
+ private final ChannelStreamWriter packetWriter;
private final byte cmd;
private final AtomicReference<IoWriteFutureImpl> pendingWrite = new AtomicReference<>();
private final Object packetWriteId;
public ChannelAsyncOutputStream(Channel channel, byte cmd) {
this.channelInstance = Objects.requireNonNull(channel, "No channel");
- this.packetWriter = channelInstance.resolveChannelStreamPacketWriter(channel, cmd);
+ this.packetWriter = channelInstance.resolveChannelStreamWriter(channel, cmd);
this.cmd = cmd;
this.packetWriteId = channel.toString() + "[" + SshConstants.getCommandMessageName(cmd) + "]";
}
@@ -58,14 +58,14 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
}
@Override
- public synchronized IoWriteFuture writePacket(Buffer buffer) throws IOException {
+ public synchronized IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
if (isClosing()) {
throw new EOFException("Closing: " + state);
}
IoWriteFutureImpl future = new IoWriteFutureImpl(packetWriteId, buffer);
if (!pendingWrite.compareAndSet(null, future)) {
- throw new WritePendingException("No write pending future");
+ throw new WritePendingException("A write operation is already pending");
}
doWriteIfPossible(false);
return future;
@@ -164,7 +164,7 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
remoteWindow.consume(length);
try {
- IoWriteFuture writeFuture = packetWriter.writePacket(buf);
+ IoWriteFuture writeFuture = packetWriter.writeData(buf);
writeFuture.addListener(f -> onWritten(future, total, length, f));
} catch (IOException e) {
future.setValue(e);
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
index b78ff72..42f4e28 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.channel.exception.SshChannelClosedException;
-import org.apache.sshd.common.io.PacketWriter;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriter;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.ValidateUtils;
@@ -44,7 +44,7 @@ import org.slf4j.Logger;
public class ChannelOutputStream extends OutputStream implements java.nio.channels.Channel, ChannelHolder {
private final AbstractChannel channelInstance;
- private final PacketWriter packetWriter;
+ private final ChannelStreamWriter packetWriter;
private final Window remoteWindow;
private final Duration maxWaitTimeout;
private final Logger log;
@@ -76,7 +76,7 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe
AbstractChannel channel, Window remoteWindow, Duration maxWaitTimeout, Logger log, byte cmd,
boolean eofOnClose) {
this.channelInstance = Objects.requireNonNull(channel, "No channel");
- this.packetWriter = channelInstance.resolveChannelStreamPacketWriter(channel, cmd);
+ this.packetWriter = channelInstance.resolveChannelStreamWriter(channel, cmd);
this.remoteWindow = Objects.requireNonNull(remoteWindow, "No remote window");
Objects.requireNonNull(maxWaitTimeout, "No maxWaitTimeout");
ValidateUtils.checkTrue(GenericUtils.isPositive(maxWaitTimeout), "Non-positive max. wait time: %s",
@@ -240,7 +240,7 @@ public class ChannelOutputStream extends OutputStream implements java.nio.channe
log.trace("flush({}) send {} len={}",
channel, SshConstants.getCommandMessageName(cmd), length);
}
- packetWriter.writePacket(buf);
+ packetWriter.writeData(buf);
}
} catch (WindowClosedException e) {
if (!closedState.getAndSet(true)) {
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java
index 6fee66a..7bdba6b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java
@@ -26,12 +26,11 @@ import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.closeable.AbstractCloseable;
import org.apache.sshd.common.util.io.IoUtils;
-import org.apache.sshd.server.forward.TcpipServerChannel;
/**
* An implementation of {@link IoOutputStream} using a synchronous {@link ChannelOutputStream}.
*
- * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public class SimpleIoOutputStream extends AbstractCloseable implements IoOutputStream {
@@ -48,7 +47,7 @@ public class SimpleIoOutputStream extends AbstractCloseable implements IoOutputS
}
@Override
- public IoWriteFuture writePacket(Buffer buffer) throws IOException {
+ public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
os.write(buffer.array(), buffer.rpos(), buffer.available());
os.flush();
DefaultIoWriteFuture f = new DefaultIoWriteFuture(this, null);
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java
index e2d7b94..0b33ad5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java
@@ -21,7 +21,7 @@ package org.apache.sshd.common.channel;
/**
* A channel that can be either configured to use synchronous or asynchrounous streams.
*
- * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public interface StreamingChannel {
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriter.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriter.java
new file mode 100644
index 0000000..a9b643a
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel.throttle;
+
+import java.io.IOException;
+import java.nio.channels.Channel;
+
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.buffer.Buffer;
+
+/**
+ * The ChannelStreamWriter is used when writing to the channel data stream. This data is encoded and sent with the
+ * {@link org.apache.sshd.common.SshConstants#SSH_MSG_CHANNEL_DATA} and
+ * {@link org.apache.sshd.common.SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA} commands.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface ChannelStreamWriter extends Channel {
+
+ /**
+ * Encode and send the given data packet buffer. <B>Note:</B> the buffer has to have 5 bytes free at the beginning
+ * to allow the encoding to take place. Also, the write position of the buffer has to be set to the position of the
+ * last byte to write.
+ *
+ * @param buffer the buffer to encode and send. <B>NOTE:</B> the buffer must not be touched until the returned
+ * write future is completed.
+ * @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent
+ * @throws IOException if an error occurred when encoding or sending the packet
+ */
+ IoWriteFuture writeData(Buffer buffer) throws IOException;
+
+}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolver.java
similarity index 77%
rename from sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java
rename to sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolver.java
index 75aaaea..0b71544 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolver.java
@@ -19,7 +19,6 @@
package org.apache.sshd.common.channel.throttle;
import org.apache.sshd.common.channel.Channel;
-import org.apache.sshd.common.io.PacketWriter;
/**
* A special mechanism that enables users to intervene in the way packets are sent from {@code ChannelOutputStream}-s -
@@ -28,18 +27,19 @@ import org.apache.sshd.common.io.PacketWriter;
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
@FunctionalInterface
-public interface ChannelStreamPacketWriterResolver {
+public interface ChannelStreamWriterResolver {
/**
* An identity resolver - i.e., no special intervention - simply use the channel itself
*/
- ChannelStreamPacketWriterResolver NONE = (channel, cmd) -> channel;
+ ChannelStreamWriterResolver NONE = (channel, cmd) -> new DefaultChannelStreamWriter(channel);
/**
* @param channel The original {@link Channel}
* @param cmd The {@code SSH_MSG_CHANNEL_DATA} or {@code SSH_MSG_CHANNEL_EXTENDED_DATA} command that triggered
* the resolution
- * @return The {@link PacketWriter} to use - <B>Note:</B> if the return value is not a {@link Channel} then
- * it will be closed when the stream is closed
+ * @return The {@link ChannelStreamWriter} to use - <B>Note:</B> if the return value is not a
+ * {@link Channel} then it will be closed when the stream is closed
*/
- PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd);
+ ChannelStreamWriter resolveChannelStreamWriter(Channel channel, byte cmd);
+
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolverManager.java
similarity index 54%
copy from sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java
copy to sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolverManager.java
index e50eeb8..34f8391 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolverManager.java
@@ -19,26 +19,25 @@
package org.apache.sshd.common.channel.throttle;
import org.apache.sshd.common.channel.Channel;
-import org.apache.sshd.common.io.PacketWriter;
/**
* TODO Add javadoc
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public interface ChannelStreamPacketWriterResolverManager extends ChannelStreamPacketWriterResolver {
- ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver();
+public interface ChannelStreamWriterResolverManager extends ChannelStreamWriterResolver {
+ ChannelStreamWriterResolver getChannelStreamWriterResolver();
- void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver);
+ void setChannelStreamWriterResolver(ChannelStreamWriterResolver resolver);
- default ChannelStreamPacketWriterResolver resolveChannelStreamPacketWriterResolver() {
- ChannelStreamPacketWriterResolver resolver = getChannelStreamPacketWriterResolver();
- return (resolver == null) ? ChannelStreamPacketWriterResolver.NONE : resolver;
+ default ChannelStreamWriterResolver resolveChannelStreamWriterResolver() {
+ return getChannelStreamWriterResolver();
}
@Override
- default PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd) {
- ChannelStreamPacketWriterResolver resolver = resolveChannelStreamPacketWriterResolver();
- return (resolver == null) ? channel : resolver.resolveChannelStreamPacketWriter(channel, cmd);
+ default ChannelStreamWriter resolveChannelStreamWriter(Channel channel, byte cmd) {
+ ChannelStreamWriterResolver resolver = resolveChannelStreamWriterResolver();
+ ChannelStreamWriterResolver actual = (resolver == null) ? ChannelStreamWriterResolver.NONE : resolver;
+ return actual.resolveChannelStreamWriter(channel, cmd);
}
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/DefaultChannelStreamWriter.java
similarity index 53%
rename from sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java
rename to sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/DefaultChannelStreamWriter.java
index e50eeb8..83b1a91 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/DefaultChannelStreamWriter.java
@@ -18,27 +18,41 @@
*/
package org.apache.sshd.common.channel.throttle;
+import java.io.IOException;
+
import org.apache.sshd.common.channel.Channel;
-import org.apache.sshd.common.io.PacketWriter;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.buffer.Buffer;
/**
- * TODO Add javadoc
+ * A ChannelStreamWriter that simply calls the {@link Channel#writePacket(Buffer)} method.
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public interface ChannelStreamPacketWriterResolverManager extends ChannelStreamPacketWriterResolver {
- ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver();
+public class DefaultChannelStreamWriter implements ChannelStreamWriter {
+
+ protected final Channel channel;
+ protected volatile boolean closed;
- void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver);
+ public DefaultChannelStreamWriter(Channel channel) {
+ this.channel = channel;
+ }
- default ChannelStreamPacketWriterResolver resolveChannelStreamPacketWriterResolver() {
- ChannelStreamPacketWriterResolver resolver = getChannelStreamPacketWriterResolver();
- return (resolver == null) ? ChannelStreamPacketWriterResolver.NONE : resolver;
+ @Override
+ public IoWriteFuture writeData(Buffer buffer) throws IOException {
+ if (closed) {
+ throw new IOException("ChannelStreamPacketWriter has been closed");
+ }
+ return channel.writePacket(buffer);
+ }
+
+ @Override
+ public boolean isOpen() {
+ return !closed;
}
@Override
- default PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd) {
- ChannelStreamPacketWriterResolver resolver = resolveChannelStreamPacketWriterResolver();
- return (resolver == null) ? channel : resolver.resolveChannelStreamPacketWriter(channel, cmd);
+ public void close() throws IOException {
+ closed = true;
}
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
index f87d35c..156780e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
@@ -103,7 +103,7 @@ public class SocksProxy extends AbstractCloseable implements IoHandler {
protected void onMessage(Buffer buffer) throws IOException {
IoOutputStream asyncIn = channel.getAsyncIn();
if (asyncIn != null) {
- asyncIn.writePacket(buffer);
+ asyncIn.writeBuffer(buffer);
} else {
OutputStream invertedIn = channel.getInvertedIn();
invertedIn.write(buffer.array(), buffer.rpos(), buffer.available());
@@ -185,7 +185,7 @@ public class SocksProxy extends AbstractCloseable implements IoHandler {
buffer.putByte((byte) 0x00);
buffer.putByte((byte) 0x00);
try {
- session.writePacket(buffer);
+ session.writeBuffer(buffer);
} catch (IOException e) {
// TODO Auto-generated catch block
log.error("Failed ({}) to send channel open packet for {}: {}", e.getClass().getSimpleName(), channel,
@@ -229,7 +229,7 @@ public class SocksProxy extends AbstractCloseable implements IoHandler {
buffer = new ByteArrayBuffer(Byte.SIZE, false);
buffer.putByte((byte) 0x05);
buffer.putByte((byte) (foundNoAuth ? 0x00 : 0xFF));
- session.writePacket(buffer);
+ session.writeBuffer(buffer);
if (!foundNoAuth) {
throw new IllegalStateException("Received socks5 greeting without NoAuth method");
} else if (debugEnabled) {
@@ -304,7 +304,7 @@ public class SocksProxy extends AbstractCloseable implements IoHandler {
}
response.wpos(wpos);
try {
- session.writePacket(response);
+ session.writeBuffer(response);
} catch (IOException e) {
log.error("Failed ({}) to send channel open response for {}: {}", e.getClass().getSimpleName(), channel,
e.getMessage());
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
index c743948..2282b9a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -210,7 +210,7 @@ public class TcpipClientChannel extends AbstractClientChannel implements Forward
Buffer buf = ByteArrayBuffer.getCompactClone(data, off, (int) len);
Window wLocal = getLocalWindow();
wLocal.consumeAndCheck(len);
- serverSession.writePacket(buf);
+ serverSession.writeBuffer(buf);
}
@Override
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
index 577da74..3abf709 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
@@ -42,7 +42,7 @@ import org.apache.sshd.common.SyspropsMapWrapper;
import org.apache.sshd.common.channel.ChannelFactory;
import org.apache.sshd.common.channel.ChannelListener;
import org.apache.sshd.common.channel.RequestHandler;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolver;
import org.apache.sshd.common.config.VersionProperties;
import org.apache.sshd.common.file.FileSystemFactory;
import org.apache.sshd.common.forward.ForwarderFactory;
@@ -96,7 +96,7 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i
private PropertyResolver parentResolver = SyspropsMapWrapper.SYSPROPS_RESOLVER;
private ReservedSessionMessagesHandler reservedSessionMessagesHandler;
private SessionDisconnectHandler sessionDisconnectHandler;
- private ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver;
+ private ChannelStreamWriterResolver channelStreamWriterResolver;
private UnknownChannelReferenceHandler unknownChannelReferenceHandler;
private IoServiceEventListener eventListener;
@@ -314,13 +314,13 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i
}
@Override
- public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() {
- return channelStreamPacketWriterResolver;
+ public ChannelStreamWriterResolver getChannelStreamWriterResolver() {
+ return channelStreamWriterResolver;
}
@Override
- public void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver) {
- channelStreamPacketWriterResolver = resolver;
+ public void setChannelStreamWriterResolver(ChannelStreamWriterResolver resolver) {
+ channelStreamWriterResolver = resolver;
}
@Override
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index 2200ba2..af9d0e2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -166,9 +166,9 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
}
@Override
- public IoWriteFuture writePacket(Buffer buffer) throws IOException {
+ public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
if (log.isDebugEnabled()) {
- log.debug("writePacket({}) Writing {} bytes", this, buffer.available());
+ log.debug("writeBuffer({}) writing {} bytes", this, buffer.available());
}
ByteBuffer buf = ByteBuffer.wrap(buffer.array(), buffer.rpos(), buffer.available());
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
index 047dac7..081ccf4 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
@@ -31,13 +31,12 @@ import org.apache.sshd.common.FactoryManagerHolder;
import org.apache.sshd.common.Service;
import org.apache.sshd.common.auth.MutableUserHolder;
import org.apache.sshd.common.channel.ChannelListenerManager;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager;
import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
import org.apache.sshd.common.forward.PortForwardingInformationProvider;
import org.apache.sshd.common.future.KeyExchangeFuture;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.PacketWriter;
import org.apache.sshd.common.kex.KexFactoryManager;
import org.apache.sshd.common.kex.KeyExchange;
import org.apache.sshd.common.session.helpers.TimeoutIndicator;
@@ -58,12 +57,11 @@ public interface Session
ReservedSessionMessagesManager,
SessionDisconnectHandlerManager,
ChannelListenerManager,
- ChannelStreamPacketWriterResolverManager,
+ ChannelStreamWriterResolverManager,
PortForwardingEventListenerManager,
UnknownChannelReferenceHandlerManager,
FactoryManagerHolder,
- PortForwardingInformationProvider,
- PacketWriter {
+ PortForwardingInformationProvider {
/**
* Create a new buffer for the specified SSH packet and reserve the needed space (5 bytes) for the packet header.
@@ -103,7 +101,7 @@ public interface Session
* "null" string is sent
* @param lang The language - {@code null}/empty if some pre-agreed default is used
* @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent
- * @throws IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding or sending the packet
* @see <A HREF="https://tools.ietf.org/html/rfc4253#section-11.3">RFC 4253 - section 11.3</A>
*/
IoWriteFuture sendDebugMessage(boolean display, Object msg, String lang) throws IOException;
@@ -113,12 +111,22 @@ public interface Session
*
* @param data The message data
* @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent
- * @throws IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding or sending the packet
* @see <A HREF="https://tools.ietf.org/html/rfc4253#section-11.2">RFC 4253 - section 11.2</A>
*/
IoWriteFuture sendIgnoreMessage(byte... data) throws IOException;
/**
+ * Encode and send the given buffer. The buffer has to have 5 bytes free at the beginning to allow the encoding to
+ * take place. Also, the write position of the buffer has to be set to the position of the last byte to write.
+ *
+ * @param buffer the buffer to encode and send
+ * @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent
+ * @throws IOException if an error occurred when encoding sending the packet
+ */
+ IoWriteFuture writePacket(Buffer buffer) throws IOException;
+
+ /**
* Encode and send the given buffer with the specified timeout. If the buffer could not be written before the
* timeout elapses, the returned {@link org.apache.sshd.common.io.IoWriteFuture} will be set with a
* {@link java.util.concurrent.TimeoutException} exception to indicate a timeout.
@@ -127,7 +135,7 @@ public interface Session
* @param timeout the (never {@code null}) timeout value - its {@link Duration#toMillis() milliseconds} value
* will be used
* @return a future that can be used to check when the packet has actually been sent
- * @throws IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding or sending the packet
* @see #writePacket(Buffer, long)
*/
default IoWriteFuture writePacket(Buffer buffer, Duration timeout) throws IOException {
@@ -143,7 +151,7 @@ public interface Session
* @param buffer the buffer to encode and spend
* @param maxWaitMillis the timeout in milliseconds
* @return a future that can be used to check when the packet has actually been sent
- * @throws IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding or sending the packet
*/
default IoWriteFuture writePacket(Buffer buffer, long maxWaitMillis) throws IOException {
return writePacket(buffer, maxWaitMillis, TimeUnit.MILLISECONDS);
@@ -158,7 +166,7 @@ public interface Session
* @param timeout the timeout
* @param unit the time unit of the timeout parameter
* @return a future that can be used to check when the packet has actually been sent
- * @throws IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding or sending the packet
*/
IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit unit) throws IOException;
@@ -171,7 +179,7 @@ public interface Session
* @param timeout The number of time units to wait - must be <U>positive</U>
* @param unit The {@link TimeUnit} to wait for the response
* @return the return buffer if the request was successful, {@code null} otherwise.
- * @throws IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding or sending the packet
* @throws java.net.SocketTimeoutException If no response received within specified timeout
*/
default Buffer request(
@@ -190,7 +198,7 @@ public interface Session
* @param buffer the buffer containing the global request
* @param timeout The (never {@code null}) timeout to wait - its milliseconds value is used
* @return the return buffer if the request was successful, {@code null} otherwise.
- * @throws IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding or sending the packet
* @throws java.net.SocketTimeoutException If no response received within specified timeout
*/
default Buffer request(String request, Buffer buffer, Duration timeout) throws IOException {
@@ -206,7 +214,7 @@ public interface Session
* @param buffer the buffer containing the global request
* @param maxWaitMillis Max. time to wait for response (millis) - must be <U>positive</U>
* @return the return buffer if the request was successful, {@code null} otherwise.
- * @throws IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding or sending the packet
* @throws java.net.SocketTimeoutException If no response received within specified timeout
*/
Buffer request(String request, Buffer buffer, long maxWaitMillis) throws IOException;
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index 636a212..124895f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -63,7 +63,6 @@ import org.apache.sshd.common.future.KeyExchangeFuture;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.PacketWriter;
import org.apache.sshd.common.kex.KexProposalOption;
import org.apache.sshd.common.kex.KexState;
import org.apache.sshd.common.kex.KeyExchange;
@@ -254,6 +253,46 @@ public abstract class AbstractSession extends SessionHelper {
}
}
+ /**
+ * @param len The packet payload size
+ * @param blockSize The cipher block size
+ * @param etmMode Whether using "encrypt-then-MAC" mode
+ * @return The required padding length
+ */
+ public static int calculatePadLength(int len, int blockSize, boolean etmMode) {
+ /*
+ * Note: according to RFC-4253 section 6:
+ *
+ * The minimum size of a packet is 16 (or the cipher block size, whichever is larger) bytes (plus 'mac').
+ *
+ * Since all out ciphers, MAC(s), etc. have a block size > 8 then the minimum size of the packet will be at
+ * least 16 due to the padding at the very least - so even packets that contain an opcode with no arguments will
+ * be above this value. This avoids an un-necessary call to Math.max(len, 16) for each and every packet
+ */
+
+ len++; // the pad length
+ if (!etmMode) {
+ len += Integer.BYTES;
+ }
+
+ /*
+ * Note: according to RFC-4253 section 6:
+ *
+ * Note that the length of the concatenation of 'packet_length', 'padding_length', 'payload', and 'random
+ * padding' MUST be a multiple of the cipher block size or 8, whichever is larger.
+ *
+ * However, we currently do not have ciphers with a block size of less than 8 so we do not take this into
+ * account in order to accelerate the calculation and avoiding an un-necessary call to Math.max(blockSize, 8)
+ * for each and every packet.
+ */
+ int pad = (-len) & (blockSize - 1);
+ if (pad < blockSize) {
+ pad += blockSize;
+ }
+
+ return pad;
+ }
+
@Override
public String getServerVersion() {
return serverVersion;
@@ -935,7 +974,7 @@ public abstract class AbstractSession extends SessionHelper {
ignoreBuf = encode(ignoreBuf);
IoSession networkSession = getIoSession();
- networkSession.writePacket(ignoreBuf);
+ networkSession.writeBuffer(ignoreBuf);
}
return encode(buffer);
@@ -948,7 +987,7 @@ public abstract class AbstractSession extends SessionHelper {
synchronized (encodeLock) {
Buffer packet = resolveOutputPacket(buffer);
IoSession networkSession = getIoSession();
- IoWriteFuture future = networkSession.writePacket(packet);
+ IoWriteFuture future = networkSession.writeBuffer(packet);
return future;
}
}
@@ -1104,7 +1143,7 @@ public abstract class AbstractSession extends SessionHelper {
boolean etmMode = outMac != null && outMac.isEncryptThenMac();
int authLen = outCipher != null ? outCipher.getAuthenticationTagSize() : 0;
boolean authMode = authLen > 0;
- int pad = PacketWriter.calculatePadLength(len, outCipherSize, etmMode || authMode);
+ int pad = calculatePadLength(len, outCipherSize, etmMode || authMode);
len += SshConstants.SSH_PACKET_HEADER_LEN + pad + authLen;
if (outMac != null) {
len += outMacSize;
@@ -1204,7 +1243,7 @@ public abstract class AbstractSession extends SessionHelper {
boolean authMode = authSize > 0;
int oldLen = len;
- int pad = PacketWriter.calculatePadLength(len, outCipherSize, etmMode || authMode);
+ int pad = calculatePadLength(len, outCipherSize, etmMode || authMode);
len += Byte.BYTES + pad;
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java
index 9c15dda..6883451 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java
@@ -48,8 +48,8 @@ import org.apache.sshd.common.PropertyResolver;
import org.apache.sshd.common.RuntimeSshException;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolver;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager;
import org.apache.sshd.common.digest.Digest;
import org.apache.sshd.common.forward.Forwarder;
import org.apache.sshd.common.future.DefaultSshFuture;
@@ -104,7 +104,7 @@ public abstract class SessionHelper extends AbstractKexFactoryManager implements
private ReservedSessionMessagesHandler reservedSessionMessagesHandler;
private SessionDisconnectHandler sessionDisconnectHandler;
private UnknownChannelReferenceHandler unknownChannelReferenceHandler;
- private ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver;
+ private ChannelStreamWriterResolver channelStreamPacketWriterResolver;
/**
* The name of the authenticated user
@@ -514,24 +514,24 @@ public abstract class SessionHelper extends AbstractKexFactoryManager implements
}
@Override
- public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() {
+ public ChannelStreamWriterResolver getChannelStreamWriterResolver() {
return channelStreamPacketWriterResolver;
}
@Override
- public void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver) {
+ public void setChannelStreamWriterResolver(ChannelStreamWriterResolver resolver) {
channelStreamPacketWriterResolver = resolver;
}
@Override
- public ChannelStreamPacketWriterResolver resolveChannelStreamPacketWriterResolver() {
- ChannelStreamPacketWriterResolver resolver = getChannelStreamPacketWriterResolver();
+ public ChannelStreamWriterResolver resolveChannelStreamWriterResolver() {
+ ChannelStreamWriterResolver resolver = getChannelStreamWriterResolver();
if (resolver != null) {
return resolver;
}
- ChannelStreamPacketWriterResolverManager manager = getFactoryManager();
- return manager.resolveChannelStreamPacketWriterResolver();
+ ChannelStreamWriterResolverManager manager = getFactoryManager();
+ return manager.resolveChannelStreamWriterResolver();
}
@Override
@@ -793,7 +793,7 @@ public abstract class SessionHelper extends AbstractKexFactoryManager implements
IoSession networkSession = getIoSession();
byte[] data = (ident + "\r\n").getBytes(StandardCharsets.UTF_8);
- return networkSession.writePacket(new ByteArrayBuffer(data));
+ return networkSession.writeBuffer(new ByteArrayBuffer(data));
}
/**
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index e14775a..874b49e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -217,24 +217,26 @@ public class TcpipServerChannel extends AbstractServerChannel implements Streami
if (streaming == Streaming.Async) {
out = new BufferedIoOutputStream(
"tcpip channel", new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
- @SuppressWarnings("synthetic-access")
- @Override
- protected CloseFuture doCloseGracefully() {
- try {
- sendEof();
- } catch (IOException e) {
- session.exceptionCaught(e);
- }
- return super.doCloseGracefully();
- }
- });
+ @SuppressWarnings("synthetic-access")
+ @Override
+ protected CloseFuture doCloseGracefully() {
+ try {
+ sendEof();
+ } catch (IOException e) {
+ session.exceptionCaught(e);
+ }
+ return super.doCloseGracefully();
+ }
+ });
} else {
- this.out = new SimpleIoOutputStream(new ChannelOutputStream(
- this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true));
+ this.out = new SimpleIoOutputStream(
+ new ChannelOutputStream(
+ this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true));
}
long thresholdHigh = CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_HIGH.getRequired(this);
- long thresholdLow = CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW.get(this).orElse(thresholdHigh / 2);
+ long thresholdLow
+ = CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW.get(this).orElse(thresholdHigh / 2);
IoHandler handler = new IoHandler() {
@Override
@SuppressWarnings("synthetic-access")
@@ -251,7 +253,7 @@ public class TcpipServerChannel extends AbstractServerChannel implements Streami
if (total > thresholdHigh) {
session.suspendRead();
}
- IoWriteFuture ioWriteFuture = out.writePacket(buffer);
+ IoWriteFuture ioWriteFuture = out.writeBuffer(buffer);
ioWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
@Override
public void operationComplete(IoWriteFuture future) {
@@ -379,7 +381,7 @@ public class TcpipServerChannel extends AbstractServerChannel implements Streami
ValidateUtils.checkTrue(len <= Integer.MAX_VALUE, "Data length exceeds int boundaries: %d", len);
// Make sure we copy the data as the incoming buffer may be reused
Buffer buf = ByteArrayBuffer.getCompactClone(data, off, (int) len);
- ioSession.writePacket(buf).addListener(future -> {
+ ioSession.writeBuffer(buf).addListener(future -> {
if (future.isWritten()) {
handleWriteDataSuccess(
SshConstants.SSH_MSG_CHANNEL_DATA, buf.array(), 0, (int) len);
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java b/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java
index 06fcc46..4c2c9b3 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java
@@ -325,7 +325,7 @@ public abstract class AbstractServerSession extends AbstractSession implements S
startService(authService, buffer);
// Now we can inform the peer that authentication is successful
- future = networkSession.writePacket(packet);
+ future = networkSession.writeBuffer(packet);
}
resetIdleTimeout();
@@ -491,7 +491,7 @@ public abstract class AbstractServerSession extends AbstractSession implements S
if (err != null) {
IoSession networkSession = getIoSession();
- networkSession.writePacket(
+ networkSession.writeBuffer(
new ByteArrayBuffer((err.getMessage() + "\n").getBytes(StandardCharsets.UTF_8)))
.addListener(future -> close(true));
throw err;
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java b/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java
index 2c1cdee..9dd046a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java
@@ -101,7 +101,7 @@ public class ChannelForwardedX11 extends AbstractClientChannel {
wLocal.consumeAndCheck(len);
// use a clone in case data buffer is re-used
Buffer packet = ByteArrayBuffer.getCompactClone(data, off, (int) len);
- serverSession.writePacket(packet);
+ serverSession.writeBuffer(packet);
}
@Override
diff --git a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
index 7afae32..5c33c0c 100644
--- a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
@@ -23,20 +23,16 @@ import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.sshd.client.ClientBuilder;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.channel.ClientChannel;
import org.apache.sshd.client.channel.ClientChannelEvent;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.channel.Channel;
-import org.apache.sshd.common.cipher.BuiltinCiphers;
-import org.apache.sshd.common.kex.BuiltinDHFactories;
import org.apache.sshd.common.util.security.SecurityUtils;
import org.apache.sshd.core.CoreModuleProperties;
import org.apache.sshd.server.SshServer;
diff --git a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
index 98afcb8..bb43718 100644
--- a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
@@ -300,7 +300,7 @@ public class WindowAdjustTest extends BaseTestSupport {
private void writeWithPendingDetection(Buffer msg, boolean wasPending) throws IOException {
try {
- asyncIn.writePacket(msg).addListener(future -> {
+ asyncIn.writeBuffer(msg).addListener(future -> {
if (future.isWritten()) {
if (wasPending) {
pending.remove();
diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
index c7487f6..cf2e0cd 100644
--- a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
@@ -73,6 +73,7 @@ import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.channel.Channel;
import org.apache.sshd.common.channel.ChannelListener;
+import org.apache.sshd.common.channel.StreamingChannel;
import org.apache.sshd.common.channel.exception.SshChannelClosedException;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
@@ -523,7 +524,7 @@ public class ClientTest extends BaseTestSupport {
try (ClientSession session = createTestClientSession();
ChannelShell channel = session.createShellChannel()) {
- channel.setStreaming(ClientChannel.Streaming.Async);
+ channel.setStreaming(StreamingChannel.Streaming.Async);
channel.open().verify(OPEN_TIMEOUT);
byte[] message = "0123456789\n".getBytes(StandardCharsets.UTF_8);
@@ -533,14 +534,14 @@ public class ClientTest extends BaseTestSupport {
AtomicInteger writes = new AtomicInteger(nbMessages);
IoOutputStream asyncIn = channel.getAsyncIn();
- asyncIn.writePacket(new ByteArrayBuffer(message))
+ asyncIn.writeBuffer(new ByteArrayBuffer(message))
.addListener(new SshFutureListener<IoWriteFuture>() {
@Override
public void operationComplete(IoWriteFuture future) {
try {
if (future.isWritten()) {
if (writes.decrementAndGet() > 0) {
- asyncIn.writePacket(new ByteArrayBuffer(message)).addListener(this);
+ asyncIn.writeBuffer(new ByteArrayBuffer(message)).addListener(this);
} else {
asyncIn.close(false);
}
@@ -622,7 +623,7 @@ public class ClientTest extends BaseTestSupport {
ByteArrayOutputStream baosErr = new ByteArrayOutputStream();
try (ChannelExec channel = session.createExecChannel("test")) {
- channel.setStreaming(ClientChannel.Streaming.Async);
+ channel.setStreaming(StreamingChannel.Streaming.Async);
OpenFuture open = channel.open();
Thread.sleep(100L); // Removing this line will make the test succeed
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java b/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java
index 7618206..f42ad61 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.channel.ChannelShell;
-import org.apache.sshd.client.channel.ClientChannel;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.RuntimeSshException;
@@ -273,7 +272,7 @@ public class WindowTest extends BaseTestSupport {
session.auth().verify(AUTH_TIMEOUT);
try (ChannelShell channel = session.createShellChannel()) {
- channel.setStreaming(ClientChannel.Streaming.Async);
+ channel.setStreaming(StreamingChannel.Streaming.Async);
channel.open().verify(OPEN_TIMEOUT);
try (Channel serverChannel = GenericUtils.head(GenericUtils.head(sshd.getActiveSessions())
@@ -290,7 +289,7 @@ public class WindowTest extends BaseTestSupport {
IoInputStream input = channel.getAsyncOut();
for (int i = 0; i < nbMessages; i++) {
Buffer buffer = new ByteArrayBuffer(bytes);
- output.writePacket(buffer).verify(DEFAULT_TIMEOUT);
+ output.writeBuffer(buffer).verify(DEFAULT_TIMEOUT);
waitForWindowNotEquals(clientLocal, serverRemote, "client local", "server remote",
TimeUnit.SECONDS.toMillis(3L));
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
index 1715ed2..1bbe342 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
@@ -322,7 +322,10 @@ public class PortForwardingLoadTest extends BaseTestSupport {
lastReport = readSize;
}
} catch (SocketTimeoutException e) {
- throw new IOException("Error reading data at index " + readSize + "/" + dataBytes.length + " of iteration #" + i, e);
+ throw new IOException(
+ "Error reading data at index " + readSize + "/" + dataBytes.length + " of iteration #"
+ + i,
+ e);
}
}
assertPayloadEquals("Mismatched payload at iteration #" + i, dataBytes, baos.toByteArray());
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
index 35bd8b5..d8fd35f 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
@@ -389,7 +389,7 @@ public class AbstractSessionTest extends BaseTestSupport {
}
@Override
- public IoWriteFuture writePacket(Buffer buffer) throws IOException {
+ public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
if (!isOpen()) {
throw new EOFException("Not open");
}
diff --git a/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java b/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java
index 0d09ec9..553142e 100644
--- a/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java
@@ -132,7 +132,7 @@ public class ServerProxyAcceptorTest extends BaseTestSupport {
client.setClientProxyConnector(session -> {
IoSession ioSession = session.getIoSession();
- ioSession.writePacket(new ByteArrayBuffer(metaDataBytes));
+ ioSession.writeBuffer(new ByteArrayBuffer(metaDataBytes));
});
client.start();
diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
index 9084a13..b550893 100644
--- a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
+++ b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
@@ -140,7 +140,7 @@ public class AsyncEchoShellFactory implements ShellFactory {
if (buffer.charAt(i) == '\n') {
String s = buffer.substring(0, i + 1);
byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
- out.writePacket(new ByteArrayBuffer(bytes)).addListener(future -> {
+ out.writeBuffer(new ByteArrayBuffer(bytes)).addListener(future -> {
Session session1 = channel.getSession();
if (future.isWritten()) {
try {
diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java
index 8700e7a..80e576c 100644
--- a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java
+++ b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java
@@ -24,8 +24,9 @@ import org.apache.sshd.client.future.DefaultOpenFuture;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.channel.AbstractChannel;
import org.apache.sshd.common.channel.Channel;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver;
-import org.apache.sshd.common.io.PacketWriter;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriter;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolver;
+import org.apache.sshd.common.channel.throttle.DefaultChannelStreamWriter;
import org.apache.sshd.common.util.buffer.Buffer;
public class BogusChannel extends AbstractChannel {
@@ -64,12 +65,13 @@ public class BogusChannel extends AbstractChannel {
}
@Override
- public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() {
- return ChannelStreamPacketWriterResolver.NONE;
+ public ChannelStreamWriterResolver getChannelStreamWriterResolver() {
+ return ChannelStreamWriterResolver.NONE;
}
@Override
- public PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd) {
- return channel;
+ public ChannelStreamWriter resolveChannelStreamWriter(Channel channel, byte cmd) {
+ return new DefaultChannelStreamWriter(channel);
}
+
}
diff --git a/sshd-mina/src/main/java/org/apache/sshd/mina/MinaSession.java b/sshd-mina/src/main/java/org/apache/sshd/mina/MinaSession.java
index af92e6e..04c7ea7 100644
--- a/sshd-mina/src/main/java/org/apache/sshd/mina/MinaSession.java
+++ b/sshd-mina/src/main/java/org/apache/sshd/mina/MinaSession.java
@@ -167,7 +167,7 @@ public class MinaSession extends AbstractInnerCloseable implements IoSession {
}
@Override // NOTE !!! data buffer may NOT be re-used when method returns - at least until IoWriteFuture is signalled
- public IoWriteFuture writePacket(Buffer buffer) {
+ public IoWriteFuture writeBuffer(Buffer buffer) {
return write(MinaSupport.asIoBuffer(buffer));
}
diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java
index 984c32d..01ab21d 100644
--- a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java
+++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java
@@ -136,7 +136,7 @@ public class NettyIoSession extends AbstractCloseable implements IoSession {
}
@Override
- public IoWriteFuture writePacket(Buffer buffer) {
+ public IoWriteFuture writeBuffer(Buffer buffer) {
int bufLen = buffer.available();
ByteBuf buf = Unpooled.buffer(bufLen);
buf.writeBytes(buffer.array(), buffer.rpos(), bufLen);
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java
index 9db3731..ba29ed4 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java
@@ -288,7 +288,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
}
IoOutputStream asyncIn = channel.getAsyncIn();
- IoWriteFuture writeFuture = asyncIn.writePacket(buf);
+ IoWriteFuture writeFuture = asyncIn.writeBuffer(buf);
writeFuture.verify();
return id;
}
@@ -368,7 +368,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
if (traceEnabled) {
log.trace("init({}) send SSH_FXP_INIT - initial version={}", clientChannel, initialVersion);
}
- IoWriteFuture writeFuture = asyncIn.writePacket(buf);
+ IoWriteFuture writeFuture = asyncIn.writeBuffer(buf);
writeFuture.verify();
if (traceEnabled) {
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/server/SftpSubsystem.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/server/SftpSubsystem.java
index 034794f..5ad6869 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/server/SftpSubsystem.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/server/SftpSubsystem.java
@@ -943,7 +943,7 @@ public class SftpSubsystem
@Override
protected void send(Buffer buffer) throws IOException {
BufferUtils.updateLengthPlaceholder(buffer, 0);
- out.writePacket(buffer);
+ out.writeBuffer(buffer);
}
@Override