You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@mina.apache.org by gn...@apache.org on 2014/10/16 23:19:14 UTC
[6/8] git commit: [SSHD-360] Rework CloseableUtils code
[SSHD-360] Rework CloseableUtils code
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/d8386644
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/d8386644
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/d8386644
Branch: refs/heads/master
Commit: d8386644d897daccace5808338b24c5ee9b1bef8
Parents: b98694d
Author: Guillaume Nodet <gn...@apache.org>
Authored: Thu Oct 16 17:27:04 2014 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Thu Oct 16 22:57:11 2014 +0200
----------------------------------------------------------------------
.../main/java/org/apache/sshd/SshClient.java | 36 +-
.../main/java/org/apache/sshd/SshServer.java | 43 +-
.../sshd/agent/common/AgentForwardSupport.java | 11 +-
.../sshd/agent/local/AgentForwardedChannel.java | 5 -
.../sshd/agent/unix/AgentForwardedChannel.java | 5 -
.../client/channel/AbstractClientChannel.java | 74 +---
.../sshd/client/channel/ChannelDirectTcpip.java | 6 +-
.../sshd/client/channel/ChannelSession.java | 8 +-
.../client/session/ClientUserAuthService.java | 2 +-
.../sshd/common/AbstractFactoryManager.java | 4 +-
.../sshd/common/channel/AbstractChannel.java | 89 ++--
.../common/channel/ChannelAsyncInputStream.java | 20 +-
.../channel/ChannelAsyncOutputStream.java | 15 +-
.../common/forward/DefaultTcpipForwarder.java | 8 +-
.../sshd/common/forward/TcpipClientChannel.java | 2 +-
.../sshd/common/forward/TcpipServerChannel.java | 2 +-
.../apache/sshd/common/io/nio2/Nio2Service.java | 7 +-
.../apache/sshd/common/io/nio2/Nio2Session.java | 12 +-
.../session/AbstractConnectionService.java | 36 +-
.../sshd/common/session/AbstractSession.java | 21 +-
.../apache/sshd/common/util/CloseableUtils.java | 439 ++++++++-----------
.../sshd/server/channel/ChannelSession.java | 91 ++--
.../sshd/server/session/ServerSession.java | 2 +-
.../sshd/server/x11/X11ForwardSupport.java | 20 +-
.../test/java/org/apache/sshd/CipherTest.java | 2 +-
.../test/java/org/apache/sshd/ClientTest.java | 32 +-
.../java/org/apache/sshd/CompressionTest.java | 2 +-
.../src/test/java/org/apache/sshd/KexTest.java | 2 +-
.../java/org/apache/sshd/KeyReExchangeTest.java | 2 +-
.../src/test/java/org/apache/sshd/LoadTest.java | 60 +--
.../src/test/java/org/apache/sshd/MacTest.java | 35 +-
.../org/apache/sshd/PortForwardingLoadTest.java | 4 +-
.../org/apache/sshd/PortForwardingTest.java | 4 +-
.../src/test/java/org/apache/sshd/ScpTest.java | 2 +-
.../src/test/java/org/apache/sshd/SftpTest.java | 2 +-
.../file/virtualfs/VirtualFileSystemTest.java | 4 -
36 files changed, 482 insertions(+), 627 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/SshClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/SshClient.java b/sshd-core/src/main/java/org/apache/sshd/SshClient.java
index 9eb47b8..0949574 100644
--- a/sshd-core/src/main/java/org/apache/sshd/SshClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/SshClient.java
@@ -62,7 +62,6 @@ import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.Factory;
import org.apache.sshd.common.KeyPairProvider;
import org.apache.sshd.common.NamedFactory;
-import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.DefaultIoServiceFactoryFactory;
import org.apache.sshd.common.io.IoConnectFuture;
@@ -249,26 +248,21 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
start();
}
- public CloseFuture close(boolean immediately) {
- CloseFuture future;
- if (connector != null) {
- future = CloseableUtils.sequential(connector, ioServiceFactory).close(immediately);
- } else if (ioServiceFactory != null) {
- future = ioServiceFactory.close(immediately);
- } else {
- future = CloseableUtils.closed();
- }
- future.addListener(new SshFutureListener<CloseFuture>() {
- public void operationComplete(CloseFuture future) {
- connector = null;
- ioServiceFactory = null;
- if (shutdownExecutor && executor != null) {
- executor.shutdown();
- executor = null;
- }
- }
- });
- return future;
+ @Override
+ protected Closeable getInnerCloseable() {
+ return builder()
+ .sequential(connector, ioServiceFactory)
+ .run(new Runnable() {
+ public void run() {
+ connector = null;
+ ioServiceFactory = null;
+ if (shutdownExecutor && executor != null) {
+ executor.shutdownNow();
+ executor = null;
+ }
+ }
+ })
+ .build();
}
/**
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/SshServer.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/SshServer.java b/sshd-core/src/main/java/org/apache/sshd/SshServer.java
index 9d7d0c0..8d5d583 100644
--- a/sshd-core/src/main/java/org/apache/sshd/SshServer.java
+++ b/sshd-core/src/main/java/org/apache/sshd/SshServer.java
@@ -36,7 +36,6 @@ import org.apache.sshd.common.ForwardingFilter;
import org.apache.sshd.common.NamedFactory;
import org.apache.sshd.common.Session;
import org.apache.sshd.common.SshdSocketAddress;
-import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.io.DefaultIoServiceFactoryFactory;
import org.apache.sshd.common.io.IoAcceptor;
import org.apache.sshd.common.io.IoServiceFactory;
@@ -324,29 +323,25 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
}
@Override
- protected CloseFuture doCloseGracefully() {
- stopSessionTimeoutListener();
- CloseFuture future;
- if (acceptor != null) {
- future = CloseableUtils.sequential(acceptor, ioServiceFactory).close(false);
- } else if (ioServiceFactory != null) {
- future = ioServiceFactory.close(false);
- } else {
- future = CloseableUtils.closed();
- }
- return future;
- }
-
- @Override
- protected void doCloseImmediately() {
- CloseableUtils.sequential(acceptor, ioServiceFactory).close(true);
- acceptor = null;
- ioServiceFactory = null;
- if (shutdownExecutor && executor != null) {
- executor.shutdown();
- executor = null;
- }
- super.doCloseImmediately();
+ protected Closeable getInnerCloseable() {
+ return builder()
+ .run(new Runnable() {
+ public void run() {
+ stopSessionTimeoutListener();
+ }
+ })
+ .sequential(acceptor, ioServiceFactory)
+ .run(new Runnable() {
+ public void run() {
+ acceptor = null;
+ ioServiceFactory = null;
+ if (shutdownExecutor && executor != null) {
+ executor.shutdownNow();
+ executor = null;
+ }
+ }
+ })
+ .build();
}
/**
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentForwardSupport.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentForwardSupport.java b/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentForwardSupport.java
index 4f6a464..c1a775f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentForwardSupport.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/common/AgentForwardSupport.java
@@ -21,22 +21,15 @@ package org.apache.sshd.agent.common;
import java.io.IOException;
import org.apache.sshd.agent.SshAgentServer;
-import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.SshException;
-import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.util.CloseableUtils;
-import org.apache.sshd.server.session.ServerSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* The server side fake agent, acting as an agent, but actually forwarding the requests to the auth channel on the client side.
*/
public class AgentForwardSupport extends CloseableUtils.AbstractCloseable {
- private static final Logger log = LoggerFactory.getLogger(AgentForwardSupport.class);
-
private final ConnectionService service;
private String agentId;
private SshAgentServer agentServer;
@@ -73,4 +66,8 @@ public class AgentForwardSupport extends CloseableUtils.AbstractCloseable {
super.doCloseImmediately();
}
+ public String toString() {
+ return getClass().getSimpleName() + "[" + service.getSession() + "]";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
index e632e2c..7abf714 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ArrayBlockingQueue;
import org.apache.sshd.agent.SshAgent;
import org.apache.sshd.agent.common.AbstractAgentProxy;
import org.apache.sshd.client.channel.AbstractClientChannel;
-import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.channel.ChannelOutputStream;
import org.apache.sshd.common.util.Buffer;
@@ -69,10 +68,6 @@ public class AgentForwardedChannel extends AbstractClientChannel {
}
}
- public OpenFuture open() throws IOException {
- return internalOpen();
- }
-
@Override
protected void doOpen() throws IOException {
if (streaming == Streaming.Async) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
index 327bf66..486c837 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
@@ -21,7 +21,6 @@ package org.apache.sshd.agent.unix;
import java.io.IOException;
import org.apache.sshd.client.channel.AbstractClientChannel;
-import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.channel.ChannelOutputStream;
import org.apache.tomcat.jni.Socket;
@@ -56,10 +55,6 @@ public class AgentForwardedChannel extends AbstractClientChannel implements Runn
}
}
- public synchronized OpenFuture open() throws IOException {
- return internalOpen();
- }
-
@Override
protected synchronized void doOpen() throws IOException {
if (streaming == Streaming.Async) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 793e8b8..964ef3a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -35,10 +35,7 @@ import org.apache.sshd.common.channel.ChannelAsyncInputStream;
import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
import org.apache.sshd.common.io.IoInputStream;
import org.apache.sshd.common.io.IoOutputStream;
-import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.util.Buffer;
-import org.apache.sshd.common.util.CloseableUtils;
import org.apache.sshd.common.util.IoUtils;
/**
@@ -142,56 +139,31 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
}
@Override
- public CloseFuture close(final boolean immediately) {
- if (!closeFuture.isDone()) {
- if (opened) {
- super.close(immediately);
- } else if (openFuture != null) {
- if (immediately) {
- openFuture.setException(new SshException("Channel closed"));
- super.close(immediately);
- } else {
- openFuture.addListener(new SshFutureListener<OpenFuture>() {
- public void operationComplete(OpenFuture future) {
- if (future.isOpened()) {
- close(immediately);
- } else {
- close(true);
- }
- }
- });
- }
- } else {
- closeFuture.setClosed();
- notifyStateChanged();
- }
- }
- return closeFuture;
- }
-
- @Override
protected Closeable getInnerCloseable() {
return builder()
+ .when(openFuture)
+ .run(new Runnable() {
+ public void run() {
+ // If the channel has not been opened yet,
+ // skip the SSH_MSG_CHANNEL_CLOSE exchange
+ if (openFuture == null) {
+ gracefulFuture.setClosed();
+ }
+ // Close inverted streams after
+ // If the inverted stream is closed before, there's a small time window
+ // in which we have:
+ // ChannePipedInputStream#closed = true
+ // ChannePipedInputStream#writerClosed = false
+ // which leads to an IOException("Pipe closed") when reading.
+ IoUtils.closeQuietly(in, out, err);
+ IoUtils.closeQuietly(invertedIn, invertedOut, invertedErr);
+ }
+ })
.parallel(asyncIn, asyncOut, asyncErr)
- .close(super.getInnerCloseable())
+ .close(new GracefulChannelCloseable())
.build();
}
- @Override
- protected void doCloseImmediately() {
- // Close inverted streams after
- // If the inverted stream is closed before, there's a small time window
- // in which we have:
- // ChannePipedInputStream#closed = true
- // ChannePipedInputStream#writerClosed = false
- // which leads to an IOException("Pipe closed") when reading.
- IoUtils.closeQuietly(in, out, err);
- IoUtils.closeQuietly(invertedIn, invertedOut, invertedErr);
- // TODO: graceful close ?
- CloseableUtils.parallel(asyncIn, asyncOut, asyncErr).close(true);
- super.doCloseImmediately();
- }
-
public int waitFor(int mask, long timeout) {
long t = 0;
synchronized (lock) {
@@ -242,8 +214,8 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
}
}
- protected OpenFuture internalOpen() throws IOException {
- if (closeFuture.isClosed()) {
+ public synchronized OpenFuture open() throws IOException {
+ if (isClosing()) {
throw new SshException("Session has been closed");
}
openFuture = new DefaultOpenFuture(lock);
@@ -292,7 +264,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
protected void doWriteData(byte[] data, int off, int len) throws IOException {
// If we're already closing, ignore incoming data
- if (state.get() != CloseableUtils.AbstractCloseable.OPENED) {
+ if (isClosing()) {
return;
}
if (asyncOut != null) {
@@ -308,7 +280,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
protected void doWriteExtendedData(byte[] data, int off, int len) throws IOException {
// If we're already closing, ignore incoming data
- if (state.get() != CloseableUtils.AbstractCloseable.OPENED) {
+ if (isClosing()) {
return;
}
if (asyncErr != null) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
index 36ffd28..fdc1c99 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
@@ -62,7 +62,7 @@ public class ChannelDirectTcpip extends AbstractClientChannel {
}
@Override
- protected OpenFuture internalOpen() throws IOException {
+ public OpenFuture open() throws IOException {
if (closeFuture.isClosed()) {
throw new SshException("Session has been closed");
}
@@ -94,10 +94,6 @@ public class ChannelDirectTcpip extends AbstractClientChannel {
}
}
- public OpenFuture open() throws IOException {
- return internalOpen();
- }
-
@Override
protected void doWriteData(byte[] data, int off, int len) throws IOException {
pipe.write(data, off, len);
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index e02160b..92555df 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -21,7 +21,6 @@ package org.apache.sshd.client.channel;
import java.io.IOException;
import java.io.InputStream;
-import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.channel.ChannelOutputStream;
import org.apache.sshd.common.channel.ChannelPipedInputStream;
@@ -29,7 +28,6 @@ import org.apache.sshd.common.channel.ChannelPipedOutputStream;
import org.apache.sshd.common.channel.ChannelAsyncInputStream;
import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.util.CloseableUtils;
/**
* TODO Add javadoc
@@ -44,10 +42,6 @@ public class ChannelSession extends AbstractClientChannel {
super("session");
}
- public OpenFuture open() throws IOException {
- return internalOpen();
- }
-
@Override
protected void doOpen() throws IOException {
if (streaming == Streaming.Async) {
@@ -117,7 +111,7 @@ public class ChannelSession extends AbstractClientChannel {
}
}
} catch (Exception e) {
- if (state.get() == CloseableUtils.AbstractCloseable.OPENED) {
+ if (!isClosing()) {
log.info("Caught exception", e);
close(false);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
index a454874..3dfcced 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
@@ -92,7 +92,7 @@ public class ClientUserAuthService extends CloseableUtils.AbstractInnerCloseable
} else if (delegateOld != null) {
return delegateOld;
} else {
- return new CloseableUtils.AbstractCloseable() { };
+ return builder().build();
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
index e10f20c..515bc9e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
@@ -31,15 +31,13 @@ import org.apache.sshd.common.io.IoServiceFactory;
import org.apache.sshd.common.io.IoServiceFactoryFactory;
import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.util.CloseableUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* TODO Add javadoc
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public abstract class AbstractFactoryManager extends CloseableUtils.AbstractCloseable implements FactoryManager {
+public abstract class AbstractFactoryManager extends CloseableUtils.AbstractInnerCloseable implements FactoryManager {
protected Map<String,String> properties = new HashMap<String,String>();
protected IoServiceFactoryFactory ioServiceFactoryFactory;
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 4bfb4e6..41a0b4a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -21,7 +21,7 @@ package org.apache.sshd.common.channel;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.sshd.common.Channel;
import org.apache.sshd.common.Closeable;
@@ -48,8 +48,9 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea
public static final int DEFAULT_WINDOW_SIZE = 0x200000;
public static final int DEFAULT_PACKET_SIZE = 0x8000;
- protected static final int CLOSE_SENT = 0x01;
- protected static final int CLOSE_RECV = 0x02;
+ protected static enum GracefulState {
+ Opened, CloseSent, CloseReceived, Closed
+ }
protected final Window localWindow = new Window(this, null, getClass().getName().contains(".client."), true);
protected final Window remoteWindow = new Window(this, null, getClass().getName().contains(".client."), false);
@@ -58,7 +59,7 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea
protected int id;
protected int recipient;
protected volatile boolean eof;
- protected AtomicInteger gracefulState = new AtomicInteger();
+ protected AtomicReference<GracefulState> gracefulState = new AtomicReference<GracefulState>(GracefulState.Opened);
protected final DefaultCloseFuture gracefulFuture = new DefaultCloseFuture(lock);
protected final List<RequestHandler<Channel>> handlers = new ArrayList<RequestHandler<Channel>>();
@@ -140,61 +141,67 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea
public void handleClose() throws IOException {
log.debug("Received SSH_MSG_CHANNEL_CLOSE on channel {}", this);
- if (gracefulState.compareAndSet(0, CLOSE_RECV)) {
+ if (gracefulState.compareAndSet(GracefulState.Opened, GracefulState.CloseReceived)) {
close(false);
- } else if (gracefulState.compareAndSet(CLOSE_SENT, CLOSE_SENT | CLOSE_RECV)) {
+ } else if (gracefulState.compareAndSet(GracefulState.CloseSent, GracefulState.Closed)) {
gracefulFuture.setClosed();
}
}
protected Closeable getInnerCloseable() {
- return new Closeable() {
- public boolean isClosed() {
- return gracefulFuture.isClosed();
- }
- public boolean isClosing() {
- return true;
- }
- public CloseFuture close(boolean immediately) {
- if (!immediately) {
- log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", AbstractChannel.this);
- Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_CLOSE);
- buffer.putInt(recipient);
- try {
- session.writePacket(buffer).addListener(new SshFutureListener<IoWriteFuture>() {
- public void operationComplete(IoWriteFuture future) {
- if (future.isWritten()) {
- log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", AbstractChannel.this);
- if (gracefulState.compareAndSet(0, CLOSE_SENT)) {
- // Waiting for CLOSE message to come back from the remote side
- } else if (gracefulState.compareAndSet(CLOSE_RECV, CLOSE_SENT | CLOSE_RECV)) {
- gracefulFuture.setValue(true);
- }
- } else {
- close(true);
+ return new GracefulChannelCloseable();
+ }
+
+ public class GracefulChannelCloseable implements Closeable {
+
+ protected volatile boolean closing;
+
+ public boolean isClosing() {
+ return closing;
+ }
+ public boolean isClosed() {
+ return gracefulFuture.isClosed();
+ }
+ public CloseFuture close(boolean immediately) {
+ closing = true;
+ if (immediately) {
+ gracefulFuture.setClosed();
+ } else if (!gracefulFuture.isClosed()) {
+ log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", AbstractChannel.this);
+ Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_CLOSE);
+ buffer.putInt(recipient);
+ try {
+ session.writePacket(buffer).addListener(new SshFutureListener<IoWriteFuture>() {
+ public void operationComplete(IoWriteFuture future) {
+ if (future.isWritten()) {
+ log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", AbstractChannel.this);
+ if (gracefulState.compareAndSet(GracefulState.Opened, GracefulState.CloseSent)) {
+ // Waiting for CLOSE message to come back from the remote side
+ } else if (gracefulState.compareAndSet(GracefulState.CloseReceived, GracefulState.Closed)) {
+ gracefulFuture.setClosed();
}
+ } else {
+ AbstractChannel.this.close(true);
}
- });
- } catch (IOException e) {
- log.debug("Exception caught while writing SSH_MSG_CHANNEL_CLOSE packet on channel " + AbstractChannel.this, e);
- close(true);
- }
- } else {
- gracefulFuture.setClosed();
+ }
+ });
+ } catch (IOException e) {
+ log.debug("Exception caught while writing SSH_MSG_CHANNEL_CLOSE packet on channel " + AbstractChannel.this, e);
+ AbstractChannel.this.close(true);
}
- return gracefulFuture;
}
- };
+ return gracefulFuture;
+ }
}
@Override
protected void doCloseImmediately() {
- super.doCloseImmediately();
service.unregisterChannel(AbstractChannel.this);
+ super.doCloseImmediately();
}
protected void writePacket(Buffer buffer) throws IOException {
- if (state.get() == OPENED) {
+ if (!isClosing()) {
session.writePacket(buffer);
} else {
log.debug("Discarding output packet because channel is being closed");
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
index 71155f1..b7cbbb5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
@@ -21,24 +21,18 @@ package org.apache.sshd.common.channel;
import java.io.IOException;
import org.apache.sshd.common.Channel;
-import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.RuntimeSshException;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.DefaultCloseFuture;
import org.apache.sshd.common.future.DefaultSshFuture;
-import org.apache.sshd.common.future.SshFuture;
-import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoInputStream;
import org.apache.sshd.common.io.IoReadFuture;
import org.apache.sshd.common.io.ReadPendingException;
import org.apache.sshd.common.util.Buffer;
import org.apache.sshd.common.util.CloseableUtils;
import org.apache.sshd.common.util.Readable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class ChannelAsyncInputStream extends CloseableUtils.AbstractInnerCloseable implements IoInputStream {
+public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable implements IoInputStream {
private final Channel channel;
private final Buffer buffer = new Buffer();
@@ -72,16 +66,20 @@ public class ChannelAsyncInputStream extends CloseableUtils.AbstractInnerCloseab
}
@Override
- protected Closeable getInnerCloseable() {
+ protected void preClose() {
synchronized (buffer) {
if (buffer.available() == 0) {
if (pending != null) {
pending.setValue(new SshException("Closed"));
}
}
- return builder()
- .when(pending)
- .build();
+ }
+ }
+
+ @Override
+ protected CloseFuture doCloseGracefully() {
+ synchronized (buffer) {
+ return builder().when(pending).build().close(false);
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index 3455c15..6aabdeb 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -19,24 +19,21 @@
package org.apache.sshd.common.channel;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.sshd.common.Channel;
-import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.DefaultSshFuture;
+import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.io.WritePendingException;
-import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.util.Buffer;
import org.apache.sshd.common.util.CloseableUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class ChannelAsyncOutputStream extends CloseableUtils.AbstractInnerCloseable implements IoOutputStream {
+public class ChannelAsyncOutputStream extends CloseableUtils.AbstractCloseable implements IoOutputStream {
private final Channel channel;
private final byte cmd;
@@ -65,10 +62,8 @@ public class ChannelAsyncOutputStream extends CloseableUtils.AbstractInnerClosea
}
@Override
- protected Closeable getInnerCloseable() {
- return builder()
- .when(pendingWrite.get())
- .build();
+ protected CloseFuture doCloseGracefully() {
+ return builder().when(pendingWrite.get()).build().close(false);
}
protected synchronized void doWriteIfPossible(boolean resume) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
index 6d664d7..d3256ad 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
@@ -35,8 +35,6 @@ import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.SshdSocketAddress;
import org.apache.sshd.common.TcpipForwarder;
-import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.DefaultCloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoAcceptor;
import org.apache.sshd.common.io.IoHandler;
@@ -45,8 +43,6 @@ import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.util.Buffer;
import org.apache.sshd.common.util.CloseableUtils;
import org.apache.sshd.common.util.Readable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* TODO Add javadoc
@@ -244,4 +240,8 @@ public class DefaultTcpipForwarder extends CloseableUtils.AbstractInnerCloseable
}
}
+ public String toString() {
+ return getClass().getSimpleName() + "[" + session + "]";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
index e449c8d..0d95acf 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -104,7 +104,7 @@ public class TcpipClientChannel extends AbstractClientChannel {
@Override
protected Closeable getInnerCloseable() {
- return CloseableUtils.sequential(serverSession, super.getInnerCloseable());
+ return builder().sequential(serverSession, super.getInnerCloseable()).build();
}
protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
index 9bbdea3..8ed89d4 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
@@ -112,7 +112,7 @@ public class TcpipServerChannel extends AbstractServerChannel {
out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
IoHandler handler = new IoHandler() {
public void messageReceived(IoSession session, Readable message) throws Exception {
- if (state.get() != OPENED) {
+ if (isClosing()) {
log.debug("Ignoring write to channel {} in CLOSING state", id);
} else {
Buffer buffer = new Buffer();
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
index 44019ec..3c5a2b7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
@@ -19,17 +19,13 @@
package org.apache.sshd.common.io.nio2;
import java.nio.channels.AsynchronousChannelGroup;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.FactoryManager;
-import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoHandler;
import org.apache.sshd.common.io.IoService;
import org.apache.sshd.common.io.IoSession;
@@ -66,8 +62,7 @@ public abstract class Nio2Service extends CloseableUtils.AbstractInnerCloseable
@Override
protected Closeable getInnerCloseable() {
- List<IoSession> s = new ArrayList<IoSession>(sessions.values());
- return CloseableUtils.parallel(s);
+ return builder().parallel(sessions.values()).build();
}
public Map<Long, IoSession> getManagedSessions() {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index e8a8c4f..8fb1f3c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.DefaultSshFuture;
-import org.apache.sshd.common.future.SshFuture;
import org.apache.sshd.common.io.IoHandler;
import org.apache.sshd.common.io.IoService;
import org.apache.sshd.common.io.IoSession;
@@ -105,7 +104,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
log.debug("Writing {} bytes", buffer.available());
ByteBuffer buf = ByteBuffer.wrap(buffer.array(), buffer.rpos(), buffer.available());
final DefaultIoWriteFuture future = new DefaultIoWriteFuture(null, buf);
- if (state.get() != OPENED) {
+ if (isClosing()) {
Throwable exc = new ClosedChannelException();
future.setException(exc);
exceptionCaught(exc);
@@ -134,9 +133,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
@Override
protected CloseFuture doCloseGracefully() {
- synchronized (writes) {
- return CloseableUtils.parallel(writes.toArray(new SshFuture[writes.size()]));
- }
+ return builder().when(writes).build().close(false);
}
@Override
@@ -231,10 +228,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
finishWrite();
}
private void finishWrite() {
- synchronized (writes) {
- writes.remove(future);
- writes.notifyAll();
- }
+ writes.remove(future);
currentWrite.compareAndSet(future, null);
startWriting();
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
index 72c8c86..6bd467d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
@@ -19,7 +19,6 @@
package org.apache.sshd.common.session;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -30,23 +29,31 @@ import org.apache.sshd.client.channel.AbstractClientChannel;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.Channel;
import org.apache.sshd.common.Closeable;
-import org.apache.sshd.common.RequestHandler;
import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.common.RequestHandler;
import org.apache.sshd.common.Session;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.TcpipForwarder;
-import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.DefaultCloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.util.Buffer;
import org.apache.sshd.common.util.CloseableUtils;
import org.apache.sshd.server.channel.OpenChannelException;
import org.apache.sshd.server.x11.X11ForwardSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.apache.sshd.common.SshConstants.*;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_CLOSE;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_DATA;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_EOF;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_FAILURE;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_OPEN;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_OPEN_CONFIRMATION;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_OPEN_FAILURE;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_REQUEST;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_CHANNEL_WINDOW_ADJUST;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_GLOBAL_REQUEST;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_REQUEST_FAILURE;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_REQUEST_SUCCESS;
/**
* Base implementation of ConnectionService.
@@ -88,10 +95,10 @@ public abstract class AbstractConnectionService extends CloseableUtils.AbstractI
@Override
protected Closeable getInnerCloseable() {
- return CloseableUtils.sequential(
- tcpipForwarder, agentForward, x11Forward,
- CloseableUtils.parallel(channels.values())
- );
+ return builder()
+ .sequential(tcpipForwarder, agentForward, x11Forward)
+ .parallel(channels.values())
+ .build();
}
protected int getNextChannelId() {
@@ -106,7 +113,7 @@ public abstract class AbstractConnectionService extends CloseableUtils.AbstractI
* @throws IOException
*/
public int registerChannel(Channel channel) throws IOException {
- if (state.get() != OPENED) {
+ if (isClosing()) {
throw new IllegalStateException("Session is being closed");
}
int channelId = getNextChannelId();
@@ -298,7 +305,7 @@ public abstract class AbstractConnectionService extends CloseableUtils.AbstractI
log.debug("Received SSH_MSG_CHANNEL_OPEN {}", type);
- if (state.get() != OPENED) {
+ if (isClosing()) {
Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN_FAILURE);
buf.putInt(id);
buf.putInt(SshConstants.SSH_OPEN_CONNECT_FAILED);
@@ -416,6 +423,7 @@ public abstract class AbstractConnectionService extends CloseableUtils.AbstractI
}
public String toString() {
- return getClass().getSimpleName();
+ return getClass().getSimpleName() + "[" + session + "]";
}
+
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index ac859d7..d301e0a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -20,7 +20,6 @@ package org.apache.sshd.common.session;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -54,7 +53,14 @@ import org.apache.sshd.common.util.BufferUtils;
import org.apache.sshd.common.util.CloseableUtils;
import org.apache.sshd.common.util.Readable;
-import static org.apache.sshd.common.SshConstants.*;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_DEBUG;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_DISCONNECT;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_IGNORE;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_KEXINIT;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_NEWKEYS;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_SERVICE_ACCEPT;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_SERVICE_REQUEST;
+import static org.apache.sshd.common.SshConstants.SSH_MSG_UNIMPLEMENTED;
/**
* The AbstractSession handles all the basic SSH protocol such as key exchange, authentication,
@@ -417,7 +423,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
public void exceptionCaught(Throwable t) {
// Ignore exceptions that happen while closing
synchronized (lock) {
- if (state.get() != OPENED) {
+ if (isClosing()) {
return;
}
}
@@ -438,16 +444,17 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
@Override
protected Closeable getInnerCloseable() {
- return CloseableUtils.sequential(lock,
- CloseableUtils.parallel(lock, getServices()), ioSession);
+ return builder()
+ .parallel(getServices())
+ .close(ioSession)
+ .build();
}
@Override
protected void doCloseImmediately() {
super.doCloseImmediately();
// Fire 'close' event
- final ArrayList<SessionListener> l = new ArrayList<SessionListener>(listeners);
- for (SessionListener sl : l) {
+ for (SessionListener sl : listeners) {
sl.sessionClosed(this);
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
index 5d1d97f..94acdf5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
@@ -18,11 +18,14 @@
*/
package org.apache.sshd.common.util;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.SshException;
@@ -47,346 +50,273 @@ public class CloseableUtils {
return future;
}
- public static Closeable parallel(final Collection<? extends Closeable> closeables) {
- return parallel(null, closeables);
- }
+ public static class Builder {
- public static Closeable parallel(final Object lock, final Collection<? extends Closeable> closeables) {
- return parallel(lock, closeables.toArray(new Closeable[closeables.size()]));
- }
+ private final Object lock;
+ private final List<Closeable> closeables = new ArrayList<Closeable>();
- public static Closeable parallel(final Closeable... closeables) {
- return parallel(null, closeables);
- }
+ private Builder(Object lock) {
+ this.lock = lock;
+ }
- public static Closeable parallel(final Object lock, final Closeable... closeables) {
- int nbNonNulls = 0;
- for (Closeable closeable : closeables) {
- if (closeable != null) {
- nbNonNulls++;
+ public Builder run(final Runnable r) {
+ return close(new SimpleCloseable(lock) {
+ @Override
+ protected void doClose(boolean immediately) {
+ try {
+ r.run();
+ } finally {
+ super.doClose(immediately);
+ }
+ }
+ });
+ }
+
+ public <T extends SshFuture> Builder when(SshFuture<T> future) {
+ if (future != null) {
+ when(Collections.singleton(future));
}
+ return this;
}
- if (nbNonNulls == 0) {
- return new Closeable() {
- final CloseFuture future = new DefaultCloseFuture(lock);
- public boolean isClosed() {
- return future.isClosed();
- }
- public boolean isClosing() {
- return isClosed();
- }
- public CloseFuture close(boolean immediately) {
- future.setClosed();
- return future;
- }
- };
- } else if (nbNonNulls == 1) {
+
+ public <T extends SshFuture> Builder when(SshFuture<T>... futures) {
+ return when(Arrays.asList(futures));
+ }
+
+ public <T extends SshFuture> Builder when(final Iterable<? extends SshFuture<T>> futures) {
+ return close(new FuturesCloseable<T>(lock, futures));
+ }
+
+ public Builder sequential(Closeable... closeables) {
for (Closeable closeable : closeables) {
- if (closeable != null) {
- return closeable;
- }
+ close(closeable);
}
- throw new IllegalStateException();
- } else {
- return new Closeable() {
- final CloseFuture future = new DefaultCloseFuture(lock);
- final AtomicBoolean closing = new AtomicBoolean();
- public boolean isClosed() {
- return future.isClosed();
- }
- public boolean isClosing() {
- return closing.get();
- }
- public CloseFuture close(boolean immediately) {
- final AtomicInteger count = new AtomicInteger(closeables.length);
- if (closing.compareAndSet(false, true)) {
- SshFutureListener<CloseFuture> listener = new SshFutureListener<CloseFuture>() {
- public void operationComplete(CloseFuture f) {
- if (count.decrementAndGet() == 0) {
- future.setClosed();
- }
- }
- };
- for (Closeable c : closeables) {
- if (c != null) {
- c.close(immediately).addListener(listener);
- } else {
- listener.operationComplete(null);
- }
- }
- }
- return future;
- }
- };
+ return this;
}
- }
- public static Closeable sequential(final Collection<? extends Closeable> closeables) {
- return sequential(null, closeables);
- }
+ public Builder sequential(Iterable<Closeable> closeables) {
+ return close(new SequentialCloseable(lock, closeables));
+ }
- public static Closeable sequential(final Object lock, final Collection<? extends Closeable> closeables) {
- return sequential(lock, closeables.toArray(new Closeable[closeables.size()]));
- }
+ public Builder parallel(Closeable... closeables) {
+ if (closeables.length == 1) {
+ close(closeables[0]);
+ } else if (closeables.length > 0) {
+ parallel(Arrays.asList(closeables));
+ }
+ return this;
+ }
- public static Closeable sequential(final Closeable... closeables) {
- return sequential(null, closeables);
- }
+ public Builder parallel(Iterable<? extends Closeable> closeables) {
+ return close(new ParallelCloseable(lock, closeables));
+ }
- public static Closeable sequential(final Object lock, final Closeable... closeables) {
- int nbNonNulls = 0;
- for (Closeable closeable : closeables) {
- if (closeable != null) {
- nbNonNulls++;
+ public Builder close(Closeable c) {
+ if (c != null) {
+ closeables.add(c);
}
+ return this;
}
- if (nbNonNulls == 0) {
- return new Closeable() {
- final CloseFuture future = new DefaultCloseFuture(lock);
- public boolean isClosed() {
- return future.isClosed();
- }
- public boolean isClosing() {
- return isClosed();
- }
- public CloseFuture close(boolean immediately) {
- future.setClosed();
- return future;
- }
- };
- } else if (nbNonNulls == 1) {
- for (Closeable closeable : closeables) {
- if (closeable != null) {
- return closeable;
- }
+
+ public Closeable build() {
+ if (closeables.isEmpty()) {
+ return new SimpleCloseable(lock);
+ } else if (closeables.size() == 1) {
+ return closeables.get(0);
+ } else {
+ return new SequentialCloseable(lock, closeables);
}
- throw new IllegalStateException();
- } else {
- return new Closeable() {
- final DefaultCloseFuture future = new DefaultCloseFuture(lock);
- final AtomicBoolean closing = new AtomicBoolean();
- public boolean isClosed() {
- return future.isClosed();
- }
- public boolean isClosing() {
- return closing.get();
- }
- public CloseFuture close(final boolean immediately) {
- if (closing.compareAndSet(false, true)) {
- final Iterator<Closeable> iterator = Arrays.asList(closeables).iterator();
- SshFutureListener<CloseFuture> listener = new SshFutureListener<CloseFuture>() {
- public void operationComplete(CloseFuture previousFuture) {
- while (iterator.hasNext()) {
- Closeable c = iterator.next();
- if (c != null) {
- CloseFuture nextFuture = c.close(immediately);
- nextFuture.addListener(this);
- return;
- }
- }
- if (!iterator.hasNext()) {
- future.setClosed();
- }
- }
- };
- listener.operationComplete(null);
- }
- return future;
- }
- };
}
+
}
- public static <T extends SshFuture> CloseFuture parallel(final SshFuture<T>... futures) {
- return parallel(null, futures);
+ private static class SimpleCloseable implements Closeable {
+
+ protected final DefaultCloseFuture future;
+ protected final AtomicBoolean closing;
+
+ public SimpleCloseable(Object lock) {
+ future = new DefaultCloseFuture(lock);
+ closing = new AtomicBoolean();
+ }
+
+ public boolean isClosed() {
+ return future.isClosed();
+ }
+ public boolean isClosing() {
+ return closing.get();
+ }
+ public CloseFuture close(boolean immediately) {
+ if (closing.compareAndSet(false, true)) {
+ doClose(immediately);
+ }
+ return future;
+ }
+
+ protected void doClose(boolean immediately) {
+ future.setClosed();
+ }
}
- public static <T extends SshFuture> CloseFuture parallel(Object lock, final SshFuture<T>... futures) {
- final CloseFuture future = new DefaultCloseFuture(lock);
- if (futures.length > 0) {
- final AtomicInteger count = new AtomicInteger(futures.length);
- SshFutureListener<T> listener = new SshFutureListener<T>() {
- public void operationComplete(T f) {
+ private static class ParallelCloseable extends SimpleCloseable {
+
+ final Iterable<? extends Closeable> closeables;
+
+ private ParallelCloseable(Object lock, Iterable<? extends Closeable> closeables) {
+ super(lock);
+ this.closeables = closeables;
+ }
+
+ protected void doClose(final boolean immediately) {
+ final AtomicInteger count = new AtomicInteger(1);
+ SshFutureListener<CloseFuture> listener = new SshFutureListener<CloseFuture>() {
+ public void operationComplete(CloseFuture f) {
if (count.decrementAndGet() == 0) {
future.setClosed();
}
}
};
- for (SshFuture<T> f : futures) {
- if (f != null) {
- f.addListener(listener);
- } else {
- listener.operationComplete(null);
+ for (Closeable c : closeables) {
+ if (c != null) {
+ count.incrementAndGet();
+ c.close(immediately).addListener(listener);
}
}
- } else {
- future.setClosed();
+ listener.operationComplete(null);
}
- return future;
}
- public static Builder builder() {
- return new Builder();
- }
+ private static class SequentialCloseable extends SimpleCloseable {
- public static Builder builder(Logger logger, Object lock) {
- return new Builder();
- }
+ private final Iterable<? extends Closeable> closeables;
- public static class Builder {
- private final Object lock;
- private Closeable closeable = null;
- public Builder() {
- this(null);
+ public SequentialCloseable(Object lock, Iterable<? extends Closeable> closeables) {
+ super(lock);
+ this.closeables = closeables;
}
- public Builder(Object lock) {
- this.lock = lock;
- }
- public <T extends SshFuture> Builder when(final SshFuture<T>... futures) {
- return close(new Closeable() {
- private volatile boolean closing;
- private volatile boolean closed;
- public CloseFuture close(boolean immediately) {
- closing = true;
- if (immediately) {
- for (SshFuture<?> future : futures) {
- if (future instanceof DefaultSshFuture) {
- ((DefaultSshFuture<?>) future).setValue(new SshException("Closed"));
- }
+
+ protected void doClose(final boolean immediately) {
+ final Iterator<? extends Closeable> iterator = closeables.iterator();
+ SshFutureListener<CloseFuture> listener = new SshFutureListener<CloseFuture>() {
+ public void operationComplete(CloseFuture previousFuture) {
+ while (iterator.hasNext()) {
+ Closeable c = iterator.next();
+ if (c != null) {
+ CloseFuture nextFuture = c.close(immediately);
+ nextFuture.addListener(this);
+ return;
}
- closed = true;
- return closed();
- } else {
- return CloseableUtils.parallel(lock, futures).addListener(new SshFutureListener<CloseFuture>() {
- public void operationComplete(CloseFuture future) {
- closed = true;
- }
- });
+ }
+ if (!iterator.hasNext()) {
+ future.setClosed();
}
}
+ };
+ listener.operationComplete(null);
+ }
+ }
- public boolean isClosed() {
- return closed;
- }
+ private static class FuturesCloseable<T extends SshFuture> extends CloseableUtils.SimpleCloseable {
- public boolean isClosing() {
- return closing || closed;
- }
- });
- }
- public <T extends SshFuture> Builder when(Collection<? extends SshFuture<T>> futures) {
- return when(futures.toArray(new SshFuture[futures.size()]));
- }
- public Builder sequential(Closeable... closeables) {
- return close(CloseableUtils.sequential(lock, closeables));
- }
- public Builder sequential(Collection<Closeable> closeables) {
- return close(CloseableUtils.sequential(lock, closeables));
- }
- public Builder parallel(Closeable... closeables) {
- return close(CloseableUtils.parallel(lock, closeables));
- }
- public Builder parallel(Collection<? extends Closeable> closeables) {
- return close(CloseableUtils.parallel(lock, closeables));
- }
- public Builder close(Closeable c) {
- if (closeable == null) {
- closeable = c;
- } else {
- closeable = CloseableUtils.sequential(lock, closeable, c);
- }
- return this;
+ private final Iterable<? extends SshFuture<T>> futures;
+
+ public FuturesCloseable(Object lock, Iterable<? extends SshFuture<T>> futures) {
+ super(lock);
+ this.futures = futures;
}
- public Closeable build() {
- if (closeable == null) {
- closeable = new Closeable() {
- private volatile boolean closed;
- public CloseFuture close(boolean immediately) {
- closed = true;
- return closed();
- }
- public boolean isClosed() {
- return closed;
+
+ protected void doClose(boolean immediately) {
+ if (immediately) {
+ for (SshFuture<?> f : futures) {
+ if (f instanceof DefaultSshFuture) {
+ ((DefaultSshFuture<?>) f).setValue(new SshException("Closed"));
}
- public boolean isClosing() {
- return closed;
+ }
+ future.setClosed();
+ } else {
+ final AtomicInteger count = new AtomicInteger(1);
+ SshFutureListener<T> listener = new SshFutureListener<T>() {
+ public void operationComplete(T f) {
+ if (count.decrementAndGet() == 0) {
+ future.setClosed();
+ }
}
};
+ for (SshFuture<T> f : futures) {
+ if (f != null) {
+ count.incrementAndGet();
+ f.addListener(listener);
+ }
+ }
+ listener.operationComplete(null);
}
- return closeable;
}
}
public static abstract class AbstractCloseable implements Closeable {
- protected static final int OPENED = 0;
- protected static final int GRACEFUL = 1;
- protected static final int IMMEDIATE = 2;
- protected static final int CLOSED = 3;
-
+ protected enum State {
+ Opened, Graceful, Immediate, Closed
+ }
/** Our logger */
protected final Logger log = LoggerFactory.getLogger(getClass());
/** Lock object for this session state */
- protected final Object lock;
+ protected final Object lock = new Object();
/** State of this object */
- protected final AtomicInteger state = new AtomicInteger(OPENED);
+ protected final AtomicReference<State> state = new AtomicReference<State>(State.Opened);
/** A future that will be set 'closed' when the object is actually closed */
- protected final CloseFuture closeFuture;
-
- protected AbstractCloseable() {
- this(new Object());
- }
-
- protected AbstractCloseable(Object lock) {
- this.lock = lock;
- this.closeFuture = new DefaultCloseFuture(lock);
- }
+ protected final CloseFuture closeFuture = new DefaultCloseFuture(lock);
public CloseFuture close(boolean immediately) {
if (immediately) {
- if (state.compareAndSet(OPENED, IMMEDIATE) || state.compareAndSet(GRACEFUL, IMMEDIATE)) {
+ if (state.compareAndSet(State.Opened, State.Immediate)
+ || state.compareAndSet(State.Graceful, State.Immediate)) {
log.debug("Closing {} immediately", this);
preClose();
doCloseImmediately();
log.debug("{} closed", this);
} else {
- log.debug("{} is already {}", this, state.get() == CLOSED ? "closed" : "closing");
+ log.debug("{} is already {}", this, state.get() == State.Closed ? "closed" : "closing");
}
} else {
- if (state.compareAndSet(OPENED, GRACEFUL)) {
+ if (state.compareAndSet(State.Opened, State.Graceful)) {
log.debug("Closing {} gracefully", this);
preClose();
SshFuture<CloseFuture> grace = doCloseGracefully();
if (grace != null) {
grace.addListener(new SshFutureListener<CloseFuture>() {
public void operationComplete(CloseFuture future) {
- if (state.compareAndSet(GRACEFUL, IMMEDIATE)) {
+ if (state.compareAndSet(State.Graceful, State.Immediate)) {
doCloseImmediately();
log.debug("{} closed", AbstractCloseable.this);
}
}
});
} else {
- if (state.compareAndSet(GRACEFUL, IMMEDIATE)) {
+ if (state.compareAndSet(State.Graceful, State.Immediate)) {
doCloseImmediately();
log.debug("{} closed", this);
}
}
} else {
- log.debug("{} is already {}", this, state.get() == CLOSED ? "closed" : "closing");
+ log.debug("{} is already {}", this, state.get() == State.Closed ? "closed" : "closing");
}
}
return closeFuture;
}
public boolean isClosed() {
- return state.get() == CLOSED;
+ return state.get() == State.Closed;
}
public boolean isClosing() {
- return state.get() != OPENED;
+ return state.get() != State.Opened;
}
+ /**
+ * preClose is guaranteed to be called before doCloseGracefully or doCloseImmediately.
+ * When preClose() is called, isClosing() == true
+ */
protected void preClose() {
}
@@ -394,9 +324,16 @@ public class CloseableUtils {
return null;
}
+ /**
+ * doCloseImmediately is called once and only once
+ * with state == Immediate
+ *
+ * Overriding methods should always call the base implementation.
+ * It may be called concurrently while preClose() or doCloseGracefully is executing
+ */
protected void doCloseImmediately() {
closeFuture.setClosed();
- state.set(CLOSED);
+ state.set(State.Closed);
}
protected Builder builder() {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index fa8abba..b2ad04a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -45,7 +45,6 @@ import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.DefaultCloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.util.Buffer;
-import org.apache.sshd.common.util.CloseableUtils;
import org.apache.sshd.common.util.IoUtils;
import org.apache.sshd.common.util.LoggingFilterOutputStream;
import org.apache.sshd.server.AsyncCommand;
@@ -192,49 +191,51 @@ public class ChannelSession extends AbstractServerChannel {
@Override
protected Closeable getInnerCloseable() {
- return CloseableUtils.sequential(getCommandCloseable(), super.getInnerCloseable());
+ return builder()
+ .sequential(new CommandCloseable(), new GracefulChannelCloseable())
+ .parallel(asyncOut, asyncErr)
+ .build();
}
- protected Closeable getCommandCloseable() {
- return new Closeable() {
- public boolean isClosed() {
- return commandExitFuture.isClosed();
- }
- public boolean isClosing() {
- return isClosed();
- }
- public CloseFuture close(boolean immediately) {
- if (immediately) {
- commandExitFuture.setClosed();
- } else if (!commandExitFuture.isClosed()) {
- IoUtils.closeQuietly(receiver);
- final TimerTask task = new TimerTask() {
- @Override
- public void run() {
- commandExitFuture.setClosed();
- }
- };
- long timeout = DEFAULT_COMMAND_EXIT_TIMEOUT;
- String val = getSession().getFactoryManager().getProperties().get(ServerFactoryManager.COMMAND_EXIT_TIMEOUT);
- if (val != null) {
- try {
- timeout = Long.parseLong(val);
- } catch (NumberFormatException e) {
- // Ignore
- }
+ public class CommandCloseable implements Closeable {
+ public boolean isClosed() {
+ return commandExitFuture.isClosed();
+ }
+ public boolean isClosing() {
+ return isClosed();
+ }
+ public CloseFuture close(boolean immediately) {
+ if (immediately || command == null) {
+ commandExitFuture.setClosed();
+ } else if (!commandExitFuture.isClosed()) {
+ IoUtils.closeQuietly(receiver);
+ final TimerTask task = new TimerTask() {
+ @Override
+ public void run() {
+ commandExitFuture.setClosed();
+ }
+ };
+ long timeout = DEFAULT_COMMAND_EXIT_TIMEOUT;
+ String val = getSession().getFactoryManager().getProperties().get(ServerFactoryManager.COMMAND_EXIT_TIMEOUT);
+ if (val != null) {
+ try {
+ timeout = Long.parseLong(val);
+ } catch (NumberFormatException e) {
+ // Ignore
}
- log.debug("Wait {} ms for shell to exit cleanly", timeout);
- getSession().getFactoryManager().getScheduledExecutorService().schedule(task, timeout, TimeUnit.MILLISECONDS);
- commandExitFuture.addListener(new SshFutureListener<CloseFuture>() {
- public void operationComplete(CloseFuture future) {
- task.cancel();
- }
- });
}
- return commandExitFuture;
+ log.debug("Wait {} ms for shell to exit cleanly", timeout);
+ getSession().getFactoryManager().getScheduledExecutorService().schedule(task, timeout, TimeUnit.MILLISECONDS);
+ commandExitFuture.addListener(new SshFutureListener<CloseFuture>() {
+ public void operationComplete(CloseFuture future) {
+ task.cancel();
+ }
+ });
}
- };
+ return commandExitFuture;
+ }
}
+
@Override
protected void doCloseImmediately() {
if (command != null) {
@@ -243,8 +244,6 @@ public class ChannelSession extends AbstractServerChannel {
}
remoteWindow.notifyClosed();
IoUtils.closeQuietly(out, err, receiver);
- // TODO: graceful close ?
- CloseableUtils.parallel(asyncOut, asyncErr).close(true);
super.doCloseImmediately();
}
@@ -256,7 +255,7 @@ public class ChannelSession extends AbstractServerChannel {
protected void doWriteData(byte[] data, int off, int len) throws IOException {
// If we're already closing, ignore incoming data
- if (state.get() != OPENED) {
+ if (isClosing()) {
return;
}
if (receiver != null) {
@@ -393,6 +392,10 @@ public class ChannelSession extends AbstractServerChannel {
}
protected boolean handleShell(Buffer buffer) throws IOException {
+ // If we're already closing, ignore incoming data
+ if (isClosing()) {
+ return false;
+ }
if (((ServerSession) session).getFactoryManager().getShellFactory() == null) {
return false;
}
@@ -403,6 +406,10 @@ public class ChannelSession extends AbstractServerChannel {
}
protected boolean handleExec(Buffer buffer) throws IOException {
+ // If we're already closing, ignore incoming data
+ if (isClosing()) {
+ return false;
+ }
String commandLine = buffer.getString();
if (((ServerSession) session).getFactoryManager().getCommandFactory() == null) {
log.warn("Unsupported command: {}", commandLine);
@@ -561,7 +568,7 @@ public class ChannelSession extends AbstractServerChannel {
}
protected void closeShell(int exitValue) throws IOException {
- if (state.get() == OPENED) {
+ if (!isClosing()) {
sendEof();
sendExitStatus(exitValue);
commandExitFuture.setClosed();
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
index 169e434..39a64a7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
@@ -88,7 +88,7 @@ public class ServerSession extends AbstractSession {
* @throws IOException
*/
protected void checkForTimeouts() throws IOException {
- if (state.get() == OPENED) {
+ if (!isClosing()) {
long now = System.currentTimeMillis();
if (!authed && now > authTimeoutTimestamp) {
disconnect(SshConstants.SSH2_DISCONNECT_PROTOCOL_ERROR, "Session has timed out waiting for authentication after " + authTimeoutMs + " ms.");
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
index 2b878b8..ba8eb36 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
@@ -29,7 +29,6 @@ import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.channel.ChannelOutputStream;
-import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.io.IoAcceptor;
import org.apache.sshd.common.io.IoHandler;
import org.apache.sshd.common.io.IoSession;
@@ -37,8 +36,6 @@ import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.util.Buffer;
import org.apache.sshd.common.util.CloseableUtils;
import org.apache.sshd.common.util.Readable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
@@ -69,20 +66,7 @@ public class X11ForwardSupport extends CloseableUtils.AbstractInnerCloseable imp
@Override
protected Closeable getInnerCloseable() {
- return acceptor != null ? acceptor : new CloseableUtils.AbstractCloseable() { };
- }
-
- public CloseFuture close(boolean immediately) {
- IoAcceptor a;
- synchronized (this) {
- a = acceptor;
- acceptor = null;
- }
- if (a != null) {
- return a.close(immediately);
- } else {
- return CloseableUtils.closed();
- }
+ return builder().close(acceptor).build();
}
public synchronized String createDisplay(boolean singleConnection,
@@ -212,7 +196,7 @@ public class X11ForwardSupport extends CloseableUtils.AbstractInnerCloseable imp
@Override
protected Closeable getInnerCloseable() {
- return CloseableUtils.sequential(serverSession, super.getInnerCloseable());
+ return builder().sequential(serverSession, super.getInnerCloseable()).build();
}
protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d8386644/sshd-core/src/test/java/org/apache/sshd/CipherTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/CipherTest.java b/sshd-core/src/test/java/org/apache/sshd/CipherTest.java
index 28e3b68..7d2334f 100644
--- a/sshd-core/src/test/java/org/apache/sshd/CipherTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/CipherTest.java
@@ -130,7 +130,7 @@ public class CipherTest extends BaseTest {
@After
public void tearDown() throws Exception {
if (sshd != null) {
- sshd.stop();
+ sshd.stop(true);
}
}