You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2016/02/15 17:51:22 UTC

[4/6] mina-sshd git commit: Take into account IoWriteFuture#isWritten result

Take into account IoWriteFuture#isWritten result


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/823540d4
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/823540d4
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/823540d4

Branch: refs/heads/master
Commit: 823540d4c68014cb02a272e04c743a4b14a29926
Parents: 2bde68e
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Mon Feb 15 18:51:10 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Mon Feb 15 18:51:10 2016 +0200

----------------------------------------------------------------------
 .../channel/ChannelAsyncOutputStream.java       | 63 +++++++++++++++++---
 .../sshd/server/forward/TcpipServerChannel.java | 32 ++++++++--
 .../java/org/apache/sshd/WindowAdjustTest.java  | 16 +++--
 .../sshd/util/test/AsyncEchoShellFactory.java   | 17 ++++--
 4 files changed, 104 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index c790f29..7305937 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -28,20 +28,26 @@ import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.io.WritePendingException;
 import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.closeable.AbstractCloseable;
 
-public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOutputStream {
+public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOutputStream, ChannelHolder {
 
-    private final Channel channel;
+    private final Channel channelInstance;
     private final byte cmd;
     private final AtomicReference<IoWriteFutureImpl> pendingWrite = new AtomicReference<>();
 
     public ChannelAsyncOutputStream(Channel channel, byte cmd) {
-        this.channel = channel;
+        this.channelInstance = ValidateUtils.checkNotNull(channel, "No channel");
         this.cmd = cmd;
     }
 
+    @Override
+    public Channel getChannel() {
+        return channelInstance;
+    }
+
     public void onWindowExpanded() throws IOException {
         doWriteIfPossible(true);
     }
@@ -77,6 +83,7 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
         final Buffer buffer = future.getBuffer();
         final int total = buffer.available();
         if (total > 0) {
+            Channel channel = getChannel();
             Window remoteWindow = channel.getRemoteWindow();
             final int length = Math.min(Math.min(remoteWindow.getSize(), total), remoteWindow.getPacketSize());
             if (log.isTraceEnabled()) {
@@ -101,32 +108,72 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
                 buffer.rpos(buffer.rpos() + length);
                 remoteWindow.consume(length);
                 try {
+                    final ChannelAsyncOutputStream stream = this;
                     s.writePacket(buf).addListener(new SshFutureListener<IoWriteFuture>() {
-                        @SuppressWarnings("synthetic-access")
                         @Override
                         public void operationComplete(IoWriteFuture f) {
+                            if (f.isWritten()) {
+                                handleOperationCompleted();
+                            } else {
+                                handleOperationFailed(f.getException());
+                            }
+                        }
+
+                        @SuppressWarnings("synthetic-access")
+                        private void handleOperationCompleted() {
                             if (total > length) {
+                                if (log.isTraceEnabled()) {
+                                    log.trace("doWriteIfPossible({}) completed write of {} out of {}", stream, length, total);
+                                }
                                 doWriteIfPossible(false);
                             } else {
-                                pendingWrite.compareAndSet(future, null);
+                                boolean nullified = pendingWrite.compareAndSet(future, null);
+                                if (log.isTraceEnabled()) {
+                                    log.trace("doWriteIfPossible({}) completed write len={}, more={}",
+                                              stream, total, !nullified);
+                                }
                                 future.setValue(Boolean.TRUE);
                             }
                         }
+
+                        @SuppressWarnings("synthetic-access")
+                        private void handleOperationFailed(Throwable reason) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("doWriteIfPossible({}) failed ({}) to complete write of {} out of {}: {}",
+                                          stream, reason.getClass().getSimpleName(), length, total, reason.getMessage());
+                            }
+
+                            if (log.isTraceEnabled()) {
+                                log.trace("doWriteIfPossible(" + stream + ") write failure details", reason);
+                            }
+
+                            boolean nullified = pendingWrite.compareAndSet(future, null);
+                            if (log.isTraceEnabled()) {
+                                log.trace("doWriteIfPossible({}) failed write len={}, more={}",
+                                          stream, total, !nullified);
+                            }
+                            future.setValue(reason);
+                        }
                     });
                 } catch (IOException e) {
                     future.setValue(e);
                 }
             } else if (!resume) {
-                log.debug("Delaying write to {} until space is available in the remote window", this);
+                if (log.isDebugEnabled()) {
+                    log.debug("doWriteIfPossible({}) delaying write until space is available in the remote window", this);
+                }
             }
         } else {
-            pendingWrite.compareAndSet(future, null);
+            boolean nullified = pendingWrite.compareAndSet(future, null);
+            if (log.isTraceEnabled()) {
+                log.trace("doWriteIfPossible({}) current buffer sent - more={}", this, !nullified);
+            }
             future.setValue(Boolean.TRUE);
         }
     }
 
     @Override
     public String toString() {
-        return "ChannelAsyncOutputStream[" + channel + "]";
+        return getClass().getSimpleName() + "[" + getChannel() + "] cmd=" + SshConstants.getCommandMessageName(cmd & 0xFF);
     }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 11ff1b4..4b66a24 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -52,6 +52,7 @@ import org.apache.sshd.common.util.net.SshdSocketAddress;
 import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
 import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.sshd.server.channel.AbstractServerChannel;
+import org.apache.sshd.server.channel.ServerChannel;
 
 /**
  * TODO Add javadoc
@@ -348,15 +349,34 @@ public class TcpipServerChannel extends AbstractServerChannel {
     protected void doWriteData(byte[] data, int off, final int len) throws IOException {
         // Make sure we copy the data as the incoming buffer may be reused
         Buffer buf = ByteArrayBuffer.getCompactClone(data, off, len);
+        final ServerChannel channel = this;
         ioSession.write(buf).addListener(new SshFutureListener<IoWriteFuture>() {
             @Override
+            @SuppressWarnings("synthetic-access")
             public void operationComplete(IoWriteFuture future) {
-                try {
-                    Window wLocal = getLocalWindow();
-                    wLocal.consumeAndCheck(len);
-                } catch (IOException e) {
-                    Session session = getSession();
-                    session.exceptionCaught(e);
+                Session session = getSession();
+                if (future.isWritten()) {
+                    try {
+                        Window wLocal = getLocalWindow();
+                        wLocal.consumeAndCheck(len);
+                    } catch (IOException e) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("doWriteData({}) failed ({}) to consume len={}: {}",
+                                      channel, e.getClass().getSimpleName(), len, e.getMessage());
+                        }
+                        session.exceptionCaught(e);
+                    }
+                } else {
+                    Throwable t = future.getException();
+                    if (log.isDebugEnabled()) {
+                        log.debug("doWriteData({}) failed ({}) to write len={}: {}",
+                                  channel, t.getClass().getSimpleName(), len, t.getMessage());
+                    }
+
+                    if (log.isTraceEnabled()) {
+                        log.trace("doWriteData(" + channel + ") len=" + len + " write failure details", t);
+                    }
+                    session.exceptionCaught(t);
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
index f7017b3..b20cfb7 100644
--- a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
@@ -112,7 +112,6 @@ public class WindowAdjustTest extends BaseTestSupport {
 
     @Test(timeout = 6L * 60L * 1000L)
     public void testTrafficHeavyLoad() throws Exception {
-
         try (SshClient client = setupTestClient()) {
             client.start();
 
@@ -274,7 +273,7 @@ public class WindowAdjustTest extends BaseTestSupport {
     /**
      * Wrapper for asyncIn stream that catches Pending exception and queues the pending messages for later retry (send after previous messages were fully transfered)
      */
-    private static class AsyncInPendingWrapper {
+    private static class AsyncInPendingWrapper extends AbstractLoggingBean {
         private IoOutputStream asyncIn;
 
         // Order has to be preserved for queued writes
@@ -309,11 +308,16 @@ public class WindowAdjustTest extends BaseTestSupport {
                 asyncIn.write(msg).addListener(new SshFutureListener<IoWriteFuture>() {
                     @SuppressWarnings("synthetic-access")
                     @Override
-                    public void operationComplete(final IoWriteFuture future) {
-                        if (wasPending) {
-                            pending.remove();
+                    public void operationComplete(IoWriteFuture future) {
+                        if (future.isWritten()) {
+                            if (wasPending) {
+                                pending.remove();
+                            }
+                            writePendingIfAny();
+                        } else {
+                            Throwable t = future.getException();
+                            log.warn("Failed to write message", t);
                         }
-                        writePendingIfAny();
                     }
                 });
             } catch (final WritePendingException e) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
index a66c3b6..4623a52 100644
--- a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
+++ b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
@@ -25,11 +25,13 @@ import java.nio.charset.StandardCharsets;
 
 import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.channel.BufferedIoOutputStream;
+import org.apache.sshd.common.channel.Window;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoInputStream;
 import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.server.AsyncCommand;
 import org.apache.sshd.server.ChannelSessionAware;
@@ -144,10 +146,17 @@ public class AsyncEchoShellFactory implements Factory<Command> {
                     out.write(new ByteArrayBuffer(bytes)).addListener(new SshFutureListener<IoWriteFuture>() {
                         @Override
                         public void operationComplete(IoWriteFuture future) {
-                            try {
-                                channel.getLocalWindow().consumeAndCheck(bytes.length);
-                            } catch (IOException e) {
-                                channel.getSession().exceptionCaught(e);
+                            Session session = channel.getSession();
+                            if (future.isWritten()) {
+                                try {
+                                    Window wLocal = channel.getLocalWindow();
+                                    wLocal.consumeAndCheck(bytes.length);
+                                } catch (IOException e) {
+                                    session.exceptionCaught(e);
+                                }
+                            } else {
+                                Throwable t = future.getException();
+                                session.exceptionCaught(t);
                             }
                         }
                     });