You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2016/02/15 17:51:22 UTC
[4/6] mina-sshd git commit: Take into account IoWriteFuture#isWritten
result
Take into account IoWriteFuture#isWritten result
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/823540d4
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/823540d4
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/823540d4
Branch: refs/heads/master
Commit: 823540d4c68014cb02a272e04c743a4b14a29926
Parents: 2bde68e
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Mon Feb 15 18:51:10 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Mon Feb 15 18:51:10 2016 +0200
----------------------------------------------------------------------
.../channel/ChannelAsyncOutputStream.java | 63 +++++++++++++++++---
.../sshd/server/forward/TcpipServerChannel.java | 32 ++++++++--
.../java/org/apache/sshd/WindowAdjustTest.java | 16 +++--
.../sshd/util/test/AsyncEchoShellFactory.java | 17 ++++--
4 files changed, 104 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index c790f29..7305937 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -28,20 +28,26 @@ import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.io.WritePendingException;
import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.closeable.AbstractCloseable;
-public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOutputStream {
+public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOutputStream, ChannelHolder {
- private final Channel channel;
+ private final Channel channelInstance;
private final byte cmd;
private final AtomicReference<IoWriteFutureImpl> pendingWrite = new AtomicReference<>();
public ChannelAsyncOutputStream(Channel channel, byte cmd) {
- this.channel = channel;
+ this.channelInstance = ValidateUtils.checkNotNull(channel, "No channel");
this.cmd = cmd;
}
+ @Override
+ public Channel getChannel() {
+ return channelInstance;
+ }
+
public void onWindowExpanded() throws IOException {
doWriteIfPossible(true);
}
@@ -77,6 +83,7 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
final Buffer buffer = future.getBuffer();
final int total = buffer.available();
if (total > 0) {
+ Channel channel = getChannel();
Window remoteWindow = channel.getRemoteWindow();
final int length = Math.min(Math.min(remoteWindow.getSize(), total), remoteWindow.getPacketSize());
if (log.isTraceEnabled()) {
@@ -101,32 +108,72 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
buffer.rpos(buffer.rpos() + length);
remoteWindow.consume(length);
try {
+ final ChannelAsyncOutputStream stream = this;
s.writePacket(buf).addListener(new SshFutureListener<IoWriteFuture>() {
- @SuppressWarnings("synthetic-access")
@Override
public void operationComplete(IoWriteFuture f) {
+ if (f.isWritten()) {
+ handleOperationCompleted();
+ } else {
+ handleOperationFailed(f.getException());
+ }
+ }
+
+ @SuppressWarnings("synthetic-access")
+ private void handleOperationCompleted() {
if (total > length) {
+ if (log.isTraceEnabled()) {
+ log.trace("doWriteIfPossible({}) completed write of {} out of {}", stream, length, total);
+ }
doWriteIfPossible(false);
} else {
- pendingWrite.compareAndSet(future, null);
+ boolean nullified = pendingWrite.compareAndSet(future, null);
+ if (log.isTraceEnabled()) {
+ log.trace("doWriteIfPossible({}) completed write len={}, more={}",
+ stream, total, !nullified);
+ }
future.setValue(Boolean.TRUE);
}
}
+
+ @SuppressWarnings("synthetic-access")
+ private void handleOperationFailed(Throwable reason) {
+ if (log.isDebugEnabled()) {
+ log.debug("doWriteIfPossible({}) failed ({}) to complete write of {} out of {}: {}",
+ stream, reason.getClass().getSimpleName(), length, total, reason.getMessage());
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("doWriteIfPossible(" + this + ") write failure details", reason);
+ }
+
+ boolean nullified = pendingWrite.compareAndSet(future, null);
+ if (log.isTraceEnabled()) {
+ log.trace("doWriteIfPossible({}) failed write len={}, more={}",
+ stream, total, !nullified);
+ }
+ future.setValue(reason);
+ }
});
} catch (IOException e) {
future.setValue(e);
}
} else if (!resume) {
- log.debug("Delaying write to {} until space is available in the remote window", this);
+ if (log.isDebugEnabled()) {
+ log.debug("doWriteIfPossible({}) delaying write until space is available in the remote window", this);
+ }
}
} else {
- pendingWrite.compareAndSet(future, null);
+ boolean nullified = pendingWrite.compareAndSet(future, null);
+ if (log.isTraceEnabled()) {
+ log.trace("doWriteIfPossible({}) current buffer sent - more={}", this, !nullified);
+ }
future.setValue(Boolean.TRUE);
}
}
@Override
public String toString() {
- return "ChannelAsyncOutputStream[" + channel + "]";
+ return getClass().getSimpleName() + "[" + getChannel() + "] cmd=" + SshConstants.getCommandMessageName(cmd & 0xFF);
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 11ff1b4..4b66a24 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -52,6 +52,7 @@ import org.apache.sshd.common.util.net.SshdSocketAddress;
import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
import org.apache.sshd.common.util.threads.ThreadUtils;
import org.apache.sshd.server.channel.AbstractServerChannel;
+import org.apache.sshd.server.channel.ServerChannel;
/**
* TODO Add javadoc
@@ -348,15 +349,34 @@ public class TcpipServerChannel extends AbstractServerChannel {
protected void doWriteData(byte[] data, int off, final int len) throws IOException {
// Make sure we copy the data as the incoming buffer may be reused
Buffer buf = ByteArrayBuffer.getCompactClone(data, off, len);
+ final ServerChannel channel = this;
ioSession.write(buf).addListener(new SshFutureListener<IoWriteFuture>() {
@Override
+ @SuppressWarnings("synthetic-access")
public void operationComplete(IoWriteFuture future) {
- try {
- Window wLocal = getLocalWindow();
- wLocal.consumeAndCheck(len);
- } catch (IOException e) {
- Session session = getSession();
- session.exceptionCaught(e);
+ Session session = getSession();
+ if (future.isWritten()) {
+ try {
+ Window wLocal = getLocalWindow();
+ wLocal.consumeAndCheck(len);
+ } catch (IOException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("doWriteData({}) failed ({}) to consume len={}: {}",
+ channel, e.getClass().getSimpleName(), len, e.getMessage());
+ }
+ session.exceptionCaught(e);
+ }
+ } else {
+ Throwable t = future.getException();
+ if (log.isDebugEnabled()) {
+ log.debug("doWriteData({}) failed ({}) to write len={}: {}",
+ channel, t.getClass().getSimpleName(), len, t.getMessage());
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("doWriteData(" + channel + ") len=" + len + " write failure details", t);
+ }
+ session.exceptionCaught(t);
}
}
});
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
index f7017b3..b20cfb7 100644
--- a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
@@ -112,7 +112,6 @@ public class WindowAdjustTest extends BaseTestSupport {
@Test(timeout = 6L * 60L * 1000L)
public void testTrafficHeavyLoad() throws Exception {
-
try (SshClient client = setupTestClient()) {
client.start();
@@ -274,7 +273,7 @@ public class WindowAdjustTest extends BaseTestSupport {
/**
* Wrapper for asyncIn stream that catches Pending exception and queues the pending messages for later retry (send after previous messages were fully transfered)
*/
- private static class AsyncInPendingWrapper {
+ private static class AsyncInPendingWrapper extends AbstractLoggingBean {
private IoOutputStream asyncIn;
// Order has to be preserved for queued writes
@@ -309,11 +308,16 @@ public class WindowAdjustTest extends BaseTestSupport {
asyncIn.write(msg).addListener(new SshFutureListener<IoWriteFuture>() {
@SuppressWarnings("synthetic-access")
@Override
- public void operationComplete(final IoWriteFuture future) {
- if (wasPending) {
- pending.remove();
+ public void operationComplete(IoWriteFuture future) {
+ if (future.isWritten()) {
+ if (wasPending) {
+ pending.remove();
+ }
+ writePendingIfAny();
+ } else {
+ Throwable t = future.getException();
+ log.warn("Failed to write message", t);
}
- writePendingIfAny();
}
});
} catch (final WritePendingException e) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
index a66c3b6..4623a52 100644
--- a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
+++ b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
@@ -25,11 +25,13 @@ import java.nio.charset.StandardCharsets;
import org.apache.sshd.common.Factory;
import org.apache.sshd.common.channel.BufferedIoOutputStream;
+import org.apache.sshd.common.channel.Window;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoInputStream;
import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.server.AsyncCommand;
import org.apache.sshd.server.ChannelSessionAware;
@@ -144,10 +146,17 @@ public class AsyncEchoShellFactory implements Factory<Command> {
out.write(new ByteArrayBuffer(bytes)).addListener(new SshFutureListener<IoWriteFuture>() {
@Override
public void operationComplete(IoWriteFuture future) {
- try {
- channel.getLocalWindow().consumeAndCheck(bytes.length);
- } catch (IOException e) {
- channel.getSession().exceptionCaught(e);
+ Session session = channel.getSession();
+ if (future.isWritten()) {
+ try {
+ Window wLocal = channel.getLocalWindow();
+ wLocal.consumeAndCheck(bytes.length);
+ } catch (IOException e) {
+ session.exceptionCaught(e);
+ }
+ } else {
+ Throwable t = future.getException();
+ session.exceptionCaught(t);
}
}
});