You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2017/10/11 17:34:21 UTC

[2/3] mina-sshd git commit: [SSHD-776] SSHD local port forwarding close session unexpectedly

[SSHD-776] SSHD local port forwarding close session unexpectedly


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/0e99597a
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/0e99597a
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/0e99597a

Branch: refs/heads/master
Commit: 0e99597aceb035d271f0140348df23d32b5e3bcd
Parents: 2529a4c
Author: Fulvio Cavarretta <fu...@primeur.com>
Authored: Wed Oct 11 18:21:11 2017 +0300
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Wed Oct 11 20:35:15 2017 +0300

----------------------------------------------------------------------
 .../sshd/common/channel/AbstractChannel.java    |  1 +
 .../apache/sshd/common/io/nio2/Nio2Session.java | 66 ++++++++++++--------
 .../helpers/AbstractConnectionService.java      | 15 ++++-
 .../sshd/server/forward/TcpipServerChannel.java | 63 ++++++++++---------
 4 files changed, 89 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 2a6c0e9..e1eebef 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -776,6 +776,7 @@ public abstract class AbstractChannel
             throw err;
         }
     }
+
     @Override
     protected void doCloseImmediately() {
         if (service != null) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index 0401224..a846562 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -133,11 +133,11 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
     @Override
     public IoWriteFuture writePacket(Buffer buffer) throws IOException {
         if (log.isDebugEnabled()) {
-            log.debug("Writing {} bytes", buffer.available());
+            log.debug("writePacket({}) Writing {} bytes", this, buffer.available());
         }
 
         ByteBuffer buf = ByteBuffer.wrap(buffer.array(), buffer.rpos(), buffer.available());
-        final Nio2DefaultIoWriteFuture future = new Nio2DefaultIoWriteFuture(null, buf);
+        Nio2DefaultIoWriteFuture future = new Nio2DefaultIoWriteFuture(null, buf);
         if (isClosing()) {
             Throwable exc = new ClosedChannelException();
             future.setException(exc);
@@ -150,32 +150,33 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
     }
 
     protected void exceptionCaught(Throwable exc) {
-        if (!closeFuture.isClosed()) {
-            AsynchronousSocketChannel socket = getSocket();
-            if (isClosing() || !socket.isOpen()) {
-                close(true);
-            } else {
-                IoHandler handler = getIoHandler();
-                try {
-                    if (log.isDebugEnabled()) {
-                        log.debug("exceptionCaught({}) caught {}[{}] - calling handler",
-                                  this, exc.getClass().getSimpleName(), exc.getMessage());
-                    }
-                    handler.exceptionCaught(this, exc);
-                } catch (Throwable e) {
-                    Throwable t = GenericUtils.peelException(e);
-                    if (log.isDebugEnabled()) {
-                        log.debug("exceptionCaught({}) Exception handler threw {}, closing the session: {}",
-                                  this, t.getClass().getSimpleName(), t.getMessage());
-                    }
+        if (closeFuture.isClosed()) {
+            return;
+        }
 
-                    if (log.isTraceEnabled()) {
-                        log.trace("exceptionCaught(" + this + ") exception handler failure details", t);
-                    }
-                    close(true);
+        AsynchronousSocketChannel socket = getSocket();
+        if (isOpen() && socket.isOpen()) {
+            IoHandler handler = getIoHandler();
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug("exceptionCaught({}) caught {}[{}] - calling handler",
+                              this, exc.getClass().getSimpleName(), exc.getMessage());
+                }
+                handler.exceptionCaught(this, exc);
+            } catch (Throwable e) {
+                Throwable t = GenericUtils.peelException(e);
+                if (log.isDebugEnabled()) {
+                    log.debug("exceptionCaught({}) Exception handler threw {}, closing the session: {}",
+                              this, t.getClass().getSimpleName(), t.getMessage());
+                }
+
+                if (log.isTraceEnabled()) {
+                    log.trace("exceptionCaught(" + this + ") exception handler failure details", t);
                 }
             }
         }
+
+        close(true);
     }
 
     @Override
@@ -193,12 +194,22 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
 
     @Override
     protected void doCloseImmediately() {
-        for (;;) {
+        for (boolean debugEnabled = log.isDebugEnabled();;) {
             // Cancel pending requests informing them of the cancellation
             Nio2DefaultIoWriteFuture future = writes.poll();
             if (future != null) {
+                if (future.isWritten()) {
+                    if (debugEnabled) {
+                        log.debug("doCloseImmediately({}) skip already written future={}", this, future);
+                    }
+                    continue;
+                }
+
                 Throwable error = future.getException();
                 if (error == null) {
+                    if (debugEnabled) {
+                        log.debug("doCloseImmediately({}) signal write abort for future={}", this, future);
+                    }
                     future.setException(new WriteAbortedException("Write request aborted due to immediate session close", null));
                 }
             } else {
@@ -379,6 +390,11 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
             if (log.isDebugEnabled()) {
                 log.debug("handleCompletedWriteCycle({}) finished writing len={}", this, writeLen);
             }
+
+            // Remove the future from the pending writes before calling future.setWritten(); otherwise
+            // doCloseImmediately, when invoked from the doCloseGracefully listener, may signal a WriteAbortedException on it
+            writes.remove(future);
+
             future.setWritten();
             finishWrite(future);
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
index 1524196..14db9b1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
@@ -307,7 +307,8 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
      */
     @Override
     public void unregisterChannel(Channel channel) {
-        Channel result = channels.remove(channel.getId());
+        int channelId = channel.getId();
+        Channel result = channels.remove(channelId);
         if (log.isDebugEnabled()) {
             log.debug("unregisterChannel({}) result={}", channel, result);
         }
@@ -436,8 +437,16 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
      */
     public void channelWindowAdjust(Buffer buffer) throws IOException {
         try {
-            Channel channel = getChannel(buffer);
-            channel.handleWindowAdjust(buffer);
+            // Look the channel up directly instead of calling getChannel, so that the session is not
+            // closed when an SSH_MSG_CHANNEL_WINDOW_ADJUST is received for an already closed channel
+            int recipient = buffer.getInt();
+            Channel channel = channels.get(recipient);
+            if (channel != null) {
+                channel.handleWindowAdjust(buffer);
+            } else {
+                log.warn("Received SSH_MSG_CHANNEL_WINDOW_ADJUST on unknown channel " + recipient);
+            }
+
         } catch (SshException e) {
             if (log.isDebugEnabled()) {
                 log.debug("channelWindowAdjust {} error: {}", e.getClass().getSimpleName(), e.getMessage());

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 57a5699..317e669 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -232,7 +232,7 @@ public class TcpipServerChannel extends AbstractServerChannel {
     protected void handleChannelOpenFailure(OpenFuture f, Throwable problem) {
         signalChannelOpenFailure(problem);
         notifyStateChanged(problem.getClass().getSimpleName());
-        closeImmediately0();
+        close(true);
 
         if (problem instanceof ConnectException) {
             f.setException(new SshChannelOpenException(getId(), SshConstants.SSH_OPEN_CONNECT_FAILED, problem.getMessage(), problem));
@@ -241,44 +241,43 @@ public class TcpipServerChannel extends AbstractServerChannel {
         }
 
     }
-    private void closeImmediately0() {
-        // We need to close the channel immediately to remove it from the
-        // server session's channel table and *not* send a packet to the
-        // client.  A notification was already sent by our caller, or will
-        // be sent after we return.
-        //
-        super.close(true);
+
+    @Override
+    public CloseFuture close(boolean immediately) {
+        CloseFuture closingFeature = super.close(immediately);
 
         // We also need to dispose of the connector, but unfortunately we
         // are being invoked by the connector thread or the connector's
-        // own processor thread.  Disposing of the connector within either
-        // causes deadlock.  Instead create a thread to dispose of the
+        // own processor thread. Disposing of the connector within either
+        // causes deadlock. Instead create a thread to dispose of the
         // connector in the background.
-
         ExecutorService service = getExecutorService();
+
         // allocate a temporary executor service if none provided
-        final ExecutorService executors = (service == null)
+        ExecutorService executors = (service == null)
                 ? ThreadUtils.newSingleThreadExecutor("TcpIpServerChannel-ConnectorCleanup[" + getSession() + "]")
                 : service;
         // shutdown the temporary executor service if had to create it
-        final boolean shutdown = executors != service || isShutdownOnExit();
-        executors.submit(() -> {
-            try {
-                connector.close(true);
-            } finally {
-                if (shutdown && !executors.isShutdown()) {
-                    Collection<Runnable> runners = executors.shutdownNow();
+        boolean shutdown = (executors != service) || isShutdownOnExit();
+
+        return builder().when(closingFeature).run(() -> {
+            executors.submit(() -> {
+                try {
                     if (log.isDebugEnabled()) {
-                        log.debug("destroy({}) - shutdown executor service - runners count={}", TcpipServerChannel.this, runners.size());
+                        log.debug("disposing connector: {} for: {}", connector, TcpipServerChannel.this);
+                    }
+                    connector.close(immediately);
+                } finally {
+                    if (shutdown && (!executors.isShutdown())) {
+                        Collection<Runnable> runners = executors.shutdownNow();
+                        if (log.isDebugEnabled()) {
+                            log.debug("destroy({}) - shutdown executor service - runners count={}",
+                                      TcpipServerChannel.this, runners.size());
+                        }
                     }
                 }
-            }
-        });
-    }
-
-    @Override
-    public CloseFuture close(boolean immediately) {
-        return super.close(immediately).addListener(sshFuture -> closeImmediately0());
+            });
+        }).build().close(false);
     }
 
     @Override
@@ -328,6 +327,14 @@ public class TcpipServerChannel extends AbstractServerChannel {
                     + " len=" + len + " write failure details", t);
         }
 
-        session.exceptionCaught(t);
+        if (ioSession.isOpen()) {
+            session.exceptionCaught(t);
+        } else {
+            // If the remote entity has closed the socket (the ioSession), data coming from
+            // the SSH channel should simply be discarded
+            if (log.isDebugEnabled()) {
+                log.debug("Ignoring writeDataFailure {} because ioSession {} is already closing ", t, ioSession);
+            }
+        }
     }
 }