You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2017/10/11 17:34:21 UTC
[2/3] mina-sshd git commit: [SSHD-776] SSHD local port forwarding close session unexpectedly
[SSHD-776] SSHD local port forwarding close session unexpectedly
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/0e99597a
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/0e99597a
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/0e99597a
Branch: refs/heads/master
Commit: 0e99597aceb035d271f0140348df23d32b5e3bcd
Parents: 2529a4c
Author: Fulvio Cavarretta <fu...@primeur.com>
Authored: Wed Oct 11 18:21:11 2017 +0300
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Wed Oct 11 20:35:15 2017 +0300
----------------------------------------------------------------------
.../sshd/common/channel/AbstractChannel.java | 1 +
.../apache/sshd/common/io/nio2/Nio2Session.java | 66 ++++++++++++--------
.../helpers/AbstractConnectionService.java | 15 ++++-
.../sshd/server/forward/TcpipServerChannel.java | 63 ++++++++++---------
4 files changed, 89 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 2a6c0e9..e1eebef 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -776,6 +776,7 @@ public abstract class AbstractChannel
throw err;
}
}
+
@Override
protected void doCloseImmediately() {
if (service != null) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index 0401224..a846562 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -133,11 +133,11 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
@Override
public IoWriteFuture writePacket(Buffer buffer) throws IOException {
if (log.isDebugEnabled()) {
- log.debug("Writing {} bytes", buffer.available());
+ log.debug("writePacket({}) Writing {} bytes", this, buffer.available());
}
ByteBuffer buf = ByteBuffer.wrap(buffer.array(), buffer.rpos(), buffer.available());
- final Nio2DefaultIoWriteFuture future = new Nio2DefaultIoWriteFuture(null, buf);
+ Nio2DefaultIoWriteFuture future = new Nio2DefaultIoWriteFuture(null, buf);
if (isClosing()) {
Throwable exc = new ClosedChannelException();
future.setException(exc);
@@ -150,32 +150,33 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
}
protected void exceptionCaught(Throwable exc) {
- if (!closeFuture.isClosed()) {
- AsynchronousSocketChannel socket = getSocket();
- if (isClosing() || !socket.isOpen()) {
- close(true);
- } else {
- IoHandler handler = getIoHandler();
- try {
- if (log.isDebugEnabled()) {
- log.debug("exceptionCaught({}) caught {}[{}] - calling handler",
- this, exc.getClass().getSimpleName(), exc.getMessage());
- }
- handler.exceptionCaught(this, exc);
- } catch (Throwable e) {
- Throwable t = GenericUtils.peelException(e);
- if (log.isDebugEnabled()) {
- log.debug("exceptionCaught({}) Exception handler threw {}, closing the session: {}",
- this, t.getClass().getSimpleName(), t.getMessage());
- }
+ if (closeFuture.isClosed()) {
+ return;
+ }
- if (log.isTraceEnabled()) {
- log.trace("exceptionCaught(" + this + ") exception handler failure details", t);
- }
- close(true);
+ AsynchronousSocketChannel socket = getSocket();
+ if (isOpen() && socket.isOpen()) {
+ IoHandler handler = getIoHandler();
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("exceptionCaught({}) caught {}[{}] - calling handler",
+ this, exc.getClass().getSimpleName(), exc.getMessage());
+ }
+ handler.exceptionCaught(this, exc);
+ } catch (Throwable e) {
+ Throwable t = GenericUtils.peelException(e);
+ if (log.isDebugEnabled()) {
+ log.debug("exceptionCaught({}) Exception handler threw {}, closing the session: {}",
+ this, t.getClass().getSimpleName(), t.getMessage());
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("exceptionCaught(" + this + ") exception handler failure details", t);
}
}
}
+
+ close(true);
}
@Override
@@ -193,12 +194,22 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
@Override
protected void doCloseImmediately() {
- for (;;) {
+ for (boolean debugEnabled = log.isDebugEnabled();;) {
// Cancel pending requests informing them of the cancellation
Nio2DefaultIoWriteFuture future = writes.poll();
if (future != null) {
+ if (future.isWritten()) {
+ if (debugEnabled) {
+ log.debug("doCloseImmediately({}) skip already written future={}", this, future);
+ }
+ continue;
+ }
+
Throwable error = future.getException();
if (error == null) {
+ if (debugEnabled) {
+ log.debug("doCloseImmediately({}) signal write abort for future={}", this, future);
+ }
future.setException(new WriteAbortedException("Write request aborted due to immediate session close", null));
}
} else {
@@ -379,6 +390,11 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
if (log.isDebugEnabled()) {
log.debug("handleCompletedWriteCycle({}) finished writing len={}", this, writeLen);
}
+
+ // This should be called before future.setWritten() to avoid WriteAbortedException
+ // to be thrown by doCloseImmediately when called in the listener of doCloseGracefully
+ writes.remove(future);
+
future.setWritten();
finishWrite(future);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
index 1524196..14db9b1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
@@ -307,7 +307,8 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
*/
@Override
public void unregisterChannel(Channel channel) {
- Channel result = channels.remove(channel.getId());
+ int channelId = channel.getId();
+ Channel result = channels.remove(channelId);
if (log.isDebugEnabled()) {
log.debug("unregisterChannel({}) result={}", channel, result);
}
@@ -436,8 +437,16 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
*/
public void channelWindowAdjust(Buffer buffer) throws IOException {
try {
- Channel channel = getChannel(buffer);
- channel.handleWindowAdjust(buffer);
+ // Do not use getChannel to avoid the session being closed
+ // if receiving the SSH_MSG_CHANNEL_WINDOW_ADJUST on an already closed channel
+ int recipient = buffer.getInt();
+ Channel channel = channels.get(recipient);
+ if (channel != null) {
+ channel.handleWindowAdjust(buffer);
+ } else {
+ log.warn("Received SSH_MSG_CHANNEL_WINDOW_ADJUST on unknown channel " + recipient);
+ }
+
} catch (SshException e) {
if (log.isDebugEnabled()) {
log.debug("channelWindowAdjust {} error: {}", e.getClass().getSimpleName(), e.getMessage());
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0e99597a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 57a5699..317e669 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -232,7 +232,7 @@ public class TcpipServerChannel extends AbstractServerChannel {
protected void handleChannelOpenFailure(OpenFuture f, Throwable problem) {
signalChannelOpenFailure(problem);
notifyStateChanged(problem.getClass().getSimpleName());
- closeImmediately0();
+ close(true);
if (problem instanceof ConnectException) {
f.setException(new SshChannelOpenException(getId(), SshConstants.SSH_OPEN_CONNECT_FAILED, problem.getMessage(), problem));
@@ -241,44 +241,43 @@ public class TcpipServerChannel extends AbstractServerChannel {
}
}
- private void closeImmediately0() {
- // We need to close the channel immediately to remove it from the
- // server session's channel table and *not* send a packet to the
- // client. A notification was already sent by our caller, or will
- // be sent after we return.
- //
- super.close(true);
+
+ @Override
+ public CloseFuture close(boolean immediately) {
+ CloseFuture closingFeature = super.close(immediately);
// We also need to dispose of the connector, but unfortunately we
// are being invoked by the connector thread or the connector's
- // own processor thread. Disposing of the connector within either
- // causes deadlock. Instead create a thread to dispose of the
+ // own processor thread. Disposing of the connector within either
+ // causes deadlock. Instead create a thread to dispose of the
// connector in the background.
-
ExecutorService service = getExecutorService();
+
// allocate a temporary executor service if none provided
- final ExecutorService executors = (service == null)
+ ExecutorService executors = (service == null)
? ThreadUtils.newSingleThreadExecutor("TcpIpServerChannel-ConnectorCleanup[" + getSession() + "]")
: service;
// shutdown the temporary executor service if had to create it
- final boolean shutdown = executors != service || isShutdownOnExit();
- executors.submit(() -> {
- try {
- connector.close(true);
- } finally {
- if (shutdown && !executors.isShutdown()) {
- Collection<Runnable> runners = executors.shutdownNow();
+ boolean shutdown = (executors != service) || isShutdownOnExit();
+
+ return builder().when(closingFeature).run(() -> {
+ executors.submit(() -> {
+ try {
if (log.isDebugEnabled()) {
- log.debug("destroy({}) - shutdown executor service - runners count={}", TcpipServerChannel.this, runners.size());
+ log.debug("disposing connector: {} for: {}", connector, TcpipServerChannel.this);
+ }
+ connector.close(immediately);
+ } finally {
+ if (shutdown && (!executors.isShutdown())) {
+ Collection<Runnable> runners = executors.shutdownNow();
+ if (log.isDebugEnabled()) {
+ log.debug("destroy({}) - shutdown executor service - runners count={}",
+ TcpipServerChannel.this, runners.size());
+ }
}
}
- }
- });
- }
-
- @Override
- public CloseFuture close(boolean immediately) {
- return super.close(immediately).addListener(sshFuture -> closeImmediately0());
+ });
+ }).build().close(false);
}
@Override
@@ -328,6 +327,14 @@ public class TcpipServerChannel extends AbstractServerChannel {
+ " len=" + len + " write failure details", t);
}
- session.exceptionCaught(t);
+ if (ioSession.isOpen()) {
+ session.exceptionCaught(t);
+ } else {
+ // In case remote entity has closed the socket (the ioSession), data coming from
+ // the SSH channel should be simply discarded
+ if (log.isDebugEnabled()) {
+ log.debug("Ignoring writeDataFailure {} because ioSession {} is already closing ", t, ioSession);
+ }
+ }
}
}