You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2015/05/14 15:58:48 UTC

mina-sshd git commit: [SSHD-456] Nio2Acceptor does not unmanage the session upon exception in SessionListener

Repository: mina-sshd
Updated Branches:
  refs/heads/master fbb784347 -> 06e9af740


[SSHD-456] Nio2Acceptor does not unmanage the session upon exception in SessionListener


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/06e9af74
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/06e9af74
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/06e9af74

Branch: refs/heads/master
Commit: 06e9af740100ec0ca276bdbbcc7c02a049fb406b
Parents: fbb7843
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Thu May 14 16:58:40 2015 +0300
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Thu May 14 16:58:40 2015 +0300

----------------------------------------------------------------------
 .../main/java/org/apache/sshd/SshClient.java    |  11 +
 .../org/apache/sshd/common/io/IoConnector.java  |   1 -
 .../sshd/common/io/nio2/Nio2Acceptor.java       |  40 ++-
 .../common/io/nio2/Nio2CompletionHandler.java   |   4 +
 .../sshd/common/io/nio2/Nio2Connector.java      |  10 +-
 .../apache/sshd/common/io/nio2/Nio2Service.java |   7 +-
 .../apache/sshd/common/io/nio2/Nio2Session.java |  24 +-
 .../common/session/AbstractSessionFactory.java  |   1 +
 .../apache/sshd/common/util/CloseableUtils.java |  16 +-
 .../test/java/org/apache/sshd/ServerTest.java   | 279 +++++++++++++------
 10 files changed, 291 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/SshClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/SshClient.java b/sshd-core/src/main/java/org/apache/sshd/SshClient.java
index 12337a8..5b62412 100644
--- a/sshd-core/src/main/java/org/apache/sshd/SshClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/SshClient.java
@@ -113,6 +113,7 @@ import org.bouncycastle.openssl.PasswordFinder;
 public class SshClient extends AbstractFactoryManager implements ClientFactoryManager, Closeable {
 
     public static final Factory<SshClient> DEFAULT_SSH_CLIENT_FACTORY = new Factory<SshClient>() {
+        @Override
         public SshClient create() {
             return new SshClient();
         }
@@ -136,6 +137,7 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
         this.sessionFactory = sessionFactory;
     }
 
+    @Override
     public ServerKeyVerifier getServerKeyVerifier() {
         return serverKeyVerifier;
     }
@@ -144,6 +146,7 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
         this.serverKeyVerifier = serverKeyVerifier;
     }
 
+    @Override
     public UserInteraction getUserInteraction() {
         return userInteraction;
     }
@@ -152,6 +155,7 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
         this.userInteraction = userInteraction;
     }
 
+    @Override
     public List<NamedFactory<UserAuth>> getUserAuthFactories() {
         return userAuthFactories;
     }
@@ -244,12 +248,14 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
     protected Closeable getInnerCloseable() {
         return builder()
                 .run(new Runnable() {
+                    @Override
                     public void run() {
                         removeSessionTimeout(sessionFactory);
                     }
                 })
                 .sequential(connector, ioServiceFactory)
                 .run(new Runnable() {
+                    @Override
                     public void run() {
                         connector = null;
                         ioServiceFactory = null;
@@ -279,6 +285,7 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
         }
         final ConnectFuture connectFuture = new DefaultConnectFuture(null);
         connector.connect(address).addListener(new SshFutureListener<IoConnectFuture>() {
+            @Override
             public void operationComplete(IoConnectFuture future) {
                 if (future.isCanceled()) {
                     connectFuture.cancel();
@@ -467,8 +474,10 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
             try {
                 if (SecurityUtils.isBouncyCastleRegistered()) {
                     class KeyPairProviderLoader implements Callable<KeyPairProvider> {
+                        @Override
                         public KeyPairProvider call() throws Exception {
                             return new FileKeyPairProvider(files.toArray(new String[files.size()]), new PasswordFinder() {
+                                @Override
                                 public char[] getPassword() {
                                     try {
                                         System.out.println("Enter password for private key: ");
@@ -494,10 +503,12 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
         client.start();
         client.setKeyPairProvider(provider);
         client.setUserInteraction(new UserInteraction() {
+            @Override
             public void welcome(String banner) {
                 System.out.println(banner);
             }
 
+            @Override
             public String[] interactive(String destination, String name, String instruction, String[] prompt, boolean[] echo) {
                 String[] answers = new String[prompt.length];
                 try {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnector.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnector.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnector.java
index a195dca..f91a69a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnector.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoConnector.java
@@ -18,7 +18,6 @@
  */
 package org.apache.sshd.common.io;
 
-import java.io.IOException;
 import java.net.SocketAddress;
 
 /**

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
index 5593ea5..925c966 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
@@ -51,6 +51,7 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
         backlog = FactoryManagerUtils.getIntProperty(manager, FactoryManager.SOCKET_BACKLOG, DEFAULT_BACKLOG);
     }
 
+    @Override
     public void bind(Collection<? extends SocketAddress> addresses) throws IOException {
         for (SocketAddress address : addresses) {
             logger.debug("Binding Nio2Acceptor to address {}", address);
@@ -68,15 +69,18 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
         }
     }
 
+    @Override
     public void bind(SocketAddress address) throws IOException {
         bind(Collections.singleton(address));
     }
 
+    @Override
     public void unbind() {
         logger.debug("Unbinding");
         unbind(getBoundAddresses());
     }
 
+    @Override
     public void unbind(Collection<? extends SocketAddress> addresses) {
         for (SocketAddress address : addresses) {
             AsynchronousServerSocketChannel channel = channels.remove(address);
@@ -90,10 +94,12 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
         }
     }
 
+    @Override
     public void unbind(SocketAddress address) {
         unbind(Collections.singleton(address));
     }
 
+    @Override
     public Set<SocketAddress> getBoundAddresses() {
         return new HashSet<SocketAddress>(channels.keySet());
     }
@@ -104,6 +110,7 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
         return super.close(immediately);
     }
 
+    @Override
     public void doCloseImmediately() {
         for (SocketAddress address : channels.keySet()) {
             try {
@@ -120,26 +127,53 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
         AcceptCompletionHandler(AsynchronousServerSocketChannel socket) {
             this.socket = socket;
         }
+        @SuppressWarnings("synthetic-access")
+        @Override
         protected void onCompleted(AsynchronousSocketChannel result, SocketAddress address) {
             // Verify that the address has not been unbound
             if (!channels.containsKey(address)) {
                 return;
             }
+
+            Nio2Session session=null;
             try {
                 // Create a session
-                Nio2Session session = new Nio2Session(Nio2Acceptor.this, manager, handler, result);
+                session = new Nio2Session(Nio2Acceptor.this, manager, handler, result);
                 handler.sessionCreated(session);
-                sessions.put(session.getId(), session);
+                sessions.put(Long.valueOf(session.getId()), session);
                 session.startReading();
+            } catch (Throwable exc) {
+                failed(exc, address);
+
+                // fail fast the accepted connection
+                if (session != null) {
+                    try {
+                        session.close();
+                    } catch(Throwable t) {
+                        log.warn("Failed (" + t.getClass().getSimpleName() + ")"
+                                + " to close accepted connection from " + address
+                                + ": " + t.getMessage(),
+                                 t);
+                    }
+                }
+            }
+            
+            try {
                 // Accept new connections
                 socket.accept(address, this);
             } catch (Throwable exc) {
                 failed(exc, address);
             }
         }
+
+        @SuppressWarnings("synthetic-access")
+        @Override
         protected void onFailed(final Throwable exc, final SocketAddress address) {
             if (channels.containsKey(address) && !disposing.get()) {
-                logger.warn("Caught exception while accepting incoming connection", exc);
+                logger.warn("Caught " + exc.getClass().getSimpleName()
+                          + " while accepting incoming connection from " + address
+                          + ": " + exc.getMessage(),
+                            exc);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2CompletionHandler.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2CompletionHandler.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2CompletionHandler.java
index 5bc6e00..80b75da 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2CompletionHandler.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2CompletionHandler.java
@@ -26,8 +26,10 @@ import java.security.PrivilegedAction;
  */
 public abstract class Nio2CompletionHandler<V,A> implements CompletionHandler<V,A> {
 
+    @Override
     public void completed(final V result, final A attachment) {
         AccessController.doPrivileged(new PrivilegedAction<Object>() {
+            @Override
             public Object run() {
                 onCompleted(result, attachment);
                 return null;
@@ -35,8 +37,10 @@ public abstract class Nio2CompletionHandler<V,A> implements CompletionHandler<V,
         });
     }
 
+    @Override
     public void failed(final Throwable exc, final A attachment) {
         AccessController.doPrivileged(new PrivilegedAction<Object>() {
+            @Override
             public Object run() {
                 onFailed(exc, attachment);
                 return null;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
index 1318280..d544cc8 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java
@@ -39,6 +39,7 @@ public class Nio2Connector extends Nio2Service implements IoConnector {
         super(manager, handler, group);
     }
 
+    @Override
     public IoConnectFuture connect(SocketAddress address) {
         logger.debug("Connecting to {}", address);
         final IoConnectFuture future = new DefaultIoConnectFuture(null);
@@ -51,11 +52,12 @@ public class Nio2Connector extends Nio2Service implements IoConnector {
             setOption(socket, FactoryManager.SOCKET_SNDBUF, StandardSocketOptions.SO_SNDBUF, null);
             setOption(socket, FactoryManager.TCP_NODELAY, StandardSocketOptions.TCP_NODELAY, null);
             socket.connect(address, null, new Nio2CompletionHandler<Void, Object>() {
+                @Override
                 protected void onCompleted(Void result, Object attachment) {
                     try {
                         Nio2Session session = new Nio2Session(Nio2Connector.this, manager, handler, socket);
                         handler.sessionCreated(session);
-                        sessions.put(session.getId(), session);
+                        sessions.put(Long.valueOf(session.getId()), session);
                         future.setSession(session);
                         session.startReading();
                     } catch (Throwable e) {
@@ -67,6 +69,7 @@ public class Nio2Connector extends Nio2Service implements IoConnector {
                         future.setException(e);
                     }
                 }
+                @Override
                 protected void onFailed(final Throwable exc, final Object attachment) {
                     future.setException(exc);
                 }
@@ -81,20 +84,25 @@ public class Nio2Connector extends Nio2Service implements IoConnector {
         DefaultIoConnectFuture(Object lock) {
             super(lock);
         }
+        @Override
         public IoSession getSession() {
             Object v = getValue();
             return v instanceof IoSession ? (IoSession) v : null;
         }
+        @Override
         public Throwable getException() {
             Object v = getValue();
             return v instanceof Throwable ? (Throwable) v : null;
         }
+        @Override
         public boolean isConnected() {
             return getValue() instanceof IoSession;
         }
+        @Override
         public void setSession(IoSession session) {
             setValue(session);
         }
+        @Override
         public void setException(Throwable exception) {
             setValue(exception);
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
index 419e234..7c9af13 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
@@ -70,12 +70,13 @@ public abstract class Nio2Service extends CloseableUtils.AbstractInnerCloseable
         return builder().parallel(sessions.values()).build();
     }
 
+    @Override
     public Map<Long, IoSession> getManagedSessions() {
         return Collections.unmodifiableMap(sessions);
     }
 
     public void sessionClosed(Nio2Session session) {
-        sessions.remove(session.getId());
+        sessions.remove(Long.valueOf(session.getId()));
     }
 
     protected <T> void setOption(NetworkChannel socket, String property, SocketOption<T> option, T defaultValue) throws IOException {
@@ -84,9 +85,9 @@ public abstract class Nio2Service extends CloseableUtils.AbstractInnerCloseable
         if (!GenericUtils.isEmpty(valStr)) {
             Class<T> type = option.type();
             if (type == Integer.class) {
-                val = type.cast(Integer.parseInt(valStr));
+                val = type.cast(Integer.valueOf(valStr));
             } else if (type == Boolean.class) {
-                val = type.cast(Boolean.parseBoolean(valStr));
+                val = type.cast(Boolean.valueOf(valStr));
             } else {
                 throw new IllegalStateException("Unsupported socket option type " + type);
             }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index a0ceaf0..d98f46a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -72,22 +72,27 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
         log.debug("Creating IoSession on {} from {}", localAddress, remoteAddress);
     }
 
+    @Override
     public long getId() {
         return id;
     }
 
+    @Override
     public Object getAttribute(Object key) {
         return attributes.get(key);
     }
 
+    @Override
     public Object setAttribute(Object key, Object value) {
         return attributes.put(key, value);
     }
 
+    @Override
     public SocketAddress getRemoteAddress() {
         return remoteAddress;
     }
 
+    @Override
     public SocketAddress getLocalAddress() {
         return localAddress;
     }
@@ -105,8 +110,12 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
         }
     }
 
+    @Override
     public IoWriteFuture write(Buffer buffer) {
-        log.debug("Writing {} bytes", buffer.available());
+        if (log.isDebugEnabled()) {
+            log.debug("Writing {} bytes", Integer.valueOf(buffer.available()));
+        }
+
         ByteBuffer buf = ByteBuffer.wrap(buffer.array(), buffer.rpos(), buffer.available());
         final DefaultIoWriteFuture future = new DefaultIoWriteFuture(null, buf);
         if (isClosing()) {
@@ -166,6 +175,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
         }
     }
 
+    @Override
     public IoService getService() {
         return service;
     }
@@ -188,9 +198,11 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
 
     public void startReading(final ByteBuffer buffer) {
         doReadCycle(buffer, new Readable() {
+                @Override
                 public int available() {
                     return buffer.remaining();
                 }
+                @Override
                 public void getRawBytes(byte[] data, int offset, int len) {
                     buffer.get(data, offset, len);
                 }
@@ -199,10 +211,11 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
 
     protected void doReadCycle(final ByteBuffer buffer, final Readable bufReader) {
         final Nio2CompletionHandler<Integer, Object> completion = new Nio2CompletionHandler<Integer, Object>() {
+            @Override
             @SuppressWarnings("synthetic-access")
             protected void onCompleted(Integer result, Object attachment) {
                 try {
-                    if (result >= 0) {
+                    if (result.intValue() >= 0) {
                         log.debug("Read {} bytes", result);
                         buffer.flip();
                         handler.messageReceived(Nio2Session.this, bufReader);
@@ -222,6 +235,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
                 }
             }
 
+            @Override
             @SuppressWarnings("synthetic-access")
             protected void onFailed(Throwable exc, Object attachment) {
                 exceptionCaught(exc);
@@ -240,6 +254,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
             if (currentWrite.compareAndSet(null, future)) {
                 try {
                     socket.write(future.buffer, null, new Nio2CompletionHandler<Integer, Object>() {
+                        @Override
                         protected void onCompleted(Integer result, Object attachment) {
                             if (future.buffer.hasRemaining()) {
                                 try {
@@ -255,6 +270,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
                                 finishWrite();
                             }
                         }
+                        @Override
                         protected void onFailed(Throwable exc, Object attachment) {
                             future.setException(exc);
                             exceptionCaught(exc);
@@ -280,6 +296,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
             super(lock);
             this.buffer = buffer;
         }
+        @Override
         public void verify() throws SshException {
             try {
                 await();
@@ -292,12 +309,14 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
             }
         }
 
+        @Override
         public boolean isWritten() {
             return getValue() instanceof Boolean;
         }
         public void setWritten() {
             setValue(Boolean.TRUE);
         }
+        @Override
         public Throwable getException() {
             Object v = getValue();
             return v instanceof Throwable ? (Throwable) v : null;
@@ -310,6 +329,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
         }
     }
 
+    @Override
     public String toString() {
         return getClass().getSimpleName() + "[local=" + localAddress + ", remote=" + remoteAddress + "]";
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
index 5127538..763ab81 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
@@ -38,6 +38,7 @@ public abstract class AbstractSessionFactory extends AbstractSessionIoHandler {
     	super();
     }
 
+    @Override
     protected AbstractSession createSession(IoSession ioSession) throws Exception {
         AbstractSession session = doCreateSession(ioSession);
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
index 243d227..c0242ae 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
@@ -97,7 +97,7 @@ public class CloseableUtils {
             return this;
         }
 
-        public <T extends SshFuture> Builder when(SshFuture<T>... futures) {
+        public <T extends SshFuture> Builder when(@SuppressWarnings("unchecked") SshFuture<T>... futures) {
             return when(Arrays.asList(futures));
         }
 
@@ -137,6 +137,7 @@ public class CloseableUtils {
             return this;
         }
 
+        @Override
         public Closeable build() {
             if (closeables.isEmpty()) {
                 return new SimpleCloseable(lock);
@@ -167,12 +168,15 @@ public class CloseableUtils {
             closing = new AtomicBoolean();
         }
 
+        @Override
         public boolean isClosed() {
             return future.isClosed();
         }
+        @Override
         public boolean isClosing() {
             return closing.get();
         }
+        @Override
         public CloseFuture close(boolean immediately) {
             if (closing.compareAndSet(false, true)) {
                 doClose(immediately);
@@ -194,9 +198,11 @@ public class CloseableUtils {
             this.closeables = closeables;
         }
 
+        @Override
         protected void doClose(final boolean immediately) {
             final AtomicInteger count = new AtomicInteger(1);
             SshFutureListener<CloseFuture> listener = new SshFutureListener<CloseFuture>() {
+                @Override
                 public void operationComplete(CloseFuture f) {
                     if (count.decrementAndGet() == 0) {
                         future.setClosed();
@@ -222,9 +228,11 @@ public class CloseableUtils {
             this.closeables = closeables;
         }
 
+        @Override
         protected void doClose(final boolean immediately) {
             final Iterator<? extends Closeable> iterator = closeables.iterator();
             SshFutureListener<CloseFuture> listener = new SshFutureListener<CloseFuture>() {
+                @Override
                 public void operationComplete(CloseFuture previousFuture) {
                     while (iterator.hasNext()) {
                         Closeable c = iterator.next();
@@ -252,6 +260,7 @@ public class CloseableUtils {
             this.futures = futures;
         }
 
+        @Override
         protected void doClose(boolean immediately) {
             if (immediately) {
                 for (SshFuture<?> f : futures) {
@@ -263,6 +272,7 @@ public class CloseableUtils {
             } else {
                 final AtomicInteger count = new AtomicInteger(1);
                 SshFutureListener<T> listener = new SshFutureListener<T>() {
+                    @Override
                     public void operationComplete(T f) {
                         if (count.decrementAndGet() == 0) {
                             future.setClosed();
@@ -294,6 +304,7 @@ public class CloseableUtils {
         /** A future that will be set 'closed' when the object is actually closed */
         protected final CloseFuture closeFuture = new DefaultCloseFuture(lock);
 
+        @Override
         public CloseFuture close(boolean immediately) {
             if (immediately) {
                 if (state.compareAndSet(State.Opened, State.Immediate)
@@ -312,6 +323,7 @@ public class CloseableUtils {
                     SshFuture<CloseFuture> grace = doCloseGracefully();
                     if (grace != null) {
                         grace.addListener(new SshFutureListener<CloseFuture>() {
+                            @Override
                             public void operationComplete(CloseFuture future) {
                                 if (state.compareAndSet(State.Graceful, State.Immediate)) {
                                     doCloseImmediately();
@@ -332,10 +344,12 @@ public class CloseableUtils {
             return closeFuture;
         }
 
+        @Override
         public boolean isClosed() {
             return state.get() == State.Closed;
         }
 
+        @Override
         public boolean isClosing() {
             return state.get() != State.Opened;
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/06e9af74/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ServerTest.java b/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
index 9e82ca6..9b52296 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
@@ -29,13 +29,15 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
+import java.net.SocketAddress;
 import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.log4j.Logger;
 import org.apache.sshd.client.SessionFactory;
 import org.apache.sshd.client.channel.ChannelExec;
 import org.apache.sshd.client.channel.ChannelShell;
@@ -43,6 +45,8 @@ import org.apache.sshd.client.future.AuthFuture;
 import org.apache.sshd.client.session.ClientConnectionService;
 import org.apache.sshd.client.session.ClientSessionImpl;
 import org.apache.sshd.common.Channel;
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.FactoryManagerUtils;
 import org.apache.sshd.common.NamedFactory;
 import org.apache.sshd.common.Session;
 import org.apache.sshd.common.SessionListener;
@@ -51,11 +55,13 @@ import org.apache.sshd.common.channel.WindowClosedException;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.session.AbstractConnectionService;
 import org.apache.sshd.common.session.AbstractSession;
+import org.apache.sshd.deprecated.ClientUserAuthServiceOld;
 import org.apache.sshd.deprecated.UserAuthPassword;
 import org.apache.sshd.server.Command;
 import org.apache.sshd.server.CommandFactory;
 import org.apache.sshd.server.Environment;
 import org.apache.sshd.server.ExitCallback;
+import org.apache.sshd.server.ServerFactoryManager;
 import org.apache.sshd.server.command.ScpCommandFactory;
 import org.apache.sshd.server.sftp.SftpSubsystemFactory;
 import org.apache.sshd.util.BaseTest;
@@ -66,8 +72,8 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-
-import org.apache.sshd.deprecated.ClientUserAuthServiceOld;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * TODO Add javadoc
@@ -107,7 +113,8 @@ public class ServerTest extends BaseTest {
      */
     @Test
     public void testFailAuthenticationWithWaitFor() throws Exception {
-        sshd.getProperties().put(SshServer.MAX_AUTH_REQUESTS, "10");
+        final int   MAX_AUTH_REQUESTS=10;
+        FactoryManagerUtils.updateProperty(sshd, ServerFactoryManager.MAX_AUTH_REQUESTS, MAX_AUTH_REQUESTS);
 
         client = SshClient.setUpDefaultClient();
         client.setServiceFactories(Arrays.asList(
@@ -115,24 +122,27 @@ public class ServerTest extends BaseTest {
                 new ClientConnectionService.Factory()
         ));
         client.start();
-        ClientSession s = client.connect("smx", "localhost", port).await().getSession();
-        int nbTrials = 0;
-        int res = 0;
-        while ((res & ClientSession.CLOSED) == 0) {
-            nbTrials ++;
-            s.getService(ClientUserAuthServiceOld.class)
-                    .auth(new UserAuthPassword((ClientSessionImpl) s, "ssh-connection", "buggy"));
-            res = s.waitFor(ClientSession.CLOSED | ClientSession.WAIT_AUTH, 5000);
-            if (res == ClientSession.TIMEOUT) {
-                throw new TimeoutException();
+        
+        try(ClientSession s = client.connect("smx", "localhost", port).await().getSession()) {
+            int nbTrials = 0;
+            int res = 0;
+            while ((res & ClientSession.CLOSED) == 0) {
+                nbTrials ++;
+                s.getService(ClientUserAuthServiceOld.class)
+                        .auth(new UserAuthPassword((ClientSessionImpl) s, "ssh-connection", "buggy"));
+                res = s.waitFor(ClientSession.CLOSED | ClientSession.WAIT_AUTH, 5000);
+                if (res == ClientSession.TIMEOUT) {
+                    throw new TimeoutException();
+                }
             }
+            assertTrue("Number trials (" + nbTrials + ") below min.=" + MAX_AUTH_REQUESTS, nbTrials > MAX_AUTH_REQUESTS);
         }
-        assertTrue(nbTrials > 10);
     }
 
     @Test
     public void testFailAuthenticationWithFuture() throws Exception {
-        sshd.getProperties().put(SshServer.MAX_AUTH_REQUESTS, "10");
+        final int   MAX_AUTH_REQUESTS=10;
+        FactoryManagerUtils.updateProperty(sshd, ServerFactoryManager.MAX_AUTH_REQUESTS, MAX_AUTH_REQUESTS);
 
         client = SshClient.setUpDefaultClient();
         client.setServiceFactories(Arrays.asList(
@@ -140,47 +150,54 @@ public class ServerTest extends BaseTest {
                 new ClientConnectionService.Factory()
         ));
         client.start();
-        ClientSession s = client.connect("smx", "localhost", port).await().getSession();
-        int nbTrials = 0;
-        AuthFuture authFuture;
-        do {
-            nbTrials++;
-            assertTrue(nbTrials < 100);
-            authFuture = s.getService(ClientUserAuthServiceOld.class)
-                    .auth(new UserAuthPassword((ClientSessionImpl) s, "ssh-connection", "buggy"));
-            assertTrue(authFuture.await(5000));
-            assertTrue(authFuture.isDone());
-            assertFalse(authFuture.isSuccess());
+        try(ClientSession s = client.connect("smx", "localhost", port).await().getSession()) {
+            int nbTrials = 0;
+            AuthFuture authFuture;
+            do {
+                nbTrials++;
+                assertTrue(nbTrials < 100);
+                authFuture = s.getService(ClientUserAuthServiceOld.class)
+                        .auth(new UserAuthPassword((ClientSessionImpl) s, "ssh-connection", "buggy"));
+                assertTrue(authFuture.await(5000));
+                assertTrue(authFuture.isDone());
+                assertFalse(authFuture.isSuccess());
+            }
+            while (authFuture.isFailure());
+            assertNotNull("Missing auth future exception", authFuture.getException());
+            assertTrue("Number trials (" + nbTrials + ") below min.=" + MAX_AUTH_REQUESTS, nbTrials > MAX_AUTH_REQUESTS);
         }
-        while (authFuture.isFailure());
-        assertNotNull(authFuture.getException());
-        assertTrue(nbTrials > 10);
     }
 
     @Test
     public void testAuthenticationTimeout() throws Exception {
-        sshd.getProperties().put(SshServer.AUTH_TIMEOUT, "5000");
+        final int   AUTH_TIMEOUT=5000;
+        FactoryManagerUtils.updateProperty(sshd, FactoryManager.AUTH_TIMEOUT, AUTH_TIMEOUT);
 
         client = SshClient.setUpDefaultClient();
         client.start();
-        ClientSession s = client.connect("test", "localhost", port).await().getSession();
-        int res = s.waitFor(ClientSession.CLOSED, 10000);
-        assertEquals("Session should be closed", ClientSession.CLOSED | ClientSession.WAIT_AUTH, res);
+        try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
+            int res = s.waitFor(ClientSession.CLOSED, 2 * AUTH_TIMEOUT);
+            assertEquals("Session should be closed", ClientSession.CLOSED | ClientSession.WAIT_AUTH, res);
+        }
     }
 
     @Test
     public void testIdleTimeout() throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);
         TestEchoShellFactory.TestEchoShell.latch = new CountDownLatch(1);
+        final int   IDLE_TIMEOUT=2500;
+        FactoryManagerUtils.updateProperty(sshd, FactoryManager.IDLE_TIMEOUT, IDLE_TIMEOUT);
 
-        sshd.getProperties().put(SshServer.IDLE_TIMEOUT, "2500");
         sshd.getSessionFactory().addListener(new SessionListener() {
+            @Override
             public void sessionCreated(Session session) {
                 System.out.println("Session created");
             }
+            @Override
             public void sessionEvent(Session session, Event event) {
                 System.out.println("Session event: " + event);
             }
+            @Override
             public void sessionClosed(Session session) {
                 System.out.println("Session closed");
                 latch.countDown();
@@ -189,17 +206,19 @@ public class ServerTest extends BaseTest {
 
         client = SshClient.setUpDefaultClient();
         client.start();
-        ClientSession s = client.connect("test", "localhost", port).await().getSession();
-        s.addPasswordIdentity("test");
-        s.auth().verify();
-        ChannelShell shell = s.createShellChannel();
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        ByteArrayOutputStream err = new ByteArrayOutputStream();
-        shell.setOut(out);
-        shell.setErr(err);
-        shell.open().await();
-        int res = s.waitFor(ClientSession.CLOSED, 5000);
-        assertEquals("Session should be closed", ClientSession.CLOSED | ClientSession.AUTHED, res);
+        try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
+            s.addPasswordIdentity("test");
+            s.auth().verify();
+            try(ChannelShell shell = s.createShellChannel();
+                ByteArrayOutputStream out = new ByteArrayOutputStream();
+                ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+                shell.setOut(out);
+                shell.setErr(err);
+                shell.open().await();
+                int res = s.waitFor(ClientSession.CLOSED, 2 * IDLE_TIMEOUT);
+                assertEquals("Session should be closed", ClientSession.CLOSED | ClientSession.AUTHED, res);
+            }
+        }
         assertTrue(latch.await(1, TimeUnit.SECONDS));
         assertTrue(TestEchoShellFactory.TestEchoShell.latch.await(1, TimeUnit.SECONDS));
     }
@@ -216,17 +235,20 @@ public class ServerTest extends BaseTest {
         final CountDownLatch latch = new CountDownLatch(1);
 
         sshd.setCommandFactory(new StreamCommand.Factory());
-        sshd.getProperties().put(SshServer.IDLE_TIMEOUT, "5000");
-        sshd.getProperties().put(SshServer.DISCONNECT_TIMEOUT, "2000");
+        sshd.getProperties().put(FactoryManager.IDLE_TIMEOUT, "5000");
+        sshd.getProperties().put(FactoryManager.DISCONNECT_TIMEOUT, "2000");
         sshd.getSessionFactory().addListener(new SessionListener() {
+            @Override
             public void sessionCreated(Session session) {
                 System.out.println("Session created");
             }
 
+            @Override
             public void sessionEvent(Session session, Event event) {
                 System.out.println("Session event: " + event);
             }
 
+            @Override
             public void sessionClosed(Session session) {
                 System.out.println("Session closed");
                 latch.countDown();
@@ -236,29 +258,32 @@ public class ServerTest extends BaseTest {
         client = SshClient.setUpDefaultClient();
         client.start();
 
-        ClientSession s = client.connect("test", "localhost", port).await().getSession();
-        s.addPasswordIdentity("test");
-        s.auth().verify();
-        ChannelExec shell = s.createExecChannel("normal");
-        // Create a pipe that will block reading when the buffer is full
-        PipedInputStream pis = new PipedInputStream();
-        PipedOutputStream pos = new PipedOutputStream(pis);
-        shell.setOut(pos);
-        shell.open().await();
-
-        AbstractSession serverSession = sshd.getActiveSessions().iterator().next();
-        Channel channel = serverSession.getService(AbstractConnectionService.class).getChannels().iterator().next();
-        while (channel.getRemoteWindow().getSize() > 0) {
-            Thread.sleep(1);
+        try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
+            s.addPasswordIdentity("test");
+            s.auth().verify();
+            try(ChannelExec shell = s.createExecChannel("normal");
+                // Create a pipe that will block reading when the buffer is full
+                PipedInputStream pis = new PipedInputStream();
+                PipedOutputStream pos = new PipedOutputStream(pis)) {
+                shell.setOut(pos);
+                shell.open().await();
+        
+                try(AbstractSession serverSession = sshd.getActiveSessions().iterator().next();
+                    Channel channel = serverSession.getService(AbstractConnectionService.class).getChannels().iterator().next()) {
+                    while (channel.getRemoteWindow().getSize() > 0) {
+                        Thread.sleep(1);
+                    }
+        
+                    LoggerFactory.getLogger(getClass()).info("Waiting for session idle timeouts");
+            
+                    long t0 = System.currentTimeMillis();
+                    latch.await(1, TimeUnit.MINUTES);
+                    long t1 = System.currentTimeMillis();
+                    assertTrue(t1 - t0 > 7000);
+                    assertTrue(t1 - t0 < 10000);
+                }
+            }
         }
-
-        Logger.getLogger(getClass()).info("Waiting for session idle timeouts");
-
-        long t0 = System.currentTimeMillis();
-        latch.await(1, TimeUnit.MINUTES);
-        long t1 = System.currentTimeMillis();
-        assertTrue(t1 - t0 > 7000);
-        assertTrue(t1 - t0 < 10000);
     }
 
     @Test
@@ -279,25 +304,29 @@ public class ServerTest extends BaseTest {
             }
         });
         client.start();
-        ClientSession s = client.connect("test", "localhost", port).await().getSession();
-        s.close(false);
+        try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
+            s.close(false);
+        }
     }
 
     @Test
     public void testKexCompletedEvent() throws Exception {
     	final AtomicInteger	serverEventCount=new AtomicInteger(0);
         sshd.getSessionFactory().addListener(new SessionListener() {
-	            public void sessionCreated(Session session) {
+	            @Override
+                public void sessionCreated(Session session) {
 	            	// ignored
 	            }
 	
-	            public void sessionEvent(Session session, Event event) {
+	            @Override
+                public void sessionEvent(Session session, Event event) {
 	            	if (event == Event.KexCompleted) {
 	            		serverEventCount.incrementAndGet();
 	            	}
 	            }
 	
-	            public void sessionClosed(Session session) {
+	            @Override
+                public void sessionClosed(Session session) {
 	            	// ignored
 	            }
 	        });
@@ -306,27 +335,98 @@ public class ServerTest extends BaseTest {
         client.start();
     	final AtomicInteger	clientEventCount=new AtomicInteger(0);
         client.getSessionFactory().addListener(new SessionListener() {
-	            public void sessionCreated(Session session) {
+	            @Override
+                public void sessionCreated(Session session) {
 	            	// ignored
 	            }
 	
-	            public void sessionEvent(Session session, Event event) {
+	            @Override
+                public void sessionEvent(Session session, Event event) {
 	            	if (event == Event.KexCompleted) {
 	            		clientEventCount.incrementAndGet();
 	            	}
 	            }
 	
-	            public void sessionClosed(Session session) {
+	            @Override
+                public void sessionClosed(Session session) {
 	            	// ignored
 	            }
 	        });
 
-        ClientSession s = client.connect("test", "localhost", port).await().getSession();
-        s.addPasswordIdentity("test");
-        s.auth().verify();
-        Assert.assertEquals("Mismatched client events count", 1, clientEventCount.get());
-        Assert.assertEquals("Mismatched server events count", 1, serverEventCount.get());
-        s.close(false);
+        try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
+            s.addPasswordIdentity("test");
+            s.auth().verify();
+            Assert.assertEquals("Mismatched client events count", 1, clientEventCount.get());
+            Assert.assertEquals("Mismatched server events count", 1, serverEventCount.get());
+            s.close(false);
+        }
+    }
+
+    @Test   // see https://issues.apache.org/jira/browse/SSHD-456
+    public void testServerStillListensIfSessionListenerThrowsException() throws InterruptedException {
+        final Map<String,SocketAddress> eventsMap = new TreeMap<String, SocketAddress>(String.CASE_INSENSITIVE_ORDER);
+        sshd.getSessionFactory().addListener(new SessionListener() {
+            private final Logger log=LoggerFactory.getLogger(getClass());
+            @Override
+            public void sessionCreated(Session session) {
+                throwException("SessionCreated", session);
+            }
+
+            @Override
+            public void sessionEvent(Session session, Event event) {
+                throwException("SessionEvent", session);
+            }
+
+            @Override
+            public void sessionClosed(Session session) {
+                throwException("SessionClosed", session);
+            }
+            
+            private void throwException(String phase, Session session) {
+                IoSession       ioSession = session.getIoSession();
+                SocketAddress   addr = ioSession.getRemoteAddress();
+                synchronized (eventsMap) {
+                    if (eventsMap.put(phase, addr) != null) {
+                        return; // already generated an event for this phase
+                    }
+                }
+                
+                RuntimeException e = new RuntimeException("Synthetic exception at phase=" + phase + ": " + addr);
+                log.info(e.getMessage());
+                throw e;
+            }
+        });
+        
+        client = SshClient.setUpDefaultClient();
+        client.start();
+        
+        int curCount=0;
+        for (int retryCount=0; retryCount < Byte.SIZE; retryCount++){
+            synchronized(eventsMap) {
+                if ((curCount=eventsMap.size()) >= 3) {
+                    return;
+                }
+            }
+            
+            try {
+                try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
+                    s.addPasswordIdentity("test");
+                    s.auth().verify();
+                }
+                
+                synchronized(eventsMap) {
+                    assertTrue("Unexpected premature success: " + eventsMap, eventsMap.size() >= 3);
+                }
+            } catch(IOException e) {
+                // expected - ignored
+                synchronized(eventsMap) {
+                    int nextCount=eventsMap.size();
+                    assertTrue("No session event generated", nextCount > curCount);
+                }
+            }
+        }
+        
+        Assert.fail("No success to authenticate");
     }
 
     public static class TestEchoShellFactory extends EchoShellFactory {
@@ -368,7 +468,7 @@ public class ServerTest extends BaseTest {
 
         @Override
         public void setInputStream(InputStream in) {
-
+            // ignored
         }
 
         @Override
@@ -378,12 +478,12 @@ public class ServerTest extends BaseTest {
 
         @Override
         public void setErrorStream(OutputStream err) {
-
+            // ignored
         }
 
         @Override
         public void setExitCallback(ExitCallback callback) {
-
+            // ignored
         }
 
         @Override
@@ -426,11 +526,9 @@ public class ServerTest extends BaseTest {
         }
     }
 
-
-
     public static void main(String[] args) throws Exception {
         SshServer sshd = SshServer.setUpDefaultServer();
-        sshd.getProperties().put(SshServer.IDLE_TIMEOUT, "10000");
+        sshd.getProperties().put(FactoryManager.IDLE_TIMEOUT, "10000");
         sshd.setPort(8001);
         sshd.setKeyPairProvider(Utils.createTestHostKeyProvider());
         sshd.setSubsystemFactories(Arrays.<NamedFactory<Command>>asList(new SftpSubsystemFactory()));
@@ -440,5 +538,4 @@ public class ServerTest extends BaseTest {
         sshd.start();
         Thread.sleep(100000);
     }
-
 }