You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2015/05/14 15:58:48 UTC
mina-sshd git commit: [SSHD-456] Nio2Acceptor does not unmanage the session upon exception in SessionListener
Repository: mina-sshd
Updated Branches:
refs/heads/master fbb784347 -> 06e9af740
[SSHD-456] Nio2Acceptor does not unmanage the session upon exception in SessionListener
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/06e9af74
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/06e9af74
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/06e9af74
Branch: refs/heads/master
Commit: 06e9af740100ec0ca276bdbbcc7c02a049fb406b
Parents: fbb7843
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Thu May 14 16:58:40 2015 +0300
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Thu May 14 16:58:40 2015 +0300
----------------------------------------------------------------------
.../main/java/org/apache/sshd/SshClient.java | 11 +
.../org/apache/sshd/common/io/IoConnector.java | 1 -
.../sshd/common/io/nio2/Nio2Acceptor.java | 40 ++-
.../common/io/nio2/Nio2CompletionHandler.java | 4 +
.../sshd/common/io/nio2/Nio2Connector.java | 10 +-
.../apache/sshd/common/io/nio2/Nio2Service.java | 7 +-
.../apache/sshd/common/io/nio2/Nio2Session.java | 24 +-
.../common/session/AbstractSessionFactory.java | 1 +
.../apache/sshd/common/util/CloseableUtils.java | 16 +-
.../test/java/org/apache/sshd/ServerTest.java | 279 +++++++++++++------
10 files changed, 291 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/SshClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/SshClient.java b/sshd-core/src/main/java/org/apache/sshd/SshClient.java
index 12337a8..5b62412 100644
--- a/sshd-core/src/main/java/org/apache/sshd/SshClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/SshClient.java
@@ -113,6 +113,7 @@ import org.bouncycastle.openssl.PasswordFinder;
public class SshClient extends AbstractFactoryManager implements ClientFactoryManager, Closeable {
public static final Factory<SshClient> DEFAULT_SSH_CLIENT_FACTORY = new Factory<SshClient>() {
+ @Override
public SshClient create() {
return new SshClient();
}
@@ -136,6 +137,7 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
this.sessionFactory = sessionFactory;
}
+ @Override
public ServerKeyVerifier getServerKeyVerifier() {
return serverKeyVerifier;
}
@@ -144,6 +146,7 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
this.serverKeyVerifier = serverKeyVerifier;
}
+ @Override
public UserInteraction getUserInteraction() {
return userInteraction;
}
@@ -152,6 +155,7 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
this.userInteraction = userInteraction;
}
+ @Override
public List<NamedFactory<UserAuth>> getUserAuthFactories() {
return userAuthFactories;
}
@@ -244,12 +248,14 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
protected Closeable getInnerCloseable() {
return builder()
.run(new Runnable() {
+ @Override
public void run() {
removeSessionTimeout(sessionFactory);
}
})
.sequential(connector, ioServiceFactory)
.run(new Runnable() {
+ @Override
public void run() {
connector = null;
ioServiceFactory = null;
@@ -279,6 +285,7 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
}
final ConnectFuture connectFuture = new DefaultConnectFuture(null);
connector.connect(address).addListener(new SshFutureListener<IoConnectFuture>() {
+ @Override
public void operationComplete(IoConnectFuture future) {
if (future.isCanceled()) {
connectFuture.cancel();
@@ -467,8 +474,10 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
try {
if (SecurityUtils.isBouncyCastleRegistered()) {
class KeyPairProviderLoader implements Callable<KeyPairProvider> {
+ @Override
public KeyPairProvider call() throws Exception {
return new FileKeyPairProvider(files.toArray(new String[files.size()]), new PasswordFinder() {
+ @Override
public char[] getPassword() {
try {
System.out.println("Enter password for private key: ");
@@ -494,10 +503,12 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
client.start();
client.setKeyPairProvider(provider);
client.setUserInteraction(new UserInteraction() {
+ @Override
public void welcome(String banner) {
System.out.println(banner);
}
+ @Override
public String[] interactive(String destination, String name, String instruction, String[] prompt, boolean[] echo) {
String[] answers = new String[prompt.length];
try {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnector.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnector.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnector.java
index a195dca..f91a69a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnector.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnector.java
@@ -18,7 +18,6 @@
*/
package org.apache.sshd.common.io;
-import java.io.IOException;
import java.net.SocketAddress;
/**
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
index 5593ea5..925c966 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
@@ -51,6 +51,7 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
backlog = FactoryManagerUtils.getIntProperty(manager, FactoryManager.SOCKET_BACKLOG, DEFAULT_BACKLOG);
}
+ @Override
public void bind(Collection<? extends SocketAddress> addresses) throws IOException {
for (SocketAddress address : addresses) {
logger.debug("Binding Nio2Acceptor to address {}", address);
@@ -68,15 +69,18 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
}
}
+ @Override
public void bind(SocketAddress address) throws IOException {
bind(Collections.singleton(address));
}
+ @Override
public void unbind() {
logger.debug("Unbinding");
unbind(getBoundAddresses());
}
+ @Override
public void unbind(Collection<? extends SocketAddress> addresses) {
for (SocketAddress address : addresses) {
AsynchronousServerSocketChannel channel = channels.remove(address);
@@ -90,10 +94,12 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
}
}
+ @Override
public void unbind(SocketAddress address) {
unbind(Collections.singleton(address));
}
+ @Override
public Set<SocketAddress> getBoundAddresses() {
return new HashSet<SocketAddress>(channels.keySet());
}
@@ -104,6 +110,7 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
return super.close(immediately);
}
+ @Override
public void doCloseImmediately() {
for (SocketAddress address : channels.keySet()) {
try {
@@ -120,26 +127,53 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
AcceptCompletionHandler(AsynchronousServerSocketChannel socket) {
this.socket = socket;
}
+ @SuppressWarnings("synthetic-access")
+ @Override
protected void onCompleted(AsynchronousSocketChannel result, SocketAddress address) {
// Verify that the address has not been unbound
if (!channels.containsKey(address)) {
return;
}
+
+ Nio2Session session=null;
try {
// Create a session
- Nio2Session session = new Nio2Session(Nio2Acceptor.this, manager, handler, result);
+ session = new Nio2Session(Nio2Acceptor.this, manager, handler, result);
handler.sessionCreated(session);
- sessions.put(session.getId(), session);
+ sessions.put(Long.valueOf(session.getId()), session);
session.startReading();
+ } catch (Throwable exc) {
+ failed(exc, address);
+
+ // fail fast the accepted connection
+ if (session != null) {
+ try {
+ session.close();
+ } catch(Throwable t) {
+ log.warn("Failed (" + t.getClass().getSimpleName() + ")"
+ + " to close accepted connection from " + address
+ + ": " + t.getMessage(),
+ t);
+ }
+ }
+ }
+
+ try {
// Accept new connections
socket.accept(address, this);
} catch (Throwable exc) {
failed(exc, address);
}
}
+
+ @SuppressWarnings("synthetic-access")
+ @Override
protected void onFailed(final Throwable exc, final SocketAddress address) {
if (channels.containsKey(address) && !disposing.get()) {
- logger.warn("Caught exception while accepting incoming connection", exc);
+ logger.warn("Caught " + exc.getClass().getSimpleName()
+ + " while accepting incoming connection from " + address
+ + ": " + exc.getMessage(),
+ exc);
}
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2CompletionHandler.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2CompletionHandler.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2CompletionHandler.java
index 5bc6e00..80b75da 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2CompletionHandler.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2CompletionHandler.java
@@ -26,8 +26,10 @@ import java.security.PrivilegedAction;
*/
public abstract class Nio2CompletionHandler<V,A> implements CompletionHandler<V,A> {
+ @Override
public void completed(final V result, final A attachment) {
AccessController.doPrivileged(new PrivilegedAction<Object>() {
+ @Override
public Object run() {
onCompleted(result, attachment);
return null;
@@ -35,8 +37,10 @@ public abstract class Nio2CompletionHandler<V,A> implements CompletionHandler<V,
});
}
+ @Override
public void failed(final Throwable exc, final A attachment) {
AccessController.doPrivileged(new PrivilegedAction<Object>() {
+ @Override
public Object run() {
onFailed(exc, attachment);
return null;
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
index 1318280..d544cc8 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
@@ -39,6 +39,7 @@ public class Nio2Connector extends Nio2Service implements IoConnector {
super(manager, handler, group);
}
+ @Override
public IoConnectFuture connect(SocketAddress address) {
logger.debug("Connecting to {}", address);
final IoConnectFuture future = new DefaultIoConnectFuture(null);
@@ -51,11 +52,12 @@ public class Nio2Connector extends Nio2Service implements IoConnector {
setOption(socket, FactoryManager.SOCKET_SNDBUF, StandardSocketOptions.SO_SNDBUF, null);
setOption(socket, FactoryManager.TCP_NODELAY, StandardSocketOptions.TCP_NODELAY, null);
socket.connect(address, null, new Nio2CompletionHandler<Void, Object>() {
+ @Override
protected void onCompleted(Void result, Object attachment) {
try {
Nio2Session session = new Nio2Session(Nio2Connector.this, manager, handler, socket);
handler.sessionCreated(session);
- sessions.put(session.getId(), session);
+ sessions.put(Long.valueOf(session.getId()), session);
future.setSession(session);
session.startReading();
} catch (Throwable e) {
@@ -67,6 +69,7 @@ public class Nio2Connector extends Nio2Service implements IoConnector {
future.setException(e);
}
}
+ @Override
protected void onFailed(final Throwable exc, final Object attachment) {
future.setException(exc);
}
@@ -81,20 +84,25 @@ public class Nio2Connector extends Nio2Service implements IoConnector {
DefaultIoConnectFuture(Object lock) {
super(lock);
}
+ @Override
public IoSession getSession() {
Object v = getValue();
return v instanceof IoSession ? (IoSession) v : null;
}
+ @Override
public Throwable getException() {
Object v = getValue();
return v instanceof Throwable ? (Throwable) v : null;
}
+ @Override
public boolean isConnected() {
return getValue() instanceof IoSession;
}
+ @Override
public void setSession(IoSession session) {
setValue(session);
}
+ @Override
public void setException(Throwable exception) {
setValue(exception);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
index 419e234..7c9af13 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
@@ -70,12 +70,13 @@ public abstract class Nio2Service extends CloseableUtils.AbstractInnerCloseable
return builder().parallel(sessions.values()).build();
}
+ @Override
public Map<Long, IoSession> getManagedSessions() {
return Collections.unmodifiableMap(sessions);
}
public void sessionClosed(Nio2Session session) {
- sessions.remove(session.getId());
+ sessions.remove(Long.valueOf(session.getId()));
}
protected <T> void setOption(NetworkChannel socket, String property, SocketOption<T> option, T defaultValue) throws IOException {
@@ -84,9 +85,9 @@ public abstract class Nio2Service extends CloseableUtils.AbstractInnerCloseable
if (!GenericUtils.isEmpty(valStr)) {
Class<T> type = option.type();
if (type == Integer.class) {
- val = type.cast(Integer.parseInt(valStr));
+ val = type.cast(Integer.valueOf(valStr));
} else if (type == Boolean.class) {
- val = type.cast(Boolean.parseBoolean(valStr));
+ val = type.cast(Boolean.valueOf(valStr));
} else {
throw new IllegalStateException("Unsupported socket option type " + type);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index a0ceaf0..d98f46a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -72,22 +72,27 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
log.debug("Creating IoSession on {} from {}", localAddress, remoteAddress);
}
+ @Override
public long getId() {
return id;
}
+ @Override
public Object getAttribute(Object key) {
return attributes.get(key);
}
+ @Override
public Object setAttribute(Object key, Object value) {
return attributes.put(key, value);
}
+ @Override
public SocketAddress getRemoteAddress() {
return remoteAddress;
}
+ @Override
public SocketAddress getLocalAddress() {
return localAddress;
}
@@ -105,8 +110,12 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
}
}
+ @Override
public IoWriteFuture write(Buffer buffer) {
- log.debug("Writing {} bytes", buffer.available());
+ if (log.isDebugEnabled()) {
+ log.debug("Writing {} bytes", Integer.valueOf(buffer.available()));
+ }
+
ByteBuffer buf = ByteBuffer.wrap(buffer.array(), buffer.rpos(), buffer.available());
final DefaultIoWriteFuture future = new DefaultIoWriteFuture(null, buf);
if (isClosing()) {
@@ -166,6 +175,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
}
}
+ @Override
public IoService getService() {
return service;
}
@@ -188,9 +198,11 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
public void startReading(final ByteBuffer buffer) {
doReadCycle(buffer, new Readable() {
+ @Override
public int available() {
return buffer.remaining();
}
+ @Override
public void getRawBytes(byte[] data, int offset, int len) {
buffer.get(data, offset, len);
}
@@ -199,10 +211,11 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
protected void doReadCycle(final ByteBuffer buffer, final Readable bufReader) {
final Nio2CompletionHandler<Integer, Object> completion = new Nio2CompletionHandler<Integer, Object>() {
+ @Override
@SuppressWarnings("synthetic-access")
protected void onCompleted(Integer result, Object attachment) {
try {
- if (result >= 0) {
+ if (result.intValue() >= 0) {
log.debug("Read {} bytes", result);
buffer.flip();
handler.messageReceived(Nio2Session.this, bufReader);
@@ -222,6 +235,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
}
}
+ @Override
@SuppressWarnings("synthetic-access")
protected void onFailed(Throwable exc, Object attachment) {
exceptionCaught(exc);
@@ -240,6 +254,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
if (currentWrite.compareAndSet(null, future)) {
try {
socket.write(future.buffer, null, new Nio2CompletionHandler<Integer, Object>() {
+ @Override
protected void onCompleted(Integer result, Object attachment) {
if (future.buffer.hasRemaining()) {
try {
@@ -255,6 +270,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
finishWrite();
}
}
+ @Override
protected void onFailed(Throwable exc, Object attachment) {
future.setException(exc);
exceptionCaught(exc);
@@ -280,6 +296,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
super(lock);
this.buffer = buffer;
}
+ @Override
public void verify() throws SshException {
try {
await();
@@ -292,12 +309,14 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
}
}
+ @Override
public boolean isWritten() {
return getValue() instanceof Boolean;
}
public void setWritten() {
setValue(Boolean.TRUE);
}
+ @Override
public Throwable getException() {
Object v = getValue();
return v instanceof Throwable ? (Throwable) v : null;
@@ -310,6 +329,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
}
}
+ @Override
public String toString() {
return getClass().getSimpleName() + "[local=" + localAddress + ", remote=" + remoteAddress + "]";
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
index 5127538..763ab81 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
@@ -38,6 +38,7 @@ public abstract class AbstractSessionFactory extends AbstractSessionIoHandler {
super();
}
+ @Override
protected AbstractSession createSession(IoSession ioSession) throws Exception {
AbstractSession session = doCreateSession(ioSession);
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
index 243d227..c0242ae 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
@@ -97,7 +97,7 @@ public class CloseableUtils {
return this;
}
- public <T extends SshFuture> Builder when(SshFuture<T>... futures) {
+ public <T extends SshFuture> Builder when(@SuppressWarnings("unchecked") SshFuture<T>... futures) {
return when(Arrays.asList(futures));
}
@@ -137,6 +137,7 @@ public class CloseableUtils {
return this;
}
+ @Override
public Closeable build() {
if (closeables.isEmpty()) {
return new SimpleCloseable(lock);
@@ -167,12 +168,15 @@ public class CloseableUtils {
closing = new AtomicBoolean();
}
+ @Override
public boolean isClosed() {
return future.isClosed();
}
+ @Override
public boolean isClosing() {
return closing.get();
}
+ @Override
public CloseFuture close(boolean immediately) {
if (closing.compareAndSet(false, true)) {
doClose(immediately);
@@ -194,9 +198,11 @@ public class CloseableUtils {
this.closeables = closeables;
}
+ @Override
protected void doClose(final boolean immediately) {
final AtomicInteger count = new AtomicInteger(1);
SshFutureListener<CloseFuture> listener = new SshFutureListener<CloseFuture>() {
+ @Override
public void operationComplete(CloseFuture f) {
if (count.decrementAndGet() == 0) {
future.setClosed();
@@ -222,9 +228,11 @@ public class CloseableUtils {
this.closeables = closeables;
}
+ @Override
protected void doClose(final boolean immediately) {
final Iterator<? extends Closeable> iterator = closeables.iterator();
SshFutureListener<CloseFuture> listener = new SshFutureListener<CloseFuture>() {
+ @Override
public void operationComplete(CloseFuture previousFuture) {
while (iterator.hasNext()) {
Closeable c = iterator.next();
@@ -252,6 +260,7 @@ public class CloseableUtils {
this.futures = futures;
}
+ @Override
protected void doClose(boolean immediately) {
if (immediately) {
for (SshFuture<?> f : futures) {
@@ -263,6 +272,7 @@ public class CloseableUtils {
} else {
final AtomicInteger count = new AtomicInteger(1);
SshFutureListener<T> listener = new SshFutureListener<T>() {
+ @Override
public void operationComplete(T f) {
if (count.decrementAndGet() == 0) {
future.setClosed();
@@ -294,6 +304,7 @@ public class CloseableUtils {
/** A future that will be set 'closed' when the object is actually closed */
protected final CloseFuture closeFuture = new DefaultCloseFuture(lock);
+ @Override
public CloseFuture close(boolean immediately) {
if (immediately) {
if (state.compareAndSet(State.Opened, State.Immediate)
@@ -312,6 +323,7 @@ public class CloseableUtils {
SshFuture<CloseFuture> grace = doCloseGracefully();
if (grace != null) {
grace.addListener(new SshFutureListener<CloseFuture>() {
+ @Override
public void operationComplete(CloseFuture future) {
if (state.compareAndSet(State.Graceful, State.Immediate)) {
doCloseImmediately();
@@ -332,10 +344,12 @@ public class CloseableUtils {
return closeFuture;
}
+ @Override
public boolean isClosed() {
return state.get() == State.Closed;
}
+ @Override
public boolean isClosing() {
return state.get() != State.Opened;
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ServerTest.java b/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
index 9e82ca6..9b52296 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
@@ -29,13 +29,15 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
+import java.net.SocketAddress;
import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.log4j.Logger;
import org.apache.sshd.client.SessionFactory;
import org.apache.sshd.client.channel.ChannelExec;
import org.apache.sshd.client.channel.ChannelShell;
@@ -43,6 +45,8 @@ import org.apache.sshd.client.future.AuthFuture;
import org.apache.sshd.client.session.ClientConnectionService;
import org.apache.sshd.client.session.ClientSessionImpl;
import org.apache.sshd.common.Channel;
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.FactoryManagerUtils;
import org.apache.sshd.common.NamedFactory;
import org.apache.sshd.common.Session;
import org.apache.sshd.common.SessionListener;
@@ -51,11 +55,13 @@ import org.apache.sshd.common.channel.WindowClosedException;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.session.AbstractConnectionService;
import org.apache.sshd.common.session.AbstractSession;
+import org.apache.sshd.deprecated.ClientUserAuthServiceOld;
import org.apache.sshd.deprecated.UserAuthPassword;
import org.apache.sshd.server.Command;
import org.apache.sshd.server.CommandFactory;
import org.apache.sshd.server.Environment;
import org.apache.sshd.server.ExitCallback;
+import org.apache.sshd.server.ServerFactoryManager;
import org.apache.sshd.server.command.ScpCommandFactory;
import org.apache.sshd.server.sftp.SftpSubsystemFactory;
import org.apache.sshd.util.BaseTest;
@@ -66,8 +72,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-
-import org.apache.sshd.deprecated.ClientUserAuthServiceOld;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* TODO Add javadoc
@@ -107,7 +113,8 @@ public class ServerTest extends BaseTest {
*/
@Test
public void testFailAuthenticationWithWaitFor() throws Exception {
- sshd.getProperties().put(SshServer.MAX_AUTH_REQUESTS, "10");
+ final int MAX_AUTH_REQUESTS=10;
+ FactoryManagerUtils.updateProperty(sshd, ServerFactoryManager.MAX_AUTH_REQUESTS, MAX_AUTH_REQUESTS);
client = SshClient.setUpDefaultClient();
client.setServiceFactories(Arrays.asList(
@@ -115,24 +122,27 @@ public class ServerTest extends BaseTest {
new ClientConnectionService.Factory()
));
client.start();
- ClientSession s = client.connect("smx", "localhost", port).await().getSession();
- int nbTrials = 0;
- int res = 0;
- while ((res & ClientSession.CLOSED) == 0) {
- nbTrials ++;
- s.getService(ClientUserAuthServiceOld.class)
- .auth(new UserAuthPassword((ClientSessionImpl) s, "ssh-connection", "buggy"));
- res = s.waitFor(ClientSession.CLOSED | ClientSession.WAIT_AUTH, 5000);
- if (res == ClientSession.TIMEOUT) {
- throw new TimeoutException();
+
+ try(ClientSession s = client.connect("smx", "localhost", port).await().getSession()) {
+ int nbTrials = 0;
+ int res = 0;
+ while ((res & ClientSession.CLOSED) == 0) {
+ nbTrials ++;
+ s.getService(ClientUserAuthServiceOld.class)
+ .auth(new UserAuthPassword((ClientSessionImpl) s, "ssh-connection", "buggy"));
+ res = s.waitFor(ClientSession.CLOSED | ClientSession.WAIT_AUTH, 5000);
+ if (res == ClientSession.TIMEOUT) {
+ throw new TimeoutException();
+ }
}
+ assertTrue("Number trials (" + nbTrials + ") below min.=" + MAX_AUTH_REQUESTS, nbTrials > MAX_AUTH_REQUESTS);
}
- assertTrue(nbTrials > 10);
}
@Test
public void testFailAuthenticationWithFuture() throws Exception {
- sshd.getProperties().put(SshServer.MAX_AUTH_REQUESTS, "10");
+ final int MAX_AUTH_REQUESTS=10;
+ FactoryManagerUtils.updateProperty(sshd, ServerFactoryManager.MAX_AUTH_REQUESTS, MAX_AUTH_REQUESTS);
client = SshClient.setUpDefaultClient();
client.setServiceFactories(Arrays.asList(
@@ -140,47 +150,54 @@ public class ServerTest extends BaseTest {
new ClientConnectionService.Factory()
));
client.start();
- ClientSession s = client.connect("smx", "localhost", port).await().getSession();
- int nbTrials = 0;
- AuthFuture authFuture;
- do {
- nbTrials++;
- assertTrue(nbTrials < 100);
- authFuture = s.getService(ClientUserAuthServiceOld.class)
- .auth(new UserAuthPassword((ClientSessionImpl) s, "ssh-connection", "buggy"));
- assertTrue(authFuture.await(5000));
- assertTrue(authFuture.isDone());
- assertFalse(authFuture.isSuccess());
+ try(ClientSession s = client.connect("smx", "localhost", port).await().getSession()) {
+ int nbTrials = 0;
+ AuthFuture authFuture;
+ do {
+ nbTrials++;
+ assertTrue(nbTrials < 100);
+ authFuture = s.getService(ClientUserAuthServiceOld.class)
+ .auth(new UserAuthPassword((ClientSessionImpl) s, "ssh-connection", "buggy"));
+ assertTrue(authFuture.await(5000));
+ assertTrue(authFuture.isDone());
+ assertFalse(authFuture.isSuccess());
+ }
+ while (authFuture.isFailure());
+ assertNotNull("Missing auth future exception", authFuture.getException());
+ assertTrue("Number trials (" + nbTrials + ") below min.=" + MAX_AUTH_REQUESTS, nbTrials > MAX_AUTH_REQUESTS);
}
- while (authFuture.isFailure());
- assertNotNull(authFuture.getException());
- assertTrue(nbTrials > 10);
}
@Test
public void testAuthenticationTimeout() throws Exception {
- sshd.getProperties().put(SshServer.AUTH_TIMEOUT, "5000");
+ final int AUTH_TIMEOUT=5000;
+ FactoryManagerUtils.updateProperty(sshd, FactoryManager.AUTH_TIMEOUT, AUTH_TIMEOUT);
client = SshClient.setUpDefaultClient();
client.start();
- ClientSession s = client.connect("test", "localhost", port).await().getSession();
- int res = s.waitFor(ClientSession.CLOSED, 10000);
- assertEquals("Session should be closed", ClientSession.CLOSED | ClientSession.WAIT_AUTH, res);
+ try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
+ int res = s.waitFor(ClientSession.CLOSED, 2 * AUTH_TIMEOUT);
+ assertEquals("Session should be closed", ClientSession.CLOSED | ClientSession.WAIT_AUTH, res);
+ }
}
@Test
public void testIdleTimeout() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
TestEchoShellFactory.TestEchoShell.latch = new CountDownLatch(1);
+ final int IDLE_TIMEOUT=2500;
+ FactoryManagerUtils.updateProperty(sshd, FactoryManager.IDLE_TIMEOUT, IDLE_TIMEOUT);
- sshd.getProperties().put(SshServer.IDLE_TIMEOUT, "2500");
sshd.getSessionFactory().addListener(new SessionListener() {
+ @Override
public void sessionCreated(Session session) {
System.out.println("Session created");
}
+ @Override
public void sessionEvent(Session session, Event event) {
System.out.println("Session event: " + event);
}
+ @Override
public void sessionClosed(Session session) {
System.out.println("Session closed");
latch.countDown();
@@ -189,17 +206,19 @@ public class ServerTest extends BaseTest {
client = SshClient.setUpDefaultClient();
client.start();
- ClientSession s = client.connect("test", "localhost", port).await().getSession();
- s.addPasswordIdentity("test");
- s.auth().verify();
- ChannelShell shell = s.createShellChannel();
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream();
- shell.setOut(out);
- shell.setErr(err);
- shell.open().await();
- int res = s.waitFor(ClientSession.CLOSED, 5000);
- assertEquals("Session should be closed", ClientSession.CLOSED | ClientSession.AUTHED, res);
+ try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
+ s.addPasswordIdentity("test");
+ s.auth().verify();
+ try(ChannelShell shell = s.createShellChannel();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+ shell.setOut(out);
+ shell.setErr(err);
+ shell.open().await();
+ int res = s.waitFor(ClientSession.CLOSED, 2 * IDLE_TIMEOUT);
+ assertEquals("Session should be closed", ClientSession.CLOSED | ClientSession.AUTHED, res);
+ }
+ }
assertTrue(latch.await(1, TimeUnit.SECONDS));
assertTrue(TestEchoShellFactory.TestEchoShell.latch.await(1, TimeUnit.SECONDS));
}
@@ -216,17 +235,20 @@ public class ServerTest extends BaseTest {
final CountDownLatch latch = new CountDownLatch(1);
sshd.setCommandFactory(new StreamCommand.Factory());
- sshd.getProperties().put(SshServer.IDLE_TIMEOUT, "5000");
- sshd.getProperties().put(SshServer.DISCONNECT_TIMEOUT, "2000");
+ sshd.getProperties().put(FactoryManager.IDLE_TIMEOUT, "5000");
+ sshd.getProperties().put(FactoryManager.DISCONNECT_TIMEOUT, "2000");
sshd.getSessionFactory().addListener(new SessionListener() {
+ @Override
public void sessionCreated(Session session) {
System.out.println("Session created");
}
+ @Override
public void sessionEvent(Session session, Event event) {
System.out.println("Session event: " + event);
}
+ @Override
public void sessionClosed(Session session) {
System.out.println("Session closed");
latch.countDown();
@@ -236,29 +258,32 @@ public class ServerTest extends BaseTest {
client = SshClient.setUpDefaultClient();
client.start();
- ClientSession s = client.connect("test", "localhost", port).await().getSession();
- s.addPasswordIdentity("test");
- s.auth().verify();
- ChannelExec shell = s.createExecChannel("normal");
- // Create a pipe that will block reading when the buffer is full
- PipedInputStream pis = new PipedInputStream();
- PipedOutputStream pos = new PipedOutputStream(pis);
- shell.setOut(pos);
- shell.open().await();
-
- AbstractSession serverSession = sshd.getActiveSessions().iterator().next();
- Channel channel = serverSession.getService(AbstractConnectionService.class).getChannels().iterator().next();
- while (channel.getRemoteWindow().getSize() > 0) {
- Thread.sleep(1);
+ try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
+ s.addPasswordIdentity("test");
+ s.auth().verify();
+ try(ChannelExec shell = s.createExecChannel("normal");
+ // Create a pipe that will block reading when the buffer is full
+ PipedInputStream pis = new PipedInputStream();
+ PipedOutputStream pos = new PipedOutputStream(pis)) {
+ shell.setOut(pos);
+ shell.open().await();
+
+ try(AbstractSession serverSession = sshd.getActiveSessions().iterator().next();
+ Channel channel = serverSession.getService(AbstractConnectionService.class).getChannels().iterator().next()) {
+ while (channel.getRemoteWindow().getSize() > 0) {
+ Thread.sleep(1);
+ }
+
+ LoggerFactory.getLogger(getClass()).info("Waiting for session idle timeouts");
+
+ long t0 = System.currentTimeMillis();
+ latch.await(1, TimeUnit.MINUTES);
+ long t1 = System.currentTimeMillis();
+ assertTrue(t1 - t0 > 7000);
+ assertTrue(t1 - t0 < 10000);
+ }
+ }
}
-
- Logger.getLogger(getClass()).info("Waiting for session idle timeouts");
-
- long t0 = System.currentTimeMillis();
- latch.await(1, TimeUnit.MINUTES);
- long t1 = System.currentTimeMillis();
- assertTrue(t1 - t0 > 7000);
- assertTrue(t1 - t0 < 10000);
}
@Test
@@ -279,25 +304,29 @@ public class ServerTest extends BaseTest {
}
});
client.start();
- ClientSession s = client.connect("test", "localhost", port).await().getSession();
- s.close(false);
+ try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
+ s.close(false);
+ }
}
@Test
public void testKexCompletedEvent() throws Exception {
final AtomicInteger serverEventCount=new AtomicInteger(0);
sshd.getSessionFactory().addListener(new SessionListener() {
- public void sessionCreated(Session session) {
+ @Override
+ public void sessionCreated(Session session) {
// ignored
}
- public void sessionEvent(Session session, Event event) {
+ @Override
+ public void sessionEvent(Session session, Event event) {
if (event == Event.KexCompleted) {
serverEventCount.incrementAndGet();
}
}
- public void sessionClosed(Session session) {
+ @Override
+ public void sessionClosed(Session session) {
// ignored
}
});
@@ -306,27 +335,98 @@ public class ServerTest extends BaseTest {
client.start();
final AtomicInteger clientEventCount=new AtomicInteger(0);
client.getSessionFactory().addListener(new SessionListener() {
- public void sessionCreated(Session session) {
+ @Override
+ public void sessionCreated(Session session) {
// ignored
}
- public void sessionEvent(Session session, Event event) {
+ @Override
+ public void sessionEvent(Session session, Event event) {
if (event == Event.KexCompleted) {
clientEventCount.incrementAndGet();
}
}
- public void sessionClosed(Session session) {
+ @Override
+ public void sessionClosed(Session session) {
// ignored
}
});
- ClientSession s = client.connect("test", "localhost", port).await().getSession();
- s.addPasswordIdentity("test");
- s.auth().verify();
- Assert.assertEquals("Mismatched client events count", 1, clientEventCount.get());
- Assert.assertEquals("Mismatched server events count", 1, serverEventCount.get());
- s.close(false);
+ try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
+ s.addPasswordIdentity("test");
+ s.auth().verify();
+ Assert.assertEquals("Mismatched client events count", 1, clientEventCount.get());
+ Assert.assertEquals("Mismatched server events count", 1, serverEventCount.get());
+ s.close(false);
+ }
+ }
+
+ @Test // see https://issues.apache.org/jira/browse/SSHD-456
+ public void testServerStillListensIfSessionListenerThrowsException() throws InterruptedException {
+ final Map<String,SocketAddress> eventsMap = new TreeMap<String, SocketAddress>(String.CASE_INSENSITIVE_ORDER);
+ sshd.getSessionFactory().addListener(new SessionListener() {
+ private final Logger log=LoggerFactory.getLogger(getClass());
+ @Override
+ public void sessionCreated(Session session) {
+ throwException("SessionCreated", session);
+ }
+
+ @Override
+ public void sessionEvent(Session session, Event event) {
+ throwException("SessionEvent", session);
+ }
+
+ @Override
+ public void sessionClosed(Session session) {
+ throwException("SessionClosed", session);
+ }
+
+ private void throwException(String phase, Session session) {
+ IoSession ioSession = session.getIoSession();
+ SocketAddress addr = ioSession.getRemoteAddress();
+ synchronized (eventsMap) {
+ if (eventsMap.put(phase, addr) != null) {
+ return; // already generated an event for this phase
+ }
+ }
+
+ RuntimeException e = new RuntimeException("Synthetic exception at phase=" + phase + ": " + addr);
+ log.info(e.getMessage());
+ throw e;
+ }
+ });
+
+ client = SshClient.setUpDefaultClient();
+ client.start();
+
+ int curCount=0;
+ for (int retryCount=0; retryCount < Byte.SIZE; retryCount++){
+ synchronized(eventsMap) {
+ if ((curCount=eventsMap.size()) >= 3) {
+ return;
+ }
+ }
+
+ try {
+ try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
+ s.addPasswordIdentity("test");
+ s.auth().verify();
+ }
+
+ synchronized(eventsMap) {
+ assertTrue("Unexpected premature success: " + eventsMap, eventsMap.size() >= 3);
+ }
+ } catch(IOException e) {
+ // expected - ignored
+ synchronized(eventsMap) {
+ int nextCount=eventsMap.size();
+ assertTrue("No session event generated", nextCount > curCount);
+ }
+ }
+ }
+
+ Assert.fail("No success to authenticate");
}
public static class TestEchoShellFactory extends EchoShellFactory {
@@ -368,7 +468,7 @@ public class ServerTest extends BaseTest {
@Override
public void setInputStream(InputStream in) {
-
+ // ignored
}
@Override
@@ -378,12 +478,12 @@ public class ServerTest extends BaseTest {
@Override
public void setErrorStream(OutputStream err) {
-
+ // ignored
}
@Override
public void setExitCallback(ExitCallback callback) {
-
+ // ignored
}
@Override
@@ -426,11 +526,9 @@ public class ServerTest extends BaseTest {
}
}
-
-
public static void main(String[] args) throws Exception {
SshServer sshd = SshServer.setUpDefaultServer();
- sshd.getProperties().put(SshServer.IDLE_TIMEOUT, "10000");
+ sshd.getProperties().put(FactoryManager.IDLE_TIMEOUT, "10000");
sshd.setPort(8001);
sshd.setKeyPairProvider(Utils.createTestHostKeyProvider());
sshd.setSubsystemFactories(Arrays.<NamedFactory<Command>>asList(new SftpSubsystemFactory()));
@@ -440,5 +538,4 @@ public class ServerTest extends BaseTest {
sshd.start();
Thread.sleep(100000);
}
-
}