You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2015/08/25 06:38:15 UTC

[1/4] mina-sshd git commit: [SSHD-555] Add ChannelListener support

Repository: mina-sshd
Updated Branches:
  refs/heads/master 051aec3fa -> 40195ab46


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java b/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java
index 7d6b1dc..0a3b2a0 100644
--- a/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.sshd.client.SessionFactory;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.channel.ChannelExec;
 import org.apache.sshd.client.channel.ChannelShell;
@@ -45,10 +44,12 @@ import org.apache.sshd.client.future.AuthFuture;
 import org.apache.sshd.client.session.ClientConnectionServiceFactory;
 import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.client.session.ClientSessionImpl;
+import org.apache.sshd.client.session.SessionFactory;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.FactoryManagerUtils;
 import org.apache.sshd.common.NamedFactory;
 import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.TestChannelListener;
 import org.apache.sshd.common.channel.WindowClosedException;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.kex.KexProposalOption;
@@ -95,9 +96,10 @@ public class ServerTest extends BaseTestSupport {
         sshd.setKeyPairProvider(Utils.createTestHostKeyProvider());
         sshd.setShellFactory(new TestEchoShellFactory());
         sshd.setPasswordAuthenticator(BogusPasswordAuthenticator.INSTANCE);
-        sshd.setSessionFactory(new org.apache.sshd.server.session.SessionFactory());
         sshd.start();
         port = sshd.getPort();
+
+        client = SshClient.setUpDefaultClient();
     }
 
     @After
@@ -120,7 +122,6 @@ public class ServerTest extends BaseTestSupport {
         final int MAX_AUTH_REQUESTS = 10;
         FactoryManagerUtils.updateProperty(sshd, ServerFactoryManager.MAX_AUTH_REQUESTS, MAX_AUTH_REQUESTS);
 
-        client = SshClient.setUpDefaultClient();
         client.setServiceFactories(Arrays.asList(
                 new ClientUserAuthServiceOld.Factory(),
                 ClientConnectionServiceFactory.INSTANCE
@@ -133,8 +134,7 @@ public class ServerTest extends BaseTestSupport {
             while ((res & ClientSession.CLOSED) == 0) {
                 nbTrials++;
                 s.getService(ClientUserAuthServiceOld.class)
-                        .auth(new org.apache.sshd.deprecated.UserAuthPassword(s, "ssh-connection", "buggy"))
-                ;
+                        .auth(new org.apache.sshd.deprecated.UserAuthPassword(s, "ssh-connection", "buggy"));
                 res = s.waitFor(ClientSession.CLOSED | ClientSession.WAIT_AUTH, TimeUnit.SECONDS.toMillis(5L));
                 if (res == ClientSession.TIMEOUT) {
                     throw new TimeoutException("Client session timeout signalled");
@@ -151,7 +151,6 @@ public class ServerTest extends BaseTestSupport {
         final int MAX_AUTH_REQUESTS = 10;
         FactoryManagerUtils.updateProperty(sshd, ServerFactoryManager.MAX_AUTH_REQUESTS, MAX_AUTH_REQUESTS);
 
-        client = SshClient.setUpDefaultClient();
         client.setServiceFactories(Arrays.asList(
                 new ClientUserAuthServiceOld.Factory(),
                 ClientConnectionServiceFactory.INSTANCE
@@ -164,13 +163,12 @@ public class ServerTest extends BaseTestSupport {
                 nbTrials++;
                 assertTrue(nbTrials < 100);
                 authFuture = s.getService(ClientUserAuthServiceOld.class)
-                        .auth(new org.apache.sshd.deprecated.UserAuthPassword(s, "ssh-connection", "buggy"))
-                ;
+                        .auth(new org.apache.sshd.deprecated.UserAuthPassword(s, "ssh-connection", "buggy"));
                 assertTrue("Authentication wait failed", authFuture.await(5000));
                 assertTrue("Authentication not done", authFuture.isDone());
                 assertFalse("Authentication unexpectedly successful", authFuture.isSuccess());
-            }
-            while (authFuture.isFailure());
+            } while (authFuture.isFailure());
+
             assertNotNull("Missing auth future exception", authFuture.getException());
             assertTrue("Number trials (" + nbTrials + ") below min.=" + MAX_AUTH_REQUESTS, nbTrials > MAX_AUTH_REQUESTS);
         } finally {
@@ -180,12 +178,11 @@ public class ServerTest extends BaseTestSupport {
 
     @Test
     public void testAuthenticationTimeout() throws Exception {
-        final int AUTH_TIMEOUT = 5000;
+        final long AUTH_TIMEOUT = TimeUnit.SECONDS.toMillis(5L);
         FactoryManagerUtils.updateProperty(sshd, FactoryManager.AUTH_TIMEOUT, AUTH_TIMEOUT);
 
-        client = SshClient.setUpDefaultClient();
         client.start();
-        try (ClientSession s = client.connect("test", "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+        try (ClientSession s = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
             int res = s.waitFor(ClientSession.CLOSED, 2 * AUTH_TIMEOUT);
             assertEquals("Session should be closed", ClientSession.CLOSED | ClientSession.WAIT_AUTH, res);
         } finally {
@@ -197,10 +194,10 @@ public class ServerTest extends BaseTestSupport {
     public void testIdleTimeout() throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);
         TestEchoShellFactory.TestEchoShell.latch = new CountDownLatch(1);
-        final int IDLE_TIMEOUT = 2500;
+        final long IDLE_TIMEOUT = 2500;
         FactoryManagerUtils.updateProperty(sshd, FactoryManager.IDLE_TIMEOUT, IDLE_TIMEOUT);
 
-        sshd.getSessionFactory().addListener(new SessionListener() {
+        sshd.addSessionListener(new SessionListener() {
             @Override
             public void sessionCreated(Session session) {
                 System.out.println("Session created");
@@ -218,27 +215,29 @@ public class ServerTest extends BaseTestSupport {
             }
         });
 
-        client = SshClient.setUpDefaultClient();
+        TestChannelListener channelListener = new TestChannelListener();
+        sshd.addChannelListener(channelListener);
+
         client.start();
-        try (ClientSession s = client.connect("test", "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            s.addPasswordIdentity("test");
-            s.auth().verify(5L, TimeUnit.SECONDS);
-
-            try (ChannelShell shell = s.createShellChannel();
-                 ByteArrayOutputStream out = new ByteArrayOutputStream();
-                 ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-                shell.setOut(out);
-                shell.setErr(err);
-                shell.open().verify(9L, TimeUnit.SECONDS);
-                int res = s.waitFor(ClientSession.CLOSED, 2 * IDLE_TIMEOUT);
-                assertEquals("Session should be closed", ClientSession.CLOSED | ClientSession.AUTHED, res);
-            }
+        try (ClientSession s = createTestClientSession();
+             ChannelShell shell = s.createShellChannel();
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+            shell.setOut(out);
+            shell.setErr(err);
+            shell.open().verify(9L, TimeUnit.SECONDS);
+
+            assertTrue("No activated server side channels", GenericUtils.size(channelListener.getActiveChannels()) > 0);
+            assertTrue("No open server side channels", GenericUtils.size(channelListener.getOpenChannels()) > 0);
+
+            int res = s.waitFor(ClientSession.CLOSED, 2 * IDLE_TIMEOUT);
+            assertEquals("Session should be closed", ClientSession.CLOSED | ClientSession.AUTHED, res);
         } finally {
             client.stop();
         }
 
-        assertTrue(latch.await(1, TimeUnit.SECONDS));
-        assertTrue(TestEchoShellFactory.TestEchoShell.latch.await(1, TimeUnit.SECONDS));
+        assertTrue("Session latch not signalled in time", latch.await(1, TimeUnit.SECONDS));
+        assertTrue("Shell latch not signalled in time", TestEchoShellFactory.TestEchoShell.latch.await(1, TimeUnit.SECONDS));
     }
 
     /**
@@ -254,9 +253,13 @@ public class ServerTest extends BaseTestSupport {
 
         sshd.setCommandFactory(new StreamCommand.Factory());
 
-        FactoryManagerUtils.updateProperty(sshd, FactoryManager.IDLE_TIMEOUT, 5000);
-        FactoryManagerUtils.updateProperty(sshd, FactoryManager.DISCONNECT_TIMEOUT, 2000);
-        sshd.getSessionFactory().addListener(new SessionListener() {
+        final long IDLE_TIMEOUT_VALUE = TimeUnit.SECONDS.toMillis(5L);
+        FactoryManagerUtils.updateProperty(sshd, FactoryManager.IDLE_TIMEOUT, IDLE_TIMEOUT_VALUE);
+
+        final long DISCONNECT_TIMEOUT_VALUE = TimeUnit.SECONDS.toMillis(2L);
+        FactoryManagerUtils.updateProperty(sshd, FactoryManager.DISCONNECT_TIMEOUT, DISCONNECT_TIMEOUT_VALUE);
+
+        sshd.addSessionListener(new SessionListener() {
             @Override
             public void sessionCreated(Session session) {
                 System.out.println("Session created");
@@ -274,36 +277,43 @@ public class ServerTest extends BaseTestSupport {
             }
         });
 
-        client = SshClient.setUpDefaultClient();
+        TestChannelListener channelListener = new TestChannelListener();
+        sshd.addChannelListener(channelListener);
+
         client.start();
 
-        try (ClientSession s = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            s.addPasswordIdentity(getCurrentTestName());
-            s.auth().verify(5L, TimeUnit.SECONDS);
+        try (ClientSession s = createTestClientSession();
+             ChannelExec shell = s.createExecChannel("normal");
+             // Create a pipe that will block reading when the buffer is full
+             PipedInputStream pis = new PipedInputStream();
+             PipedOutputStream pos = new PipedOutputStream(pis)) {
 
-            try (ChannelExec shell = s.createExecChannel("normal");
-                 // Create a pipe that will block reading when the buffer is full
-                 PipedInputStream pis = new PipedInputStream();
-                 PipedOutputStream pos = new PipedOutputStream(pis)) {
+            shell.setOut(pos);
+            shell.open().verify(5L, TimeUnit.SECONDS);
 
-                shell.setOut(pos);
-                shell.open().verify(5L, TimeUnit.SECONDS);
+            assertTrue("No activated server side channels", GenericUtils.size(channelListener.getActiveChannels()) > 0);
+            assertTrue("No open server side channels", GenericUtils.size(channelListener.getOpenChannels()) > 0);
 
-                try (AbstractSession serverSession = sshd.getActiveSessions().iterator().next();
-                     Channel channel = serverSession.getService(AbstractConnectionService.class).getChannels().iterator().next()) {
+            try (AbstractSession serverSession = sshd.getActiveSessions().iterator().next();
+                 Channel channel = serverSession.getService(AbstractConnectionService.class).getChannels().iterator().next()) {
 
-                    while (channel.getRemoteWindow().getSize() > 0) {
-                        Thread.sleep(1);
-                    }
+                final long MAX_TIMEOUT_VALUE = IDLE_TIMEOUT_VALUE + DISCONNECT_TIMEOUT_VALUE + TimeUnit.SECONDS.toMillis(3L);
+                for (long totalNanoTime = 0L; channel.getRemoteWindow().getSize() > 0; ) {
+                    long nanoStart = System.nanoTime();
+                    Thread.sleep(1L);
+                    long nanoEnd = System.nanoTime(), nanoDuration = nanoEnd - nanoStart;
 
-                    LoggerFactory.getLogger(getClass()).info("Waiting for session idle timeouts");
-
-                    long t0 = System.currentTimeMillis();
-                    latch.await(1, TimeUnit.MINUTES);
-                    long t1 = System.currentTimeMillis(), diff = t1 - t0;
-                    assertTrue("Wait time too low: " + diff, diff > 7000);
-                    assertTrue("Wait time too high: " + diff, diff < 10000);
+                    totalNanoTime += nanoDuration;
+                    assertTrue("Waiting for too long on remote window size to reach zero", totalNanoTime < TimeUnit.MILLISECONDS.toNanos(MAX_TIMEOUT_VALUE));
                 }
+
+                LoggerFactory.getLogger(getClass()).info("Waiting for session idle timeouts");
+
+                long t0 = System.currentTimeMillis();
+                latch.await(1, TimeUnit.MINUTES);
+                long t1 = System.currentTimeMillis(), diff = t1 - t0;
+                assertTrue("Wait time too low: " + diff, diff > IDLE_TIMEOUT_VALUE);
+                assertTrue("Wait time too high: " + diff, diff < MAX_TIMEOUT_VALUE);
             }
         } finally {
             client.stop();
@@ -311,11 +321,11 @@ public class ServerTest extends BaseTestSupport {
     }
 
     @Test
-    public void testLanguage() throws Exception {
-        client = SshClient.setUpDefaultClient();
-        client.setSessionFactory(new SessionFactory() {
+    public void testLanguageNegotiation() throws Exception {
+        client.setSessionFactory(new SessionFactory(client) {
             @Override
-            protected AbstractSession createSession(IoSession ioSession) throws Exception {
+            @SuppressWarnings("synthetic-access")
+            protected ClientSessionImpl createSession(IoSession ioSession) throws Exception {
                 return new ClientSessionImpl(client, ioSession) {
                     @Override
                     protected Map<KexProposalOption, String> createProposal(String hostKeyTypes) {
@@ -327,9 +337,10 @@ public class ServerTest extends BaseTestSupport {
                 };
             }
         });
+
         client.start();
-        try (ClientSession s = client.connect("test", "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            s.close(false);
+        try (ClientSession s = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+            // do nothing
         } finally {
             client.stop();
         }
@@ -338,7 +349,7 @@ public class ServerTest extends BaseTestSupport {
     @Test
     public void testKexCompletedEvent() throws Exception {
         final AtomicInteger serverEventCount = new AtomicInteger(0);
-        sshd.getSessionFactory().addListener(new SessionListener() {
+        sshd.addSessionListener(new SessionListener() {
             @Override
             public void sessionCreated(Session session) {
                 // ignored
@@ -357,10 +368,9 @@ public class ServerTest extends BaseTestSupport {
             }
         });
 
-        client = SshClient.setUpDefaultClient();
         client.start();
         final AtomicInteger clientEventCount = new AtomicInteger(0);
-        client.getSessionFactory().addListener(new SessionListener() {
+        client.addSessionListener(new SessionListener() {
             @Override
             public void sessionCreated(Session session) {
                 // ignored
@@ -379,9 +389,7 @@ public class ServerTest extends BaseTestSupport {
             }
         });
 
-        try (ClientSession s = client.connect("test", "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            s.addPasswordIdentity("test");
-            s.auth().verify(5L, TimeUnit.SECONDS);
+        try (ClientSession s = createTestClientSession()) {
             assertEquals("Mismatched client events count", 1, clientEventCount.get());
             assertEquals("Mismatched server events count", 1, serverEventCount.get());
             s.close(false);
@@ -391,11 +399,10 @@ public class ServerTest extends BaseTestSupport {
     }
 
     @Test   // see https://issues.apache.org/jira/browse/SSHD-456
-    public void testServerStillListensIfSessionListenerThrowsException() throws InterruptedException {
+    public void testServerStillListensIfSessionListenerThrowsException() throws Exception {
         final Map<String, SocketAddress> eventsMap = new TreeMap<String, SocketAddress>(String.CASE_INSENSITIVE_ORDER);
-        sshd.getSessionFactory().addListener(new SessionListener() {
-            private final Logger log = LoggerFactory.getLogger(getClass());
-
+        final Logger log = LoggerFactory.getLogger(getClass());
+        sshd.addSessionListener(new SessionListener() {
             @Override
             public void sessionCreated(Session session) {
                 throwException("SessionCreated", session);
@@ -426,7 +433,6 @@ public class ServerTest extends BaseTestSupport {
             }
         });
 
-        client = SshClient.setUpDefaultClient();
         client.start();
 
         int curCount = 0;
@@ -438,19 +444,18 @@ public class ServerTest extends BaseTestSupport {
             }
 
             try {
-                try (ClientSession s = client.connect("test", "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-                    s.addPasswordIdentity("test");
-                    s.auth().verify(5L, TimeUnit.SECONDS);
+                try (ClientSession s = createTestClientSession()) {
+                    log.info("Retry #" + retryCount + " successful");
                 }
 
                 synchronized (eventsMap) {
-                    assertTrue("Unexpected premature success: " + eventsMap, eventsMap.size() >= 3);
+                    assertTrue("Unexpected premature success at retry # " + retryCount + ": " + eventsMap, eventsMap.size() >= 3);
                 }
             } catch (IOException e) {
                 // expected - ignored
                 synchronized (eventsMap) {
                     int nextCount = eventsMap.size();
-                    assertTrue("No session event generated", nextCount > curCount);
+                    assertTrue("No session event generated at retry #" + retryCount, nextCount > curCount);
                 }
             }
         }
@@ -506,6 +511,8 @@ public class ServerTest extends BaseTestSupport {
             }
         });
 
+        TestChannelListener channelListener = new TestChannelListener();
+        sshd.addChannelListener(channelListener);
 
         @SuppressWarnings("synthetic-access")
         Map<String, String> expected = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER) {
@@ -518,37 +525,54 @@ public class ServerTest extends BaseTestSupport {
             }
         };
 
-        client = SshClient.setUpDefaultClient();
         client.start();
-        try (ClientSession s = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            s.addPasswordIdentity(getCurrentTestName());
-            s.auth().verify(5L, TimeUnit.SECONDS);
+        try (ClientSession s = createTestClientSession();
+             ChannelExec shell = s.createExecChannel(getCurrentTestName())) {
+            for (Map.Entry<String, String> ee : expected.entrySet()) {
+                shell.setEnv(ee.getKey(), ee.getValue());
+            }
 
-            try (ChannelExec shell = s.createExecChannel(getCurrentTestName())) {
-                for (Map.Entry<String, String> ee : expected.entrySet()) {
-                    shell.setEnv(ee.getKey(), ee.getValue());
-                }
-                shell.open().verify(5L, TimeUnit.SECONDS);
-                shell.waitFor(ClientChannel.CLOSED, TimeUnit.SECONDS.toMillis(17L));
+            shell.open().verify(5L, TimeUnit.SECONDS);
 
-                Integer status = shell.getExitStatus();
-                assertNotNull("No exit status", status);
-                assertEquals("Bad exit status", 0, status.intValue());
-            }
+            assertTrue("No activated server side channels", GenericUtils.size(channelListener.getActiveChannels()) > 0);
+            assertTrue("No open server side channels", GenericUtils.size(channelListener.getOpenChannels()) > 0);
 
-            Environment cmdEnv = envHolder.get();
-            assertNotNull("No environment set", cmdEnv);
+            shell.waitFor(ClientChannel.CLOSED, TimeUnit.SECONDS.toMillis(17L));
 
-            Map<String, String> vars = cmdEnv.getEnv();
-            assertTrue("Mismatched vars count", GenericUtils.size(vars) >= GenericUtils.size(expected));
-            for (Map.Entry<String, String> ee : expected.entrySet()) {
-                String key = ee.getKey(), expValue = ee.getValue(), actValue = vars.get(key);
-                assertEquals("Mismatched value for " + key, expValue, actValue);
-            }
+            Integer status = shell.getExitStatus();
+            assertNotNull("No exit status", status);
+            assertEquals("Bad exit status", 0, status.intValue());
         } finally {
             client.stop();
         }
 
+        assertTrue("Still activated server side channels", GenericUtils.isEmpty(channelListener.getActiveChannels()));
+
+        Environment cmdEnv = envHolder.get();
+        assertNotNull("No environment set", cmdEnv);
+
+        Map<String, String> vars = cmdEnv.getEnv();
+        assertTrue("Mismatched vars count", GenericUtils.size(vars) >= GenericUtils.size(expected));
+        for (Map.Entry<String, String> ee : expected.entrySet()) {
+            String key = ee.getKey(), expValue = ee.getValue(), actValue = vars.get(key);
+            assertEquals("Mismatched value for " + key, expValue, actValue);
+        }
+    }
+
+    private ClientSession createTestClientSession() throws Exception {
+        ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession();
+        try {
+            session.addPasswordIdentity(getCurrentTestName());
+            session.auth().verify(5L, TimeUnit.SECONDS);
+
+            ClientSession returnValue = session;
+            session = null; // avoid 'finally' close
+            return returnValue;
+        } finally {
+            if (session != null) {
+                session.close();
+            }
+        }
     }
 
     public static class TestEchoShellFactory extends EchoShellFactory {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-git/src/test/java/org/apache/sshd/git/util/Utils.java
----------------------------------------------------------------------
diff --git a/sshd-git/src/test/java/org/apache/sshd/git/util/Utils.java b/sshd-git/src/test/java/org/apache/sshd/git/util/Utils.java
index d220326..37948b8 100644
--- a/sshd-git/src/test/java/org/apache/sshd/git/util/Utils.java
+++ b/sshd-git/src/test/java/org/apache/sshd/git/util/Utils.java
@@ -52,7 +52,7 @@ public class Utils {
         URL url = Utils.class.getClassLoader().getResource(resource);
         try {
             return new File(url.toURI());
-        } catch(URISyntaxException e) {
+        } catch (URISyntaxException e) {
             return new File(url.getPath());
         }
     }


[3/4] mina-sshd git commit: [SSHD-555] Add ChannelListener support

Posted by lg...@apache.org.
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/session/SessionTimeoutListener.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/SessionTimeoutListener.java b/sshd-core/src/main/java/org/apache/sshd/common/session/SessionTimeoutListener.java
index bdde515..2d74ecd 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/SessionTimeoutListener.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/SessionTimeoutListener.java
@@ -32,10 +32,17 @@ import org.apache.sshd.common.util.logging.AbstractLoggingBean;
 public class SessionTimeoutListener extends AbstractLoggingBean implements SessionListener, Runnable {
     private final Set<AbstractSession> sessions = new CopyOnWriteArraySet<AbstractSession>();
 
+    public SessionTimeoutListener() {
+        super();
+    }
+
     @Override
     public void sessionCreated(Session session) {
-        if (session instanceof AbstractSession && (session.getAuthTimeout() > 0 || session.getIdleTimeout() > 0)) {
+        if ((session instanceof AbstractSession) && ((session.getAuthTimeout() > 0L) || (session.getIdleTimeout() > 0L))) {
             sessions.add((AbstractSession) session);
+            log.debug("sessionCreated({}) tracking", session);
+        } else {
+            log.trace("sessionCreated({}) not tracked", session);
         }
     }
 
@@ -46,7 +53,11 @@ public class SessionTimeoutListener extends AbstractLoggingBean implements Sessi
 
     @Override
     public void sessionClosed(Session s) {
-        sessions.remove(s);
+        if (sessions.remove(s)) {
+            log.debug("sessionClosed({}) un-tracked", s);
+        } else {
+            log.trace("sessionClosed({}) not tracked", s);
+        }
     }
 
     @Override
@@ -55,7 +66,7 @@ public class SessionTimeoutListener extends AbstractLoggingBean implements Sessi
             try {
                 session.checkForTimeouts();
             } catch (Exception e) {
-                log.warn("An error occurred while checking session=" + session + " timeouts", e);
+                log.warn(e.getClass().getSimpleName() + " while checking session=" + session + " timeouts: " + e.getMessage(), e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
index 6f2304f..f1003cf 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
@@ -103,20 +103,27 @@ public final class EventListenerUtils {
      * @see #proxyWrapper(Class, ClassLoader, Iterable)
      */
     public static <T extends EventListener> T proxyWrapper(Class<T> listenerType, ClassLoader loader, final Iterable<? extends T> listeners) {
-        if ((listenerType == null) || (!listenerType.isInterface())) {
-            throw new IllegalArgumentException("Target proxy is not an interface");
-        }
-
-        if (listeners == null) {
-            throw new IllegalArgumentException("No listeners container provided");
-        }
+        ValidateUtils.checkNotNull(listenerType, "No listener type specified");
+        ValidateUtils.checkTrue(listenerType.isInterface(), "Target proxy is not an interface: %s", listenerType.getSimpleName());
+        ValidateUtils.checkNotNull(listeners, "No listeners container provided");
 
         Object wrapper = Proxy.newProxyInstance(loader, new Class<?>[]{listenerType}, new InvocationHandler() {
             @Override
             public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+                Throwable err = null;
                 for (T l : listeners) {
-                    method.invoke(l, args);
+                    try {
+                        method.invoke(l, args);
+                    } catch (Throwable t) {
+                        Throwable e = GenericUtils.peelException(t);
+                        err = GenericUtils.accumulateException(err, e);
+                    }
+                }
+
+                if (err != null) {
+                    throw err;
                 }
+
                 return null;    // we assume always void return value...
             }
         });

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
index 2563fbb..87631d7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
@@ -19,6 +19,8 @@
 
 package org.apache.sshd.common.util;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -32,6 +34,9 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import javax.management.MBeanException;
+import javax.management.ReflectionException;
+
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
@@ -333,6 +338,47 @@ public final class GenericUtils {
     }
 
     /**
+     * Attempts to get to the &quot;effective&quot; exception being thrown,
+     * by taking care of some known exceptions that wrap the original thrown
+     * one.
+     * @param t The original {@link Throwable} - ignored if {@code null}
+     * @return The effective exception - same as input if not a wrapper or nothing is wrapped
+     */
+    public static Throwable peelException(Throwable t) {
+        if (t == null) {
+            return t;
+        } else if (t instanceof UndeclaredThrowableException) {
+            Throwable wrapped = ((UndeclaredThrowableException) t).getUndeclaredThrowable();
+            // according to the Javadoc it may be null, in which case 'getCause'
+            // might contain the information we need
+            if (wrapped != null) {
+                return peelException(wrapped);
+            }
+
+            wrapped = t.getCause();
+            if ((wrapped != null) && (wrapped != t)) {     // real cause only - never peel down to null
+                return peelException(wrapped);
+            }
+        } else if (t instanceof InvocationTargetException) {
+            Throwable target = ((InvocationTargetException) t).getTargetException();
+            if (target != null) {
+                return peelException(target);
+            }
+        } else if (t instanceof ReflectionException) {
+            Throwable target = ((ReflectionException) t).getTargetException();
+            if (target != null) {
+                return peelException(target);
+            }
+        } else if (t instanceof MBeanException) {
+            Throwable target = ((MBeanException) t).getTargetException();
+            if (target != null) {
+                return peelException(target);
+            }
+        }
+
+        return t;   // no special handling required or available
+    }
+    /**
      * @param t The original {@link Throwable} - ignored if {@code null}
      * @return If {@link Throwable#getCause()} is non-{@code null} then
      * the cause, otherwise the original exception - {@code null} if

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
index 469d8a6..a18f741 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
@@ -158,5 +158,4 @@ public interface ServerFactoryManager extends FactoryManager {
      * or {@code null} if subsystems are not supported on this server
      */
     List<NamedFactory<Command>> getSubsystemFactories();
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/server/SshServer.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/SshServer.java b/sshd-core/src/main/java/org/apache/sshd/server/SshServer.java
index a2bfb2c..6baf31b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/SshServer.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/SshServer.java
@@ -276,7 +276,6 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
         if (sessionFactory == null) {
             sessionFactory = createSessionFactory();
         }
-        sessionFactory.setServer(this);
         acceptor = createAcceptor();
 
         setupSessionTimeout(sessionFactory);
@@ -377,7 +376,7 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
     }
 
     protected SessionFactory createSessionFactory() {
-        return new SessionFactory();
+        return new SessionFactory(this);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/server/auth/CachingPublicKeyAuthenticator.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/auth/CachingPublicKeyAuthenticator.java b/sshd-core/src/main/java/org/apache/sshd/server/auth/CachingPublicKeyAuthenticator.java
index 6af7c6f..998813c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/auth/CachingPublicKeyAuthenticator.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/auth/CachingPublicKeyAuthenticator.java
@@ -47,7 +47,7 @@ public class CachingPublicKeyAuthenticator implements PublickeyAuthenticator, Se
         if (map == null) {
             map = new ConcurrentHashMap<>();
             cache.put(session, map);
-            session.addListener(this);
+            session.addSessionListener(this);
         }
 
         Boolean result = map.get(key);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
index 3b7be62..8161e67 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
@@ -24,6 +24,8 @@ import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.AbstractChannel;
+import org.apache.sshd.common.channel.ChannelListener;
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 
 /**
@@ -62,8 +64,22 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
     }
 
     protected OpenFuture doInit(Buffer buffer) {
+        ChannelListener listener = getChannelListenerProxy();
         OpenFuture f = new DefaultOpenFuture(this);
-        f.setOpened();
+        try {
+            listener.channelOpenSuccess(this);
+            f.setOpened();
+        } catch (RuntimeException t) {
+            Throwable e = GenericUtils.peelException(t);
+            try {
+                listener.channelOpenFailure(this, e);
+            } catch (Throwable ignored) {
+                log.warn("doInit({}) failed ({}) to inform listener of open failure={}: {}",
+                         this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage());
+            }
+            f.setException(e);
+        }
+
         return f;
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index 520dcdd..a4a020d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -447,7 +447,7 @@ public class ChannelSession extends AbstractServerChannel {
             return false;
         }
 
-        ServerFactoryManager manager = ((ServerSession) session).getFactoryManager();
+        ServerFactoryManager manager = ((ServerSession) getSession()).getFactoryManager();
         Factory<Command> factory = manager.getShellFactory();
         if (factory == null) {
             log.debug("handleShell - no shell factory");
@@ -472,7 +472,7 @@ public class ChannelSession extends AbstractServerChannel {
         }
 
         String commandLine = buffer.getString();
-        ServerFactoryManager manager = ((ServerSession) session).getFactoryManager();
+        ServerFactoryManager manager = ((ServerSession) getSession()).getFactoryManager();
         CommandFactory factory = manager.getCommandFactory();
         if (factory == null) {
             log.warn("No command factory for command: {}", commandLine);
@@ -498,7 +498,7 @@ public class ChannelSession extends AbstractServerChannel {
 
     protected boolean handleSubsystem(Buffer buffer) throws IOException {
         String subsystem = buffer.getString();
-        ServerFactoryManager manager = ((ServerSession) session).getFactoryManager();
+        ServerFactoryManager manager = ((ServerSession) getSession()).getFactoryManager();
         List<NamedFactory<Command>> factories = manager.getSubsystemFactories();
         if (GenericUtils.isEmpty(factories)) {
             log.warn("No factories for subsystem: {}", subsystem);
@@ -536,14 +536,15 @@ public class ChannelSession extends AbstractServerChannel {
         addEnvVariable(Environment.ENV_USER, session.getUsername());
         // If the shell wants to be aware of the session, let's do that
         if (command instanceof SessionAware) {
-            ((SessionAware) command).setSession((ServerSession) session);
+            ((SessionAware) command).setSession((ServerSession) getSession());
         }
         if (command instanceof ChannelSessionAware) {
             ((ChannelSessionAware) command).setChannelSession(this);
         }
         // If the shell wants to be aware of the file system, let's do that too
         if (command instanceof FileSystemAware) {
-            FileSystemFactory factory = ((ServerSession) session).getFactoryManager().getFileSystemFactory();
+            ServerFactoryManager manager = ((ServerSession) getSession()).getFactoryManager();
+            FileSystemFactory factory = manager.getFileSystemFactory();
             ((FileSystemAware) command).setFileSystem(factory.createFileSystem(session));
         }
         // If the shell wants to use non-blocking io

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 4fa0670..29ba7f5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -31,6 +31,7 @@ import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshdSocketAddress;
 import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.ChannelFactory;
+import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
@@ -40,6 +41,7 @@ import org.apache.sshd.common.io.IoHandler;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.Readable;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
@@ -112,7 +114,7 @@ public class TcpipServerChannel extends AbstractServerChannel {
         int originatorPort = buffer.getInt();
         if (log.isDebugEnabled()) {
             log.debug("Receiving request for direct tcpip: hostToConnect={}, portToConnect={}, originatorIpAddress={}, originatorPort={}",
-                    hostToConnect, portToConnect, originatorIpAddress, originatorPort);
+                      hostToConnect, portToConnect, originatorIpAddress, originatorPort);
         }
 
         final SshdSocketAddress address;
@@ -172,31 +174,71 @@ public class TcpipServerChannel extends AbstractServerChannel {
                 close(true);
             }
         };
+
         connector = manager.getIoServiceFactory().createConnector(handler);
         IoConnectFuture future = connector.connect(address.toInetSocketAddress());
         future.addListener(new SshFutureListener<IoConnectFuture>() {
-            @SuppressWarnings("synthetic-access")
             @Override
             public void operationComplete(IoConnectFuture future) {
-                if (future.isConnected()) {
-                    ioSession = future.getSession();
-                    f.setOpened();
-                } else if (future.getException() != null) {
-                    closeImmediately0();
-                    if (future.getException() instanceof ConnectException) {
-                        f.setException(new OpenChannelException(
-                                SshConstants.SSH_OPEN_CONNECT_FAILED,
-                                future.getException().getMessage(),
-                                future.getException()));
-                    } else {
-                        f.setException(future.getException());
-                    }
-                }
+                handleChannelConnectResult(f, future);
             }
         });
         return f;
     }
 
+    protected void handleChannelConnectResult(OpenFuture f, IoConnectFuture future) {
+        ChannelListener listener = getChannelListenerProxy();
+        try {
+            if (future.isConnected()) {
+                handleChannelOpenSuccess(f, future.getSession());
+                return;
+            }
+
+            Throwable problem = GenericUtils.peelException(future.getException());
+            if (problem != null) {
+                handleChannelOpenFailure(f, problem);
+            }
+        } catch (RuntimeException t) {
+            Throwable e = GenericUtils.peelException(t);
+            try {
+                listener.channelOpenFailure(this, e);
+            } catch (Throwable ignored) {
+                log.warn("handleChannelConnectResult({})[exception] failed ({}) to inform listener of open failure={}: {}",
+                         this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage());
+            }
+            f.setException(e);
+        }
+    }
+
+    protected void handleChannelOpenSuccess(OpenFuture f, IoSession session) {
+        ioSession = session;
+
+        ChannelListener listener = getChannelListenerProxy();
+        listener.channelOpenSuccess(this);
+        f.setOpened();
+    }
+
+    protected void handleChannelOpenFailure(OpenFuture f, Throwable problem) {
+        ChannelListener listener = getChannelListenerProxy();
+        try {
+            listener.channelOpenFailure(this, problem);
+        } catch (Throwable ignored) {
+            log.warn("handleChannelOpenFailure({}) failed ({}) to inform listener of open failure={}: {}",
+                     this, ignored.getClass().getSimpleName(), problem.getClass().getSimpleName(), ignored.getMessage());
+        }
+
+        closeImmediately0();
+
+        if (problem instanceof ConnectException) {
+            f.setException(new OpenChannelException(
+                    SshConstants.SSH_OPEN_CONNECT_FAILED,
+                    problem.getMessage(),
+                    problem));
+        } else {
+            f.setException(problem);
+        }
+
+    }
     private void closeImmediately0() {
         // We need to close the channel immediately to remove it from the
         // server session's channel table and *not* send a packet to the

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/server/session/SessionFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/SessionFactory.java b/sshd-core/src/main/java/org/apache/sshd/server/session/SessionFactory.java
index b358e8b..835d91f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/SessionFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/SessionFactory.java
@@ -19,7 +19,6 @@
 package org.apache.sshd.server.session;
 
 import org.apache.sshd.common.io.IoSession;
-import org.apache.sshd.common.session.AbstractSession;
 import org.apache.sshd.common.session.AbstractSessionFactory;
 import org.apache.sshd.server.ServerFactoryManager;
 
@@ -30,29 +29,18 @@ import org.apache.sshd.server.ServerFactoryManager;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  * @see org.apache.sshd.server.SshServer#setSessionFactory(SessionFactory)
  */
-public class SessionFactory extends AbstractSessionFactory {
+public class SessionFactory extends AbstractSessionFactory<ServerFactoryManager, ServerSessionImpl> {
 
-    private ServerFactoryManager manager;
-
-    public SessionFactory() {
-        super();
-    }
-
-    public SessionFactory(ServerFactoryManager manager) {
-        this.manager = manager;
+    public SessionFactory(ServerFactoryManager server) {
+        super(server);
     }
 
-    public ServerFactoryManager getServer() {
-        return manager;
-    }
-
-    public void setServer(ServerFactoryManager server) {
-        this.manager = server;
+    public final ServerFactoryManager getServer() {
+        return getFactoryManager();
     }
 
     @Override
-    protected AbstractSession doCreateSession(IoSession ioSession) throws Exception {
+    protected ServerSessionImpl doCreateSession(IoSession ioSession) throws Exception {
         return new ServerSessionImpl(getServer(), ioSession);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java b/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
index 91879a0..25a2de8 100644
--- a/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
@@ -30,7 +30,6 @@ import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.common.FactoryManagerUtils;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.keyprovider.KeyPairProvider;
-import org.apache.sshd.common.session.AbstractSession;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.deprecated.ClientUserAuthServiceOld;
 import org.apache.sshd.deprecated.UserAuthKeyboardInteractive;
@@ -70,9 +69,9 @@ public class AuthenticationTest extends BaseTestSupport {
         sshd.setPublickeyAuthenticator(AcceptAllPublickeyAuthenticator.INSTANCE);
         FactoryManagerUtils.updateProperty(sshd, ServerFactoryManager.WELCOME_BANNER, WELCOME);
         FactoryManagerUtils.updateProperty(sshd, ServerFactoryManager.AUTH_METHODS, "publickey,password publickey,keyboard-interactive");
-        sshd.setSessionFactory(new SessionFactory() {
+        sshd.setSessionFactory(new SessionFactory(sshd) {
             @Override
-            protected AbstractSession doCreateSession(IoSession ioSession) throws Exception {
+            protected ServerSessionImpl doCreateSession(IoSession ioSession) throws Exception {
                 return new TestSession(getServer(), ioSession);
             }
         });

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java b/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
index fd2590c..18203fb 100644
--- a/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
@@ -52,9 +52,9 @@ public class KeepAliveTest extends BaseTestSupport {
     private SshServer sshd;
     private int port;
 
-    private static final int HEARTBEAT = 2000;
-    private static final int TIMEOUT = 4000;
-    private static final int WAIT = 8000;
+    private static final long HEARTBEAT = TimeUnit.SECONDS.toMillis(2L);
+    private static final long TIMEOUT = 2L * HEARTBEAT;
+    private static final long WAIT = 2L * TIMEOUT;
 
     @Before
     public void setUp() throws Exception {
@@ -76,27 +76,7 @@ public class KeepAliveTest extends BaseTestSupport {
     }
 
     @Test
-    public void testClient() throws Exception {
-        SshClient client = SshClient.setUpDefaultClient();
-        client.start();
-
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
-
-            try (ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL)) {
-                int state = channel.waitFor(ClientChannel.CLOSED, WAIT);
-                assertEquals("Wrong channel state", ClientChannel.CLOSED | ClientChannel.EOF, state);
-
-                channel.close(false);
-            }
-        } finally {
-            client.stop();
-        }
-    }
-
-    @Test
-    public void testClientNew() throws Exception {
+    public void testIdleClient() throws Exception {
         SshClient client = SshClient.setUpDefaultClient();
         client.start();
 
@@ -137,27 +117,6 @@ public class KeepAliveTest extends BaseTestSupport {
     }
 
     @Test
-    public void testClientWithHeartBeatNew() throws Exception {
-        SshClient client = SshClient.setUpDefaultClient();
-        FactoryManagerUtils.updateProperty(client, ClientFactoryManager.HEARTBEAT_INTERVAL, HEARTBEAT);
-        client.start();
-
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
-
-            try (ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL)) {
-                int state = channel.waitFor(ClientChannel.CLOSED, WAIT);
-                assertEquals("Wrong channel state", ClientChannel.TIMEOUT, state);
-
-                channel.close(false);
-            }
-        } finally {
-            client.stop();
-        }
-    }
-
-    @Test
     public void testShellClosedOnClientTimeout() throws Exception {
         TestEchoShellFactory.TestEchoShell.latch = new CountDownLatch(1);
 
@@ -187,36 +146,6 @@ public class KeepAliveTest extends BaseTestSupport {
         }
     }
 
-    @Test
-    public void testShellClosedOnClientTimeoutNew() throws Exception {
-        TestEchoShellFactory.TestEchoShell.latch = new CountDownLatch(1);
-
-        SshClient client = SshClient.setUpDefaultClient();
-        client.start();
-
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
-
-            try (ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
-                 ByteArrayOutputStream out = new ByteArrayOutputStream();
-                 ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-
-                channel.setOut(out);
-                channel.setErr(err);
-                channel.open().verify(9L, TimeUnit.SECONDS);
-
-                assertTrue("Latch time out", TestEchoShellFactory.TestEchoShell.latch.await(10L, TimeUnit.SECONDS));
-                int state = channel.waitFor(ClientChannel.CLOSED, WAIT);
-                assertEquals("Wrong channel state", ClientChannel.CLOSED | ClientChannel.EOF | ClientChannel.OPENED, state);
-
-                channel.close(false);
-            }
-        } finally {
-            client.stop();
-        }
-    }
-
     public static class TestEchoShellFactory extends EchoShellFactory {
         @Override
         public Command create() {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
index 07fd3cb..26f9645 100644
--- a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
@@ -206,7 +206,7 @@ public class KeyReExchangeTest extends BaseTestSupport {
                     sb.append("\n");
 
                     final AtomicInteger exchanges = new AtomicInteger();
-                    session.addListener(new SessionListener() {
+                    session.addSessionListener(new SessionListener() {
                         @Override
                         public void sessionCreated(Session session) {
                             // ignored


[2/4] mina-sshd git commit: [SSHD-555] Add ChannelListener support

Posted by lg...@apache.org.
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
index dbe8b2b..36cbbe3 100644
--- a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
@@ -36,11 +36,15 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sshd.client.auth.UserAuth;
 import org.apache.sshd.client.auth.UserAuthKeyboardInteractive;
@@ -54,15 +58,22 @@ import org.apache.sshd.client.channel.ClientChannel;
 import org.apache.sshd.client.future.AuthFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.client.subsystem.SubsystemClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
+import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.FactoryManagerUtils;
 import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.common.NamedResource;
 import org.apache.sshd.common.RuntimeSshException;
 import org.apache.sshd.common.Service;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.channel.AbstractChannel;
 import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelListener;
+import org.apache.sshd.common.channel.ChannelListenerManager;
+import org.apache.sshd.common.channel.TestChannelListener;
 import org.apache.sshd.common.cipher.BuiltinCiphers;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
@@ -75,9 +86,11 @@ import org.apache.sshd.common.keyprovider.KeyPairProvider;
 import org.apache.sshd.common.session.AbstractSession;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.session.SessionListener;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.Transformer;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.io.NoCloseOutputStream;
@@ -95,6 +108,7 @@ import org.apache.sshd.server.session.ServerConnectionServiceFactory;
 import org.apache.sshd.server.session.ServerSession;
 import org.apache.sshd.server.session.ServerUserAuthService;
 import org.apache.sshd.server.session.ServerUserAuthServiceFactory;
+import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
 import org.apache.sshd.util.AsyncEchoShellFactory;
 import org.apache.sshd.util.BaseTestSupport;
 import org.apache.sshd.util.BogusPasswordAuthenticator;
@@ -106,6 +120,8 @@ import org.junit.Before;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * TODO Add javadoc
@@ -121,6 +137,28 @@ public class ClientTest extends BaseTestSupport {
     private CountDownLatch authLatch;
     private CountDownLatch channelLatch;
 
+    private final AtomicReference<ClientSession> clientSessionHolder = new AtomicReference<ClientSession>(null);
+    @SuppressWarnings("synthetic-access")
+    private final SessionListener clientSessionListener = new SessionListener() {
+        @Override
+        public void sessionCreated(Session session) {
+            assertObjectInstanceOf("Non client session creation notification", ClientSession.class, session);
+            assertNull("Multiple creation notifications", clientSessionHolder.getAndSet((ClientSession) session));
+        }
+
+        @Override
+        public void sessionEvent(Session session, Event event) {
+            assertObjectInstanceOf("Non client session event notification: " + event, ClientSession.class, session);
+            assertSame("Mismatched client session event instance: " + event, clientSessionHolder.get(), session);
+        }
+
+        @Override
+        public void sessionClosed(Session session) {
+            assertObjectInstanceOf("Non client session closure notification", ClientSession.class, session);
+            assertSame("Mismatched client session closure instance", clientSessionHolder.getAndSet(null), session);
+        }
+    };
+
     public ClientTest() {
         super();
     }
@@ -185,6 +223,8 @@ public class ClientTest extends BaseTestSupport {
         port = sshd.getPort();
 
         client = SshClient.setUpDefaultClient();
+        clientSessionHolder.set(null);  // just making sure
+        client.addSessionListener(clientSessionListener);
     }
 
     @After
@@ -195,6 +235,167 @@ public class ClientTest extends BaseTestSupport {
         if (client != null) {
             client.stop();
         }
+        clientSessionHolder.set(null);  // just making sure
+    }
+
+    @Test
+    public void testClientStillActiveIfListenerExceptions() throws Exception {
+        final Map<String, Integer> eventsMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        final Collection<String> failuresSet = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+        final Logger log = LoggerFactory.getLogger(getClass());
+        client.addChannelListener(new ChannelListener() {
+            @Override
+            public void channelOpenSuccess(Channel channel) {
+                handleChannelEvent("OpenSuccess", channel);
+            }
+
+            @Override
+            public void channelOpenFailure(Channel channel, Throwable reason) {
+                assertObjectInstanceOf("Mismatched failure reason type", ChannelFailureException.class, reason);
+
+                String name = ((NamedResource) reason).getName();
+                synchronized(failuresSet) {
+                    assertTrue("Re-signalled failure location: " + name, failuresSet.add(name));
+                }
+            }
+
+            @Override
+            public void channelInitialized(Channel channel) {
+                handleChannelEvent("Initialized", channel);
+            }
+
+            @Override
+            public void channelClosed(Channel channel) {
+                handleChannelEvent("Closed", channel);
+            }
+
+            private void handleChannelEvent(String name, Channel channel) {
+                int id = channel.getId();
+                synchronized(eventsMap) {
+                    if (eventsMap.put(name, id) != null) {
+                        return; // already generated an exception for this event
+                    }
+                }
+
+                log.info("handleChannelEvent({})[{}]", name, id);
+                throw new ChannelFailureException(name);
+            }
+        });
+
+        client.start();
+
+        try (ClientSession session = createTestClientSession();
+             PipedOutputStream pipedIn = new PipedOutputStream();
+             InputStream inPipe = new PipedInputStream(pipedIn);
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+            // we expect failures either on channel init or open - the one on close is ignored...
+            for (int retryCount = 0; retryCount <= 3; retryCount++) {
+                try {
+                    try(ChannelShell channel = session.createShellChannel()) {
+                        channel.setIn(inPipe);
+                        channel.setOut(out);
+                        channel.setErr(err);
+                        channel.open().verify(3L, TimeUnit.SECONDS);
+                        break;  // 1st success means all methods have been invoked
+                    }
+                } catch (IOException e) {
+                    synchronized(eventsMap) {
+                        eventsMap.remove("Closed"); // since it is called anyway but does not cause an IOException
+                        assertTrue("Unexpected failure at retry #" + retryCount, eventsMap.size() < 3);
+                    }
+                }
+            }
+        } finally {
+            client.stop();
+        }
+
+        assertEquals("Mismatched total failures count on test end", 3, eventsMap.size());
+        assertEquals("Mismatched open failures count on test end", 1, failuresSet.size());
+    }
+
+    @Test
+    public void testSimpleClientListener() throws Exception {
+        final AtomicReference<Channel> channelHolder = new AtomicReference<>(null);
+        client.addChannelListener(new ChannelListener() {
+            @Override
+            public void channelOpenSuccess(Channel channel) {
+                assertSame("Mismatched opened channel instances", channel, channelHolder.get());
+            }
+
+            @Override
+            public void channelOpenFailure(Channel channel, Throwable reason) {
+                assertSame("Mismatched failed open channel instances", channel, channelHolder.get());
+            }
+
+            @Override
+            public void channelInitialized(Channel channel) {
+                assertNull("Multiple channel initialization notifications", channelHolder.getAndSet(channel));
+            }
+
+            @Override
+            public void channelClosed(Channel channel) {
+                assertSame("Mismatched closed channel instances", channel, channelHolder.getAndSet(null));
+            }
+        });
+        sshd.setSubsystemFactories(Arrays.<NamedFactory<Command>>asList(new SftpSubsystemFactory()));
+
+        client.start();
+
+        try (final ClientSession session = createTestClientSession()) {
+            testClientListener(channelHolder, ChannelShell.class, new Factory<ChannelShell>() {
+                @Override
+                public ChannelShell create() {
+                    try {
+                        return session.createShellChannel();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+            testClientListener(channelHolder, ChannelExec.class, new Factory<ChannelExec>() {
+                @Override
+                public ChannelExec create() {
+                    try {
+                        return session.createExecChannel(getCurrentTestName());
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+            testClientListener(channelHolder, SftpClient.class, new Factory<SftpClient>() {
+                @Override
+                public SftpClient create() {
+                    try {
+                        return session.createSftpClient();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+        } finally {
+            client.stop();
+        }
+    }
+
+    private <C extends Closeable> void testClientListener(AtomicReference<Channel> channelHolder, Class<C> channelType, Factory<? extends C> factory) throws Exception {
+        assertNull(channelType.getSimpleName() + ": Unexpected currently active channel", channelHolder.get());
+
+        try(C instance = factory.create()) {
+            Channel expectedChannel;
+            if (instance instanceof Channel) {
+                expectedChannel = (Channel) instance;
+            } else if (instance instanceof SubsystemClient) {
+                expectedChannel = ((SubsystemClient) instance).getClientChannel();
+            } else {
+                throw new UnsupportedOperationException("Unknown test instance type" + instance.getClass().getSimpleName());
+            }
+
+            Channel actualChannel = channelHolder.get();
+            assertSame("Mismatched listener " + channelType.getSimpleName() + " instances", expectedChannel, actualChannel);
+        }
+
+        assertNull(channelType.getSimpleName() + ": Active channel closure not signalled", channelHolder.get());
     }
 
     @Test
@@ -205,23 +406,21 @@ public class ClientTest extends BaseTestSupport {
         FactoryManagerUtils.updateProperty(client, FactoryManager.WINDOW_SIZE, 1024);
         client.start();
 
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
+        try (ClientSession session = createTestClientSession();
+             final ChannelShell channel = session.createShellChannel()) {
 
-            try (final ChannelShell channel = session.createShellChannel()) {
-                channel.setStreaming(ClientChannel.Streaming.Async);
-                channel.open().verify(5L, TimeUnit.SECONDS);
+            channel.setStreaming(ClientChannel.Streaming.Async);
+            channel.open().verify(5L, TimeUnit.SECONDS);
 
-                final byte[] message = "0123456789\n".getBytes(StandardCharsets.UTF_8);
-                final int nbMessages = 1000;
+            final byte[] message = "0123456789\n".getBytes(StandardCharsets.UTF_8);
+            final int nbMessages = 1000;
 
-                try (final ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
-                     final ByteArrayOutputStream baosErr = new ByteArrayOutputStream()) {
-                    final AtomicInteger writes = new AtomicInteger(nbMessages);
+            try (final ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
+                 final ByteArrayOutputStream baosErr = new ByteArrayOutputStream()) {
+                 final AtomicInteger writes = new AtomicInteger(nbMessages);
 
-                    channel.getAsyncIn().write(new ByteArrayBuffer(message))
-                            .addListener(new SshFutureListener<IoWriteFuture>() {
+                channel.getAsyncIn().write(new ByteArrayBuffer(message))
+                       .addListener(new SshFutureListener<IoWriteFuture>() {
                                 @Override
                                 public void operationComplete(IoWriteFuture future) {
                                     try {
@@ -242,8 +441,8 @@ public class ClientTest extends BaseTestSupport {
                                     }
                                 }
                             });
-                    channel.getAsyncOut().read(new ByteArrayBuffer())
-                            .addListener(new SshFutureListener<IoReadFuture>() {
+                channel.getAsyncOut().read(new ByteArrayBuffer())
+                       .addListener(new SshFutureListener<IoReadFuture>() {
                                 @Override
                                 public void operationComplete(IoReadFuture future) {
                                     try {
@@ -261,8 +460,8 @@ public class ClientTest extends BaseTestSupport {
                                     }
                                 }
                             });
-                    channel.getAsyncErr().read(new ByteArrayBuffer())
-                            .addListener(new SshFutureListener<IoReadFuture>() {
+                channel.getAsyncErr().read(new ByteArrayBuffer())
+                       .addListener(new SshFutureListener<IoReadFuture>() {
                                 @Override
                                 public void operationComplete(IoReadFuture future) {
                                     try {
@@ -281,210 +480,201 @@ public class ClientTest extends BaseTestSupport {
                                 }
                             });
 
-                    channel.waitFor(ClientChannel.CLOSED, 0);
+                channel.waitFor(ClientChannel.CLOSED, 0);
 
-                    assertEquals(nbMessages * message.length, baosOut.size());
-                }
+                assertEquals("Mismatched sent and received data size", nbMessages * message.length, baosOut.size());
             }
 
             client.close(true);
         } finally {
             client.stop();
         }
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
     public void testCommandDeadlock() throws Exception {
         client.start();
 
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
-
-            try (ChannelExec channel = session.createExecChannel(getCurrentTestName());
-                 OutputStream stdout = new NoCloseOutputStream(System.out);
-                 OutputStream stderr = new NoCloseOutputStream(System.err)) {
-
-                channel.setOut(stdout);
-                channel.setErr(stderr);
-                channel.open().verify(9L, TimeUnit.SECONDS);
-                Thread.sleep(125L);
-                try {
-                    byte[] data = "a".getBytes(StandardCharsets.UTF_8);
-                    for (int i = 0; i < 100; i++) {
-                        channel.getInvertedIn().write(data);
-                        channel.getInvertedIn().flush();
-                    }
-                } catch (SshException e) {
-                    // That's ok, the channel is being closed by the other side
+        try (ClientSession session = createTestClientSession();
+             ChannelExec channel = session.createExecChannel(getCurrentTestName());
+             OutputStream stdout = new NoCloseOutputStream(System.out);
+             OutputStream stderr = new NoCloseOutputStream(System.err)) {
+            // Forward the channel's stdout/stderr to the wrapped System.out/System.err streams
+            channel.setOut(stdout);
+            channel.setErr(stderr);
+            channel.open().verify(9L, TimeUnit.SECONDS);
+            Thread.sleep(125L);
+            try {
+                byte[] data = "a".getBytes(StandardCharsets.UTF_8);
+                OutputStream invertedStream = channel.getInvertedIn(); // fetched once and reused for every write
+                for (int i = 0; i < 100; i++) {
+                    invertedStream.write(data);
+                    invertedStream.flush();
                 }
-                assertEquals(ClientChannel.CLOSED, channel.waitFor(ClientChannel.CLOSED, 0) & ClientChannel.CLOSED);
-                session.close(false).await();
+            } catch (SshException e) {
+                // That's ok, the channel is being closed by the other side
             }
+            assertEquals(ClientChannel.CLOSED, channel.waitFor(ClientChannel.CLOSED, 0) & ClientChannel.CLOSED);
+            session.close(false).await();
         } finally {
             client.stop();
         }
+        // NOTE(review): the holder is presumably cleared by a session listener registered outside this view
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
     public void testClient() throws Exception {
         client.start();
 
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
-
-            try (ClientChannel channel = session.createShellChannel();
-                 ByteArrayOutputStream sent = new ByteArrayOutputStream();
-                 PipedOutputStream pipedIn = new PipedOutputStream();
-                 PipedInputStream pipedOut = new PipedInputStream(pipedIn)) {
+        try (ClientSession session = createTestClientSession();
+             ClientChannel channel = session.createShellChannel();
+             ByteArrayOutputStream sent = new ByteArrayOutputStream();
+             PipedOutputStream pipedIn = new PipedOutputStream();
+             PipedInputStream pipedOut = new PipedInputStream(pipedIn)) {
 
-                channel.setIn(pipedOut);
+            channel.setIn(pipedOut); // the channel reads its input from the read side of the pipe
 
-                try (OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
-                     ByteArrayOutputStream out = new ByteArrayOutputStream();
-                     ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+            try (OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
+                 ByteArrayOutputStream out = new ByteArrayOutputStream();
+                 ByteArrayOutputStream err = new ByteArrayOutputStream()) {
 
-                    channel.setOut(out);
-                    channel.setErr(err);
-                    channel.open();
+                channel.setOut(out);
+                channel.setErr(err);
+                channel.open().verify(9L, TimeUnit.SECONDS); // fail fast if the channel cannot be opened
 
-                    teeOut.write("this is my command\n".getBytes(StandardCharsets.UTF_8));
-                    teeOut.flush();
+                teeOut.write("this is my command\n".getBytes(StandardCharsets.UTF_8));
+                teeOut.flush();
 
-                    StringBuilder sb = new StringBuilder();
-                    for (int i = 0; i < 1000; i++) {
-                        sb.append("0123456789");
-                    }
-                    sb.append("\n");
-                    teeOut.write(sb.toString().getBytes(StandardCharsets.UTF_8));
+                StringBuilder sb = new StringBuilder(); // one 10000-character line followed by a newline
+                for (int i = 0; i < 1000; i++) {
+                    sb.append("0123456789");
+                }
+                sb.append("\n");
+                teeOut.write(sb.toString().getBytes(StandardCharsets.UTF_8));
 
-                    teeOut.write("exit\n".getBytes(StandardCharsets.UTF_8));
-                    teeOut.flush();
+                teeOut.write("exit\n".getBytes(StandardCharsets.UTF_8));
+                teeOut.flush();
 
-                    channel.waitFor(ClientChannel.CLOSED, 0);
+                channel.waitFor(ClientChannel.CLOSED, 0);
 
-                    channel.close(false);
-                    client.stop();
+                channel.close(false);
+                client.stop(); // NOTE(review): also invoked in the finally block below
 
-                    assertArrayEquals(sent.toByteArray(), out.toByteArray());
-                }
+                assertArrayEquals("Mismatched sent and received data", sent.toByteArray(), out.toByteArray());
             }
         } finally {
             client.stop();
         }
+        // NOTE(review): the holder is presumably cleared by a session listener registered outside this view
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
     public void testClientInverted() throws Exception {
         client.start();
 
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
-
-            try (ClientChannel channel = session.createShellChannel();
-                 ByteArrayOutputStream sent = new ByteArrayOutputStream();
-                 ByteArrayOutputStream out = new ByteArrayOutputStream();
-                 ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-
-                channel.setOut(out);
-                channel.setErr(err);
-                channel.open().verify(9L, TimeUnit.SECONDS);
+        try (ClientSession session = createTestClientSession();
+             ClientChannel channel = session.createShellChannel();
+             ByteArrayOutputStream sent = new ByteArrayOutputStream();
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             ByteArrayOutputStream err = new ByteArrayOutputStream()) {
 
-                try (OutputStream pipedIn = new TeeOutputStream(sent, channel.getInvertedIn())) {
-                    pipedIn.write("this is my command\n".getBytes(StandardCharsets.UTF_8));
-                    pipedIn.flush();
+            channel.setOut(out);
+            channel.setErr(err);
+            channel.open().verify(9L, TimeUnit.SECONDS);
 
-                    StringBuilder sb = new StringBuilder();
-                    for (int i = 0; i < 1000; i++) {
-                        sb.append("0123456789");
-                    }
-                    sb.append("\n");
-                    pipedIn.write(sb.toString().getBytes(StandardCharsets.UTF_8));
+            try (OutputStream pipedIn = new TeeOutputStream(sent, channel.getInvertedIn())) { // writes go to 'sent' and the channel's inverted input
+                pipedIn.write("this is my command\n".getBytes(StandardCharsets.UTF_8));
+                pipedIn.flush();
 
-                    pipedIn.write("exit\n".getBytes(StandardCharsets.UTF_8));
-                    pipedIn.flush();
+                StringBuilder sb = new StringBuilder(); // one 10000-character line followed by a newline
+                for (int i = 0; i < 1000; i++) {
+                    sb.append("0123456789");
                 }
+                sb.append("\n");
+                pipedIn.write(sb.toString().getBytes(StandardCharsets.UTF_8));
 
-                channel.waitFor(ClientChannel.CLOSED, 0);
+                pipedIn.write("exit\n".getBytes(StandardCharsets.UTF_8));
+                pipedIn.flush();
+            }
 
-                channel.close(false);
-                client.stop();
+            channel.waitFor(ClientChannel.CLOSED, 0);
 
-                assertArrayEquals(sent.toByteArray(), out.toByteArray());
-            }
+            channel.close(false);
+            client.stop(); // NOTE(review): also invoked in the finally block below
+
+            assertArrayEquals("Mismatched sent and received data", sent.toByteArray(), out.toByteArray());
         } finally {
             client.stop();
         }
+        // NOTE(review): the holder is presumably cleared by a session listener registered outside this view
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
     public void testClientWithCustomChannel() throws Exception {
         client.start();
 
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
-
-            try (ChannelShell channel = new ChannelShell();
-                 ByteArrayOutputStream sent = new ByteArrayOutputStream();
-                 ByteArrayOutputStream out = new ByteArrayOutputStream();
-                 ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-
-                session.getService(ConnectionService.class).registerChannel(channel);
-                channel.setOut(out);
-                channel.setErr(err);
-                channel.open().verify(5L, TimeUnit.SECONDS);
-                channel.close(false).await();
-            }
+        try (ClientSession session = createTestClientSession();
+             ChannelShell channel = new ChannelShell();
+             ByteArrayOutputStream sent = new ByteArrayOutputStream(); // NOTE(review): never referenced in this test
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+            // The channel was instantiated directly instead of via the session, so it is registered explicitly
+            session.getService(ConnectionService.class).registerChannel(channel);
+            channel.setOut(out);
+            channel.setErr(err);
+            channel.open().verify(5L, TimeUnit.SECONDS);
+            channel.close(false).await();
         } finally {
             client.stop();
         }
+        // NOTE(review): the holder is presumably cleared by a session listener registered outside this view
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
     public void testClientClosingStream() throws Exception {
         client.start();
 
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
-
-            try (ClientChannel channel = session.createShellChannel();
-                 ByteArrayOutputStream sent = new ByteArrayOutputStream();
-                 PipedOutputStream pipedIn = new PipedOutputStream();
-                 InputStream inPipe = new PipedInputStream(pipedIn);
-                 ByteArrayOutputStream out = new ByteArrayOutputStream();
-                 ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-
-                channel.setIn(inPipe);
-                channel.setOut(out);
-                channel.setErr(err);
-                channel.open();
-
-                try (OutputStream teeOut = new TeeOutputStream(sent, pipedIn)) {
-                    teeOut.write("this is my command\n".getBytes(StandardCharsets.UTF_8));
-                    teeOut.flush();
-
-                    StringBuilder sb = new StringBuilder();
-                    for (int i = 0; i < 1000; i++) {
-                        sb.append("0123456789");
-                    }
-                    sb.append("\n");
-                    teeOut.write(sb.toString().getBytes(StandardCharsets.UTF_8));
+        try (ClientSession session = createTestClientSession();
+             ClientChannel channel = session.createShellChannel();
+             ByteArrayOutputStream sent = new ByteArrayOutputStream();
+             PipedOutputStream pipedIn = new PipedOutputStream();
+             InputStream inPipe = new PipedInputStream(pipedIn);
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+            // The channel reads its input from the read side of the pipe
+            channel.setIn(inPipe);
+            channel.setOut(out);
+            channel.setErr(err);
+            channel.open().verify(9L, TimeUnit.SECONDS); // fail fast if the channel cannot be opened
+
+            try (OutputStream teeOut = new TeeOutputStream(sent, pipedIn)) {
+                teeOut.write("this is my command\n".getBytes(StandardCharsets.UTF_8));
+                teeOut.flush();
+
+                StringBuilder sb = new StringBuilder(); // one 10000-character line followed by a newline
+                for (int i = 0; i < 1000; i++) {
+                    sb.append("0123456789");
                 }
+                sb.append("\n");
+                teeOut.write(sb.toString().getBytes(StandardCharsets.UTF_8));
+            } // no "exit" command is written here; teeOut is closed instead (presumably closing pipedIn too)
 
-                channel.waitFor(ClientChannel.CLOSED, 0);
+            channel.waitFor(ClientChannel.CLOSED, 0);
 
-                channel.close(false);
-                client.stop();
+            channel.close(false);
+            client.stop(); // NOTE(review): also invoked in the finally block below
 
-                assertArrayEquals(sent.toByteArray(), out.toByteArray());
-            }
+            assertArrayEquals("Mismatched sent and received data", sent.toByteArray(), out.toByteArray());
         } finally {
             client.stop();
         }
+        // NOTE(review): the holder is presumably cleared by a session listener registered outside this view
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
@@ -496,79 +686,73 @@ public class ClientTest extends BaseTestSupport {
 //        FactoryManagerUtils.updateProperty(sshd, SshServer.MAX_PACKET_SIZE, 0x1000);
         client.start();
 
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
-
-            try (ClientChannel channel = session.createShellChannel();
-                 ByteArrayOutputStream sent = new ByteArrayOutputStream();
-                 PipedOutputStream pipedIn = new PipedOutputStream();
-                 InputStream inPipe = new PipedInputStream(pipedIn);
-                 ByteArrayOutputStream out = new ByteArrayOutputStream();
-                 ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-
-                channel.setIn(inPipe);
-                channel.setOut(out);
-                channel.setErr(err);
-                channel.open().verify(9L, TimeUnit.SECONDS);
-
-
-                int bytes = 0;
-                byte[] data = "01234567890123456789012345678901234567890123456789\n".getBytes(StandardCharsets.UTF_8);
-                long t0 = System.currentTimeMillis();
-                try (OutputStream teeOut = new TeeOutputStream(sent, pipedIn)) {
-                    for (int i = 0; i < 10000; i++) {
-                        teeOut.write(data);
-                        teeOut.flush();
-                        bytes += data.length;
-                        if ((bytes & 0xFFF00000) != ((bytes - data.length) & 0xFFF00000)) {
-                            System.out.println("Bytes written: " + bytes);
-                        }
-                    }
-                    teeOut.write("exit\n".getBytes(StandardCharsets.UTF_8));
+        try (ClientSession session = createTestClientSession();
+             ClientChannel channel = session.createShellChannel();
+             ByteArrayOutputStream sent = new ByteArrayOutputStream();
+             PipedOutputStream pipedIn = new PipedOutputStream();
+             InputStream inPipe = new PipedInputStream(pipedIn);
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+            channel.setIn(inPipe);
+            channel.setOut(out);
+            channel.setErr(err);
+            channel.open().verify(9L, TimeUnit.SECONDS);
+
+            int bytes = 0;
+            byte[] data = "01234567890123456789012345678901234567890123456789\n".getBytes(StandardCharsets.UTF_8);
+            long t0 = System.currentTimeMillis();
+            try (OutputStream teeOut = new TeeOutputStream(sent, pipedIn)) {
+                for (int i = 0; i < 10000; i++) {
+                    teeOut.write(data);
                     teeOut.flush();
+                    bytes += data.length;
+                    if ((bytes & 0xFFF00000) != ((bytes - data.length) & 0xFFF00000)) {
+                        System.out.println("Bytes written: " + bytes);
+                    }
                 }
-                long t1 = System.currentTimeMillis();
-
-                System.out.println("Sent " + (bytes / 1024) + " Kb in " + (t1 - t0) + " ms");
+                teeOut.write("exit\n".getBytes(StandardCharsets.UTF_8));
+                teeOut.flush();
+            }
 
-                System.out.println("Waiting for channel to be closed");
+            long t1 = System.currentTimeMillis();
+            System.out.println("Sent " + (bytes / 1024) + " Kb in " + (t1 - t0) + " ms");
 
-                channel.waitFor(ClientChannel.CLOSED, 0);
-
-                channel.close(false);
-                client.stop();
+            System.out.println("Waiting for channel to be closed");
+            channel.waitFor(ClientChannel.CLOSED, 0);
+            channel.close(false);
+            client.stop();
 
-                assertArrayEquals(sent.toByteArray(), out.toByteArray());
-                //assertArrayEquals(sent.toByteArray(), out.toByteArray());
-            }
+            assertArrayEquals("Mismatched sent and received data", sent.toByteArray(), out.toByteArray());
         } finally {
             client.stop();
         }
+
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test(expected = SshException.class)
     public void testOpenChannelOnClosedSession() throws Exception {
         client.start();
 
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
+        try (ClientSession session = createTestClientSession();
+             ClientChannel channel = session.createShellChannel()) {
 
-            try (ClientChannel channel = session.createShellChannel()) {
-                session.close(false);
+            session.close(false); // NOTE(review): the returned future is not awaited before the assertion below
+            assertNull("Session closure not signalled", clientSessionHolder.get());
 
-                try (PipedOutputStream pipedIn = new PipedOutputStream();
-                     InputStream inPipe = new PipedInputStream(pipedIn);
-                     ByteArrayOutputStream out = new ByteArrayOutputStream();
-                     ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+            try (PipedOutputStream pipedIn = new PipedOutputStream();
+                 InputStream inPipe = new PipedInputStream(pipedIn);
+                 ByteArrayOutputStream out = new ByteArrayOutputStream();
+                 ByteArrayOutputStream err = new ByteArrayOutputStream()) {
 
-                    channel.setIn(inPipe);
-                    channel.setOut(out);
-                    channel.setErr(err);
-                    channel.open();
-                }
+                channel.setIn(inPipe);
+                channel.setOut(out);
+                channel.setErr(err);
+                channel.open(); // an SshException is expected from here on - see the @Test(expected = ...) annotation
             }
+        } finally {
+            client.stop();
         }
     }
 
@@ -578,10 +762,12 @@ public class ClientTest extends BaseTestSupport {
         client.start();
 
         try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+            assertNotNull("Client session creation not signalled", clientSessionHolder.get());
             session.addPasswordIdentity(getCurrentTestName());
 
             AuthFuture authFuture = session.auth();
             CloseFuture closeFuture = session.close(false);
+
             authLatch.countDown();
             assertTrue("Authentication writing not completed in time", authFuture.await(11L, TimeUnit.SECONDS));
             assertTrue("Session closing not complete in time", closeFuture.await(8L, TimeUnit.SECONDS));
@@ -590,35 +776,35 @@ public class ClientTest extends BaseTestSupport {
         } finally {
             client.stop();
         }
+
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
     public void testCloseCleanBeforeChannelOpened() throws Exception {
         client.start();
 
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
-
-            try (ClientChannel channel = session.createShellChannel();
-                 InputStream inp = new ByteArrayInputStream(GenericUtils.EMPTY_BYTE_ARRAY);
-                 OutputStream out = new ByteArrayOutputStream();
-                 OutputStream err = new ByteArrayOutputStream()) {
+        try (ClientSession session = createTestClientSession();
+             ClientChannel channel = session.createShellChannel();
+             InputStream inp = new ByteArrayInputStream(GenericUtils.EMPTY_BYTE_ARRAY);
+             OutputStream out = new ByteArrayOutputStream();
+             OutputStream err = new ByteArrayOutputStream()) {
 
-                channel.setIn(inp);
-                channel.setOut(out);
-                channel.setErr(err);
+            channel.setIn(inp);
+            channel.setOut(out);
+            channel.setErr(err);
 
-                OpenFuture openFuture = channel.open();
-                CloseFuture closeFuture = session.close(false);
-                assertTrue("Channel not open in time", openFuture.await(11L, TimeUnit.SECONDS));
-                assertTrue("Session closing not complete in time", closeFuture.await(8L, TimeUnit.SECONDS));
-                assertTrue("Not open", openFuture.isOpened());
-                assertTrue("Not closed", closeFuture.isClosed());
-            }
+            OpenFuture openFuture = channel.open();
+            CloseFuture closeFuture = session.close(false); // requested right after the open, without waiting for it
+            assertTrue("Channel not open in time", openFuture.await(11L, TimeUnit.SECONDS));
+            assertTrue("Session closing not complete in time", closeFuture.await(8L, TimeUnit.SECONDS));
+            assertTrue("Not open", openFuture.isOpened()); // the open must still have succeeded
+            assertTrue("Not closed", closeFuture.isClosed());
         } finally {
             client.stop();
         }
+        // NOTE(review): the holder is presumably cleared by a session listener registered outside this view
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
@@ -626,27 +812,25 @@ public class ClientTest extends BaseTestSupport {
         channelLatch = new CountDownLatch(1);
         client.start();
 
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
+        try (ClientSession session = createTestClientSession();
+             ClientChannel channel = session.createShellChannel();
+             InputStream inp = new ByteArrayInputStream(GenericUtils.EMPTY_BYTE_ARRAY);
+             OutputStream out = new ByteArrayOutputStream();
+             OutputStream err = new ByteArrayOutputStream()) {
 
-            try (ClientChannel channel = session.createShellChannel();
-                 InputStream inp = new ByteArrayInputStream(GenericUtils.EMPTY_BYTE_ARRAY);
-                 OutputStream out = new ByteArrayOutputStream();
-                 OutputStream err = new ByteArrayOutputStream()) {
+            channel.setIn(inp);
+            channel.setOut(out);
+            channel.setErr(err);
 
-                channel.setIn(inp);
-                channel.setOut(out);
-                channel.setErr(err);
+            OpenFuture openFuture = channel.open();
+            CloseFuture closeFuture = session.close(true);
+            assertNull("Session closure not signalled", clientSessionHolder.get());
 
-                OpenFuture openFuture = channel.open();
-                CloseFuture closeFuture = session.close(true);
-                channelLatch.countDown();
-                assertTrue("Channel not open in time", openFuture.await(11L, TimeUnit.SECONDS));
-                assertTrue("Session closing not complete in time", closeFuture.await(8L, TimeUnit.SECONDS));
-                assertNotNull("No open exception", openFuture.getException());
-                assertTrue("Not closed", closeFuture.isClosed());
-            }
+            channelLatch.countDown();
+            assertTrue("Channel not open in time", openFuture.await(11L, TimeUnit.SECONDS));
+            assertTrue("Session closing not complete in time", closeFuture.await(8L, TimeUnit.SECONDS));
+            assertNotNull("No open exception", openFuture.getException());
+            assertTrue("Not closed", closeFuture.isClosed());
         } finally {
             client.stop();
         }
@@ -657,12 +841,14 @@ public class ClientTest extends BaseTestSupport {
         client.start();
 
         try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+            assertNotNull("Client session creation not signalled", clientSessionHolder.get());
             KeyPair pair = Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA);
             session.addPublicKeyIdentity(pair);
             session.auth().verify(5L, TimeUnit.SECONDS);
         } finally {
             client.stop();
         }
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
@@ -671,15 +857,20 @@ public class ClientTest extends BaseTestSupport {
         client.start();
 
         try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+            assertNotNull("Client session creation not signalled", clientSessionHolder.get());
             session.addPublicKeyIdentity(Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA));
             session.auth().verify(5L, TimeUnit.SECONDS);
         } finally {
             client.stop();
         }
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
     public void testPublicKeyAuthNewWithFailureOnFirstIdentity() throws Exception {
+        SimpleGeneratorHostKeyProvider provider = new SimpleGeneratorHostKeyProvider();
+        provider.setAlgorithm("RSA");
+
         final KeyPair pair = Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA);
         sshd.setPublickeyAuthenticator(new PublickeyAuthenticator() {
             @Override
@@ -690,16 +881,15 @@ public class ClientTest extends BaseTestSupport {
         client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(UserAuthPublicKeyFactory.INSTANCE));
         client.start();
 
-        SimpleGeneratorHostKeyProvider provider = new SimpleGeneratorHostKeyProvider();
-        provider.setAlgorithm("RSA");
-
         try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+            assertNotNull("Client session creation not signalled", clientSessionHolder.get());
             session.addPublicKeyIdentity(provider.loadKey(KeyPairProvider.SSH_RSA));
             session.addPublicKeyIdentity(pair);
             session.auth().verify(5L, TimeUnit.SECONDS);
         } finally {
             client.stop();
         }
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
@@ -707,12 +897,12 @@ public class ClientTest extends BaseTestSupport {
         client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(new UserAuthPasswordFactory()));
         client.start();
 
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
+        try (ClientSession session = createTestClientSession()) {
+            // nothing extra
         } finally {
             client.stop();
         }
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
@@ -721,12 +911,14 @@ public class ClientTest extends BaseTestSupport {
         client.start();
 
         try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+            assertNotNull("Client session creation not signalled", clientSessionHolder.get());
             session.addPasswordIdentity(getClass().getSimpleName());
             session.addPasswordIdentity(getCurrentTestName());
             session.auth().verify(5L, TimeUnit.SECONDS);
         } finally {
             client.stop();
         }
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
@@ -734,12 +926,12 @@ public class ClientTest extends BaseTestSupport {
         client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(UserAuthKeyboardInteractiveFactory.INSTANCE));
         client.start();
 
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
+        try (ClientSession session = createTestClientSession()) {
+            // nothing extra
         } finally {
             client.stop();
         }
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
@@ -748,12 +940,14 @@ public class ClientTest extends BaseTestSupport {
         client.start();
 
         try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+            assertNotNull("Client session creation not signalled", clientSessionHolder.get());
             session.addPasswordIdentity(getClass().getSimpleName());
             session.addPasswordIdentity(getCurrentTestName());
             session.auth().verify(5L, TimeUnit.SECONDS);
         } finally {
             client.stop();
         }
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test   // see SSHD-504
@@ -837,6 +1031,7 @@ public class ClientTest extends BaseTestSupport {
         try {
             for (int index = 0; index < xformers.size(); index++) {
                 try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7, TimeUnit.SECONDS).getSession()) {
+                    assertNotNull("Client session creation not signalled at iteration #" + index, clientSessionHolder.get());
                     String password = "bad-" + getCurrentTestName() + "-" + index;
                     session.addPasswordIdentity(password);
 
@@ -845,13 +1040,15 @@ public class ClientTest extends BaseTestSupport {
                     assertFalse("Unexpected success for password=" + password, future.isSuccess());
                     session.removePasswordIdentity(password);
                 }
+
+                assertNull("Session closure not signalled at iteration #" + index, clientSessionHolder.get());
             }
 
-            try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-                session.addPasswordIdentity(getCurrentTestName());
-                session.auth().verify(5L, TimeUnit.SECONDS);
+            try (ClientSession session = createTestClientSession()) {
                 assertTrue("Mismatched prompts evaluation results", mismatchedPrompts.isEmpty());
             }
+
+            assertNull("Final session closure not signalled", clientSessionHolder.get());
         } finally {
             client.stop();
         }
@@ -879,6 +1076,8 @@ public class ClientTest extends BaseTestSupport {
         client.start();
 
         try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+            assertNotNull("Client session creation not signalled", clientSessionHolder.get());
+
             AuthFuture future = session.auth();
             future.await();
             assertTrue("Unexpected authentication success", future.isFailure());
@@ -886,6 +1085,8 @@ public class ClientTest extends BaseTestSupport {
         } finally {
             client.stop();
         }
+
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
@@ -899,6 +1100,7 @@ public class ClientTest extends BaseTestSupport {
         client.start();
 
         try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+            assertNotNull("Client session creation not signalled", clientSessionHolder.get());
             session.setUserInteraction(new UserInteraction() {
                 @Override
                 public void welcome(String banner) {
@@ -912,6 +1114,7 @@ public class ClientTest extends BaseTestSupport {
                     return new String[]{getCurrentTestName()};
                 }
             });
+
             AuthFuture future = session.auth();
             future.await();
             assertTrue("Authentication not marked as success", future.isSuccess());
@@ -920,6 +1123,8 @@ public class ClientTest extends BaseTestSupport {
         } finally {
             client.stop();
         }
+
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
@@ -932,6 +1137,7 @@ public class ClientTest extends BaseTestSupport {
         client.start();
 
         try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+            assertNotNull("Client session creation not signalled", clientSessionHolder.get());
             session.setUserInteraction(new UserInteraction() {
                 @Override
                 public void welcome(String banner) {
@@ -945,6 +1151,7 @@ public class ClientTest extends BaseTestSupport {
                     return new String[]{"bad#" + attemptId};
                 }
             });
+
             AuthFuture future = session.auth();
             assertTrue("Authentication not completed in time", future.await(11L, TimeUnit.SECONDS));
             assertTrue("Authentication not, marked as failure", future.isFailure());
@@ -952,6 +1159,8 @@ public class ClientTest extends BaseTestSupport {
         } finally {
             client.stop();
         }
+
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
@@ -960,36 +1169,35 @@ public class ClientTest extends BaseTestSupport {
         try {
             client.start();
 
-            try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-                session.addPasswordIdentity(getCurrentTestName());
-                session.auth().verify(5L, TimeUnit.SECONDS);
-
-                try (ClientChannel channel = session.createShellChannel();
-                     PipedOutputStream pipedIn = new PipedOutputStream();
-                     InputStream inPipe = new PipedInputStream(pipedIn);
-                     ByteArrayOutputStream out = new ByteArrayOutputStream();
-                     ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-
-                    channel.setIn(inPipe);
-                    channel.setOut(out);
-                    channel.setErr(err);
-                    channel.open().verify(9L, TimeUnit.SECONDS);
-
-                    //            ((AbstractSession) session).disconnect(SshConstants.SSH2_DISCONNECT_BY_APPLICATION, "Cancel");
-                    AbstractSession cs = (AbstractSession) session;
-                    Buffer buffer = cs.createBuffer(SshConstants.SSH_MSG_DISCONNECT);
-                    buffer.putInt(SshConstants.SSH2_DISCONNECT_BY_APPLICATION);
-                    buffer.putString("Cancel");
-                    buffer.putString("");
-                    IoWriteFuture f = cs.writePacket(buffer);
-                    assertTrue("Packet writing not completed in time", f.await(11L, TimeUnit.SECONDS));
-                    suspend(cs.getIoSession());
-
-                    TestEchoShellFactory.TestEchoShell.latch.await();
-                }
+            try (ClientSession session = createTestClientSession();
+                 ClientChannel channel = session.createShellChannel();
+                 PipedOutputStream pipedIn = new PipedOutputStream();
+                 InputStream inPipe = new PipedInputStream(pipedIn);
+                 ByteArrayOutputStream out = new ByteArrayOutputStream();
+                 ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+                channel.setIn(inPipe);
+                channel.setOut(out);
+                channel.setErr(err);
+                channel.open().verify(9L, TimeUnit.SECONDS);
+
+                //            ((AbstractSession) session).disconnect(SshConstants.SSH2_DISCONNECT_BY_APPLICATION, "Cancel");
+                AbstractSession cs = (AbstractSession) session;
+                Buffer buffer = cs.createBuffer(SshConstants.SSH_MSG_DISCONNECT);
+                buffer.putInt(SshConstants.SSH2_DISCONNECT_BY_APPLICATION);
+                buffer.putString("Cancel");
+                buffer.putString("");
+
+                IoWriteFuture f = cs.writePacket(buffer);
+                assertTrue("Packet writing not completed in time", f.await(11L, TimeUnit.SECONDS));
+                suspend(cs.getIoSession());
+
+                TestEchoShellFactory.TestEchoShell.latch.await();
             } finally {
                 client.stop();
             }
+
+            assertNull("Session closure not signalled", clientSessionHolder.get());
         } finally {
             TestEchoShellFactory.TestEchoShell.latch = null;
         }
@@ -1015,11 +1223,13 @@ public class ClientTest extends BaseTestSupport {
         client.start();
 
         try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+            assertNotNull("Client session creation not signalled", clientSessionHolder.get());
             session.waitFor(ClientSession.WAIT_AUTH, TimeUnit.SECONDS.toMillis(10L));
-            assertTrue(ok.get());
+            assertTrue("Server key verifier invoked ?", ok.get());
         } finally {
             client.stop();
         }
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
@@ -1028,9 +1238,7 @@ public class ClientTest extends BaseTestSupport {
         client.getCipherFactories().add(BuiltinCiphers.none);
         client.start();
 
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
+        try (ClientSession session = createTestClientSession()) {
             assertTrue("Failed to switch to NONE cipher on time", session.switchToNoneCipher().await(5L, TimeUnit.SECONDS));
 
             try (ClientChannel channel = session.createSubsystemChannel(SftpConstants.SFTP_SUBSYSTEM_NAME)) {
@@ -1039,6 +1247,7 @@ public class ClientTest extends BaseTestSupport {
         } finally {
             client.stop();
         }
+        assertNull("Session closure not signalled", clientSessionHolder.get());
     }
 
     @Test
@@ -1046,10 +1255,7 @@ public class ClientTest extends BaseTestSupport {
         client.start();
 
         Collection<ClientChannel> channels = new LinkedList<>();
-        try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
-            session.addPasswordIdentity(getCurrentTestName());
-            session.auth().verify(5L, TimeUnit.SECONDS);
-
+        try (ClientSession session = createTestClientSession()) {
             channels.add(session.createChannel(ClientChannel.CHANNEL_SUBSYSTEM, SftpConstants.SFTP_SUBSYSTEM_NAME));
             channels.add(session.createChannel(ClientChannel.CHANNEL_EXEC, getCurrentTestName()));
             channels.add(session.createChannel(ClientChannel.CHANNEL_SHELL, getClass().getSimpleName()));
@@ -1069,6 +1275,87 @@ public class ClientTest extends BaseTestSupport {
             }
             client.stop();
         }
+
+        assertNull("Session closure not signalled", clientSessionHolder.get());
+    }
+
+    /**
+     * Makes sure that the {@link ChannelListener}s added to the client, session
+     * and channel are <U>cumulative</U> - i.e., all of them are invoked
+     * @throws Exception If failed
+     */
+    @Test
+    public void testChannelListenersPropagation() throws Exception {
+        Map<String,TestChannelListener> clientListeners = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);    // listener name => listener
+        addChannelListener(clientListeners, client, new TestChannelListener(client.getClass().getSimpleName()));
+
+        client.start();
+        try (ClientSession session = createTestClientSession()) {
+            addChannelListener(clientListeners, session, new TestChannelListener(session.getClass().getSimpleName()));
+            assertListenerSizes("ClientSessionOpen", clientListeners, 0, 0);    // no channel created yet: 0 active, 0 open
+
+            try (ClientChannel channel = session.createSubsystemChannel(SftpConstants.SFTP_SUBSYSTEM_NAME)) {
+                channel.open().verify(5L, TimeUnit.SECONDS);
+
+                TestChannelListener channelListener = new TestChannelListener(channel.getClass().getSimpleName());
+                // emulate the initialized/open-success events since this listener is added AFTER the channel is open
+                channelListener.channelInitialized(channel);
+                channelListener.channelOpenSuccess(channel);
+                channel.addChannelListener(channelListener);    // not put in clientListeners, so the size checks cover only the client and session listeners
+                assertListenerSizes("ClientChannelOpen", clientListeners, 1, 1);
+            }
+
+            assertListenerSizes("ClientChannelClose", clientListeners, 0, 1);    // TestChannelListener#channelClosed removes only from the active set
+        } finally {
+            client.stop();
+        }
+
+        assertListenerSizes("ClientStop", clientListeners, 0, 1);
+    }
+
+    private static void assertListenerSizes(String phase, Map<String,? extends TestChannelListener> listeners, int activeSize, int openSize) {
+        assertListenerSizes(phase, listeners.values(), activeSize, openSize);   // convenience overload: only the map values are checked, keys are ignored
+    }
+
+    private static void assertListenerSizes(String phase, Collection<? extends TestChannelListener> listeners, int activeSize, int openSize) {
+        if (GenericUtils.isEmpty(listeners)) {
+            return; // no listeners - nothing is asserted, i.e., the check passes silently
+        }
+
+        for (TestChannelListener l : listeners) {
+            if (activeSize >= 0) {  // a negative expected value skips the active channels check
+                assertEquals(phase + ": mismatched active channels size for " + l.getName() + " listener", activeSize, GenericUtils.size(l.getActiveChannels()));
+            }
+
+            if (openSize >= 0) {    // a negative expected value skips the open channels check
+                assertEquals(phase + ": mismatched open channels size for " + l.getName() + " listener", openSize, GenericUtils.size(l.getOpenChannels()));
+            }
+
+            assertEquals(phase + ": unexpected failed channels size for " + l.getName() + " listener", 0, GenericUtils.size(l.getFailedChannels()));   // always checked: expects no failed channels in any phase
+        }
+    }
+
+    private static <L extends ChannelListener & NamedResource> void addChannelListener(Map<String,L> listeners, ChannelListenerManager manager, L listener) {
+        String name = listener.getName();   // the listener is tracked in the map under its own name
+        assertNull("Duplicate listener named " + name, listeners.put(name, listener));  // put() returns the previous mapping - must be none
+        manager.addChannelListener(listener);   // registered with the manager only after the name was recorded as unique
+    }
+
+    private ClientSession createTestClientSession() throws IOException {    // returns a connected + password-authenticated session that the caller must close
+        ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession();
+        try {
+            assertNotNull("Client session creation not signalled", clientSessionHolder.get());
+            session.addPasswordIdentity(getCurrentTestName());  // the current test name serves as both username (see connect above) and password
+            session.auth().verify(5L, TimeUnit.SECONDS);
+
+            ClientSession returnValue = session;
+            session = null; // success - ownership passes to the caller, so skip the close in 'finally'
+            return returnValue;
+        } finally {
+            if (session != null) {  // still non-null only if the assertion or the authentication above threw
+                session.close();
+            }
+        }
     }
 
     private void suspend(IoSession ioSession) {
@@ -1099,6 +1386,25 @@ public class ClientTest extends BaseTestSupport {
         }
     }
 
+    public static class ChannelFailureException extends RuntimeException implements NamedResource {    // unchecked exception that carries an event name
+        private static final long serialVersionUID = 1L;    // we're not serializing it
+        private final String name;
+
+        public ChannelFailureException(String name) {   // no message is passed to RuntimeException - the name is available only via getName()/toString()
+            this.name = ValidateUtils.checkNotNullAndNotEmpty(name, "No event name provided");  // presumably rejects a null/empty name - confirm in ValidateUtils
+        }
+
+        @Override
+        public String getName() {
+            return name;
+        }
+
+        @Override
+        public String toString() {
+            return getName();   // replaces the default Throwable#toString() (class name + message) with just the event name
+        }
+    }
+
     public static void main(String[] args) throws Exception {
         SshClient.main(args);
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java b/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java
new file mode 100644
index 0000000..0f7c18b
--- /dev/null
+++ b/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.sshd.common.NamedResource;
+import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelListener;
+import org.apache.sshd.common.util.logging.AbstractLoggingBean;
+import org.junit.Assert;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class TestChannelListener extends AbstractLoggingBean implements ChannelListener, NamedResource {    // test helper: records channel events and asserts their ordering
+    private final String name;
+    private final Collection<Channel> activeChannels = new CopyOnWriteArraySet<>();    // added by channelInitialized, removed by channelClosed
+    private final Collection<Channel> openChannels = new CopyOnWriteArraySet<>();      // added by channelOpenSuccess, never removed by this class
+    private final Collection<Channel> failedChannels = new CopyOnWriteArraySet<>();    // added by channelOpenFailure, never removed by this class
+
+    public TestChannelListener() {
+        this("");   // anonymous listener - the name is the empty string
+    }
+
+    public TestChannelListener(String discriminator) {
+        super(discriminator);   // presumably qualifies the logger of AbstractLoggingBean - confirm
+        name = discriminator;   // stored as-is, no validation
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    public Collection<Channel> getActiveChannels() {
+        return activeChannels;  // the live collection, not a copy
+    }
+
+    @Override
+    public void channelInitialized(Channel channel) {
+        Assert.assertTrue("Same channel instance re-initialized: " + channel, activeChannels.add(channel));    // add() returns false for an already tracked instance
+        log.info("channelInitialized({})", channel);
+    }
+
+    public Collection<Channel> getOpenChannels() {
+        return openChannels;    // the live collection, not a copy
+    }
+
+    @Override
+    public void channelOpenSuccess(Channel channel) {
+        Assert.assertTrue("Open channel not activated: " + channel, activeChannels.contains(channel));  // channelInitialized must have been seen first
+        Assert.assertTrue("Same channel instance re-opened: " + channel, openChannels.add(channel));
+        log.info("channelOpenSuccess({})", channel);
+    }
+
+    public Collection<Channel> getFailedChannels() {
+        return failedChannels;  // the live collection, not a copy
+    }
+
+    @Override
+    public void channelOpenFailure(Channel channel, Throwable reason) {
+        Assert.assertTrue("Failed channel not activated: " + channel, activeChannels.contains(channel));    // channelInitialized must have been seen first
+        Assert.assertTrue("Same channel instance re-failed: " + channel, failedChannels.add(channel));
+        log.warn("channelOpenFailure({}) {} : {}", channel, reason.getClass().getSimpleName(), reason.getMessage());    // NOTE(review): throws NullPointerException if reason is null
+    }
+
+    @Override
+    public void channelClosed(Channel channel) {
+        Assert.assertTrue("Unknown closed channel instance: " + channel, activeChannels.remove(channel));  // only the active set is updated - open/failed sets keep the channel
+        log.info("channelClosed({})", channel);
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "[" + getName() + "]";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/common/io/nio2/Nio2ServiceTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/io/nio2/Nio2ServiceTest.java b/sshd-core/src/test/java/org/apache/sshd/common/io/nio2/Nio2ServiceTest.java
index f55a262..002196c 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/io/nio2/Nio2ServiceTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/io/nio2/Nio2ServiceTest.java
@@ -55,7 +55,6 @@ public class Nio2ServiceTest extends BaseTestSupport {
             sshd.setKeyPairProvider(Utils.createTestHostKeyProvider());
             sshd.setShellFactory(new TestEchoShellFactory());
             sshd.setPasswordAuthenticator(BogusPasswordAuthenticator.INSTANCE);
-            sshd.setSessionFactory(new org.apache.sshd.server.session.SessionFactory());
             sshd.start();
 
             int port = sshd.getPort();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/common/signature/AbstractSignatureFactoryTestSupport.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/signature/AbstractSignatureFactoryTestSupport.java b/sshd-core/src/test/java/org/apache/sshd/common/signature/AbstractSignatureFactoryTestSupport.java
index b6a9407..b958ceb 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/signature/AbstractSignatureFactoryTestSupport.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/signature/AbstractSignatureFactoryTestSupport.java
@@ -68,7 +68,6 @@ public abstract class AbstractSignatureFactoryTestSupport extends BaseTestSuppor
     public void setUp() throws Exception {
         sshd = SshServer.setUpDefaultServer();
         sshd.setPasswordAuthenticator(BogusPasswordAuthenticator.INSTANCE);
-        sshd.setSessionFactory(new org.apache.sshd.server.session.SessionFactory());
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java b/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java
index bff5738..a969502 100644
--- a/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java
+++ b/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java
@@ -188,10 +188,10 @@ public class ClientUserAuthServiceOld extends CloseableUtils.AbstractCloseable i
 
     @Override
     protected void preClose() {
-        super.preClose();
         if (!authFuture.isDone()) {
             authFuture.setException(new SshException("Session is closed"));
         }
+        super.preClose();
     }
 
     public AuthFuture auth(UserAuth userAuth) throws IOException {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/server/PublickeyAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/server/PublickeyAuthenticatorTest.java b/sshd-core/src/test/java/org/apache/sshd/server/PublickeyAuthenticatorTest.java
index 97a4b26..59ab3fb 100644
--- a/sshd-core/src/test/java/org/apache/sshd/server/PublickeyAuthenticatorTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/server/PublickeyAuthenticatorTest.java
@@ -82,7 +82,7 @@ public class PublickeyAuthenticatorTest extends BaseTestSupport {
                     Object result;
                     try {
                         result = method.invoke(authenticator, invArgs);
-                    } catch(InvocationTargetException e) {
+                    } catch (InvocationTargetException e) {
                         Throwable t = e.getTargetException();   // peel of the real exception
                         System.err.println("Failed (" + t.getClass().getSimpleName() + ")"
                                           + " to invoke with user=" + useUsername


[4/4] mina-sshd git commit: [SSHD-555] Add ChannelListener support

Posted by lg...@apache.org.
[SSHD-555] Add ChannelListener support


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/40195ab4
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/40195ab4
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/40195ab4

Branch: refs/heads/master
Commit: 40195ab466bb7ae97fd88c21b9a3394e78566ba9
Parents: 051aec3
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Tue Aug 25 07:38:01 2015 +0300
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Tue Aug 25 07:38:01 2015 +0300

----------------------------------------------------------------------
 .../agent/local/ChannelAgentForwarding.java     |  22 +-
 .../sshd/agent/unix/AgentForwardedChannel.java  |   2 -
 .../sshd/agent/unix/ChannelAgentForwarding.java |  35 +-
 .../org/apache/sshd/client/SessionFactory.java  |  46 -
 .../java/org/apache/sshd/client/SshClient.java  |  13 +-
 .../java/org/apache/sshd/client/SshKeyScan.java |   4 +-
 .../client/channel/AbstractClientChannel.java   |  15 +-
 .../sshd/client/channel/ChannelSession.java     |   1 -
 .../sshd/client/session/ClientSession.java      |   1 -
 .../sshd/client/session/ClientSessionImpl.java  |   4 +-
 .../client/session/ClientUserAuthService.java   |   4 +-
 .../sshd/client/session/SessionFactory.java     |  46 +
 .../sshd/client/subsystem/SubsystemClient.java  |   6 +
 .../subsystem/sftp/DefaultSftpClient.java       |   6 +
 .../client/subsystem/sftp/SftpFileSystem.java   |   6 +
 .../sshd/common/AbstractFactoryManager.java     |  99 ++-
 .../org/apache/sshd/common/FactoryManager.java  |   4 +-
 .../sshd/common/channel/AbstractChannel.java    |  94 +-
 .../org/apache/sshd/common/channel/Channel.java |   6 +-
 .../common/channel/ChannelAsyncInputStream.java |   4 +
 .../sshd/common/channel/ChannelListener.java    |  74 ++
 .../common/channel/ChannelListenerManager.java  |  47 +
 .../common/channel/ChannelPipedInputStream.java |  25 +-
 .../sshd/common/future/DefaultSshFuture.java    |  17 +-
 .../sshd/common/session/AbstractSession.java    | 168 ++--
 .../common/session/AbstractSessionFactory.java  |  55 +-
 .../org/apache/sshd/common/session/Session.java |  17 +-
 .../common/session/SessionListenerManager.java  |  53 ++
 .../common/session/SessionTimeoutListener.java  |  17 +-
 .../sshd/common/util/EventListenerUtils.java    |  23 +-
 .../apache/sshd/common/util/GenericUtils.java   |  46 +
 .../sshd/server/ServerFactoryManager.java       |   1 -
 .../java/org/apache/sshd/server/SshServer.java  |   3 +-
 .../auth/CachingPublicKeyAuthenticator.java     |   2 +-
 .../server/channel/AbstractServerChannel.java   |  18 +-
 .../sshd/server/channel/ChannelSession.java     |  11 +-
 .../sshd/server/forward/TcpipServerChannel.java |  74 +-
 .../sshd/server/session/SessionFactory.java     |  24 +-
 .../org/apache/sshd/AuthenticationTest.java     |   5 +-
 .../java/org/apache/sshd/KeepAliveTest.java     |  79 +-
 .../java/org/apache/sshd/KeyReExchangeTest.java |   2 +-
 .../java/org/apache/sshd/client/ClientTest.java | 874 +++++++++++++------
 .../common/channel/TestChannelListener.java     |  95 ++
 .../sshd/common/io/nio2/Nio2ServiceTest.java    |   1 -
 .../AbstractSignatureFactoryTestSupport.java    |   1 -
 .../deprecated/ClientUserAuthServiceOld.java    |   2 +-
 .../sshd/server/PublickeyAuthenticatorTest.java |   2 +-
 .../java/org/apache/sshd/server/ServerTest.java | 224 ++---
 .../java/org/apache/sshd/git/util/Utils.java    |   2 +-
 49 files changed, 1638 insertions(+), 742 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
index a93d12b..19baa6b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
@@ -22,13 +22,18 @@ import java.io.IOException;
 import java.io.OutputStream;
 
 import org.apache.sshd.agent.SshAgent;
+import org.apache.sshd.agent.SshAgentFactory;
 import org.apache.sshd.agent.common.AbstractAgentClient;
 import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.server.channel.AbstractServerChannel;
@@ -48,13 +53,24 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
     @Override
     protected OpenFuture doInit(Buffer buffer) {
         final OpenFuture f = new DefaultOpenFuture(this);
+        ChannelListener listener = getChannelListenerProxy();
         try {
             out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
-            agent = session.getFactoryManager().getAgentFactory().createClient(session.getFactoryManager());
+            FactoryManager manager = session.getFactoryManager();
+            SshAgentFactory factory = ValidateUtils.checkNotNull(manager.getAgentFactory(), "No agent factory");
+            agent = factory.createClient(manager);
             client = new AgentClient();
-            f.setOpened();
 
-        } catch (Exception e) {
+            listener.channelOpenSuccess(this);
+            f.setOpened();
+        } catch (Throwable t) {
+            Throwable e = GenericUtils.peelException(t);
+            try {
+                listener.channelOpenFailure(this, e);
+            } catch (Throwable ignored) {
+                log.warn("doInit({}) failed ({}) to inform listener of open failure={}: {}",
+                         this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage());
+            }
             f.setException(e);
         }
         return f;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
index eb33231..8ed5c07 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
@@ -77,7 +77,5 @@ public class AgentForwardedChannel extends AbstractClientChannel implements Runn
         if (result < Status.APR_SUCCESS) {
             AgentServerProxy.throwException(result);
         }
-
     }
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
index 1cccc8f..113f33d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
@@ -29,10 +29,12 @@ import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.FactoryManagerUtils;
 import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.sshd.server.channel.AbstractServerChannel;
@@ -45,6 +47,18 @@ import org.apache.tomcat.jni.Status;
  * The client side channel that will receive requests forwards by the SSH server.
  */
 public class ChannelAgentForwarding extends AbstractServerChannel {
+    /**
+     * Property that can be set on the factory manager in order to control
+     * the buffer size used to forward data from the established channel
+     *
+     * @see #MIN_FORWARDER_BUF_SIZE
+     * @see #MAX_FORWARDER_BUF_SIZE
+     * @see #DEFAULT_FORWARDER_BUF_SIZE
+     */
+    public static final String FORWARDER_BUFFER_SIZE = "channel-agent-fwd-buf-size";
+    public static final int MIN_FORWARDER_BUF_SIZE = Byte.MAX_VALUE;
+    public static final int DEFAULT_FORWARDER_BUF_SIZE = 1024;
+    public static final int MAX_FORWARDER_BUF_SIZE = Short.MAX_VALUE;
 
     private String authSocket;
     private long pool;
@@ -61,6 +75,7 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
     @Override
     protected OpenFuture doInit(Buffer buffer) {
         final OpenFuture f = new DefaultOpenFuture(this);
+        ChannelListener listener = getChannelListenerProxy();
         try {
             out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
             authSocket = FactoryManagerUtils.getString(session, SshAgent.SSH_AUTHSOCKET_ENV_NAME);
@@ -74,12 +89,17 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
             ExecutorService service = getExecutorService();
             forwardService = (service == null) ? ThreadUtils.newSingleThreadExecutor("ChannelAgentForwarding[" + authSocket + "]") : service;
             shutdownForwarder = (service == forwardService) ? isShutdownOnExit() : true;
+
+            final int copyBufSize = FactoryManagerUtils.getIntProperty(getSession(), FORWARDER_BUFFER_SIZE, DEFAULT_FORWARDER_BUF_SIZE);
+            ValidateUtils.checkTrue(copyBufSize >= MIN_FORWARDER_BUF_SIZE, "Copy buf size below min.: %d", copyBufSize);
+            ValidateUtils.checkTrue(copyBufSize <= MAX_FORWARDER_BUF_SIZE, "Copy buf size above max.: %d", copyBufSize);
+
             forwarder = forwardService.submit(new Runnable() {
                 @SuppressWarnings("synthetic-access")
                 @Override
                 public void run() {
                     try {
-                        byte[] buf = new byte[1024];
+                        byte[] buf = new byte[copyBufSize];
                         while (true) {
                             int len = Socket.recv(handle, buf, 0, buf.length);
                             if (len > 0) {
@@ -92,11 +112,20 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
                     }
                 }
             });
-            f.setOpened();
 
-        } catch (Exception e) {
+            listener.channelOpenSuccess(this);
+            f.setOpened();
+        } catch (Exception t) {
+            Throwable e = GenericUtils.peelException(t);
+            try {
+                listener.channelOpenFailure(this, e);
+            } catch (Throwable ignored) {
+                log.warn("doInit({}) failed ({}) to inform listener of open failure={}: {}",
+                         this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage());
+            }
             f.setException(e);
         }
+
         return f;
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/SessionFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/SessionFactory.java b/sshd-core/src/main/java/org/apache/sshd/client/SessionFactory.java
deleted file mode 100644
index 76527a2..0000000
--- a/sshd-core/src/main/java/org/apache/sshd/client/SessionFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sshd.client;
-
-import org.apache.sshd.client.session.ClientSessionImpl;
-import org.apache.sshd.common.io.IoSession;
-import org.apache.sshd.common.session.AbstractSession;
-import org.apache.sshd.common.session.AbstractSessionFactory;
-
-/**
- * A factory of client sessions.
- * This class can be used as a way to customize the creation of client sessions.
- *
- * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
- * @see org.apache.sshd.client.SshClient#setSessionFactory(SessionFactory)
- */
-public class SessionFactory extends AbstractSessionFactory {
-
-    protected ClientFactoryManager client;
-
-    public void setClient(ClientFactoryManager client) {
-        this.client = client;
-    }
-
-    @Override
-    protected AbstractSession doCreateSession(IoSession ioSession) throws Exception {
-        return new ClientSessionImpl(client, ioSession);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java b/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java
index c98ce14..6e21329 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java
@@ -55,6 +55,7 @@ import org.apache.sshd.client.future.DefaultConnectFuture;
 import org.apache.sshd.client.session.ClientConnectionServiceFactory;
 import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.client.session.ClientUserAuthServiceFactory;
+import org.apache.sshd.client.session.SessionFactory;
 import org.apache.sshd.common.AbstractFactoryManager;
 import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.Factory;
@@ -230,7 +231,6 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
 
         setupSessionTimeout(sessionFactory);
 
-        sessionFactory.setClient(this);
         connector = createConnector();
     }
 
@@ -276,17 +276,12 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
     }
 
     public ConnectFuture connect(String username, String host, int port) throws IOException {
-        assert host != null;
-        assert port >= 0;
-        if (connector == null) {
-            throw new IllegalStateException("SshClient not started. Please call start() method before connecting to a server");
-        }
-        SocketAddress address = new InetSocketAddress(host, port);
+        ValidateUtils.checkTrue(port >= 0, "Invalid port: %d", port);
+        SocketAddress address = new InetSocketAddress(ValidateUtils.checkNotNullAndNotEmpty(host, "No host"), port);
         return connect(username, address);
     }
 
     public ConnectFuture connect(final String username, SocketAddress address) {
-        assert address != null;
         if (connector == null) {
             throw new IllegalStateException("SshClient not started. Please call start() method before connecting to a server");
         }
@@ -313,7 +308,7 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
     }
 
     protected SessionFactory createSessionFactory() {
-        return new SessionFactory();
+        return new SessionFactory(this);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/SshKeyScan.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/SshKeyScan.java b/sshd-core/src/main/java/org/apache/sshd/client/SshKeyScan.java
index 2a21e75..8e1b49a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/SshKeyScan.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/SshKeyScan.java
@@ -301,7 +301,7 @@ public class SshKeyScan extends AbstractSimplifiedLog
             }
 
             try {
-                session.addListener(this);
+                session.addSessionListener(this);
                 if (isEnabled(Level.FINER)) {
                     log(Level.FINER, "Authenticating with key type=" + kt + " to " + remoteLocation);
                 }
@@ -324,7 +324,7 @@ public class SshKeyScan extends AbstractSimplifiedLog
                     }
                 }
             } finally {
-                session.removeListener(this);
+                session.removeSessionListener(this);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 1ad6023..8eccb4e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -32,9 +32,11 @@ import org.apache.sshd.common.channel.AbstractChannel;
 import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.ChannelAsyncInputStream;
 import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
+import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.channel.ChannelRequestHandler;
 import org.apache.sshd.common.io.IoInputStream;
 import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.io.IoUtils;
@@ -250,11 +252,22 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
     public void handleOpenSuccess(int recipient, int rwSize, int packetSize, Buffer buffer) {
         this.recipient = recipient;
         this.remoteWindow.init(rwSize, packetSize, session.getFactoryManager().getProperties());
+        ChannelListener listener = getChannelListenerProxy();
         try {
             doOpen();
+
+            listener.channelOpenSuccess(this);
             this.opened.set(true);
             this.openFuture.setOpened();
-        } catch (Exception e) {
+        } catch (Throwable t) {
+            Throwable e = GenericUtils.peelException(t);
+            try {
+                listener.channelOpenFailure(this, e);
+            } catch (Throwable ignored) {
+                log.warn("handleOpenSuccess({}) failed ({}) to inform listener of open failure={}: {}",
+                         this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage());
+            }
+
             this.openFuture.setException(e);
             this.closeFuture.setClosed();
             this.doCloseImmediately();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index 77b2c70..9709f57 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -171,5 +171,4 @@ public class ChannelSession extends AbstractClientChannel {
             }
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java
index 8059eae..b90eade 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java
@@ -340,5 +340,4 @@ public interface ClientSession extends Session {
      */
     @SuppressWarnings("rawtypes")
     SshFuture switchToNoneCipher() throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
index 51f01bb..2b112ea 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
@@ -585,14 +585,14 @@ public class ClientSessionImpl extends AbstractSession implements ClientSession
     }
 
     @Override
-    protected void sendEvent(SessionListener.Event event) throws IOException {
+    protected void sendSessionEvent(SessionListener.Event event) throws IOException {
         if (event == SessionListener.Event.KeyEstablished) {
             sendInitialServiceRequest();
         }
         synchronized (lock) {
             lock.notifyAll();
         }
-        super.sendEvent(event);
+        super.sendSessionEvent(event);
     }
 
     protected void sendInitialServiceRequest() throws IOException {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
index 7ddc0d4..0ef430c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
@@ -220,10 +220,10 @@ public class ClientUserAuthService extends CloseableUtils.AbstractCloseable impl
 
     @Override
     protected void preClose() {
-        super.preClose();
         if (!authFuture.isDone()) {
             authFuture.setException(new SshException("Session is closed"));
         }
-    }
 
+        super.preClose();
+    }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/session/SessionFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/SessionFactory.java b/sshd-core/src/main/java/org/apache/sshd/client/session/SessionFactory.java
new file mode 100644
index 0000000..9e43bed
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/SessionFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.session;
+
+import org.apache.sshd.client.ClientFactoryManager;
+import org.apache.sshd.common.io.IoSession;
+import org.apache.sshd.common.session.AbstractSessionFactory;
+
+/**
+ * A factory of client sessions.
+ * This class can be used as a way to customize the creation of client sessions.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ * @see org.apache.sshd.client.SshClient#setSessionFactory(SessionFactory)
+ */
+public class SessionFactory extends AbstractSessionFactory<ClientFactoryManager, ClientSessionImpl> {
+
+    public SessionFactory(ClientFactoryManager client) {
+        super(client);
+    }
+
+    public final ClientFactoryManager getClient() {
+        return getFactoryManager();
+    }
+
+    @Override
+    protected ClientSessionImpl doCreateSession(IoSession ioSession) throws Exception {
+        return new ClientSessionImpl(getClient(), ioSession);
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
index 122c0ab..809b3af 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
@@ -21,6 +21,7 @@ package org.apache.sshd.client.subsystem;
 
 import java.nio.channels.Channel;
 
+import org.apache.sshd.client.channel.ClientChannel;
 import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.common.NamedResource;
 
@@ -32,4 +33,9 @@ public interface SubsystemClient extends NamedResource, Channel {
      * @return The underlying {@link ClientSession} used
      */
     ClientSession getClientSession();
+
+    /**
+     * @return The underlying {@link ClientChannel} used
+     */
+    ClientChannel getClientChannel();
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/DefaultSftpClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/DefaultSftpClient.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/DefaultSftpClient.java
index 5f2c0d4..a4cf4a6 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/DefaultSftpClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/DefaultSftpClient.java
@@ -34,6 +34,7 @@ import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.sshd.client.channel.ChannelSubsystem;
+import org.apache.sshd.client.channel.ClientChannel;
 import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.common.FactoryManagerUtils;
 import org.apache.sshd.common.SshException;
@@ -107,6 +108,11 @@ public class DefaultSftpClient extends AbstractSftpClient {
     }
 
     @Override
+    public ClientChannel getClientChannel() {
+        return channel;
+    }
+
+    @Override
     public Map<String, byte[]> getServerExtensions() {
         return exposedExtensions;
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystem.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystem.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystem.java
index de91de3..3532e64 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystem.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystem.java
@@ -38,6 +38,7 @@ import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.sshd.client.channel.ClientChannel;
 import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.common.FactoryManagerUtils;
 import org.apache.sshd.common.file.util.BaseFileSystem;
@@ -208,6 +209,11 @@ public class SftpFileSystem extends BaseFileSystem<SftpPath> {
         }
 
         @Override
+        public ClientChannel getClientChannel() {
+            return delegate.getClientChannel();
+        }
+
+        @Override
         public Map<String, byte[]> getServerExtensions() {
             return delegate.getServerExtensions();
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
index 461149a..962ade1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
@@ -18,15 +18,18 @@
  */
 package org.apache.sshd.common;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.sshd.agent.SshAgentFactory;
 import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.channel.RequestHandler;
 import org.apache.sshd.common.cipher.Cipher;
 import org.apache.sshd.common.compression.Compression;
@@ -42,9 +45,11 @@ import org.apache.sshd.common.mac.Mac;
 import org.apache.sshd.common.random.Random;
 import org.apache.sshd.common.session.AbstractSessionFactory;
 import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.session.SessionListener;
 import org.apache.sshd.common.session.SessionTimeoutListener;
 import org.apache.sshd.common.signature.Signature;
 import org.apache.sshd.common.util.CloseableUtils;
+import org.apache.sshd.common.util.EventListenerUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.sshd.server.forward.ForwardingFilter;
@@ -56,7 +61,7 @@ import org.apache.sshd.server.forward.ForwardingFilter;
  */
 public abstract class AbstractFactoryManager extends CloseableUtils.AbstractInnerCloseable implements FactoryManager {
 
-    protected Map<String, Object> properties = new HashMap<String, Object>();
+    protected Map<String, Object> properties = new HashMap<>();
     protected IoServiceFactoryFactory ioServiceFactoryFactory;
     protected IoServiceFactory ioServiceFactory;
     protected List<NamedFactory<KeyExchange>> keyExchangeFactories;
@@ -77,9 +82,15 @@ public abstract class AbstractFactoryManager extends CloseableUtils.AbstractInne
     protected List<RequestHandler<ConnectionService>> globalRequestHandlers;
     protected SessionTimeoutListener sessionTimeoutListener;
     protected ScheduledFuture<?> timeoutListenerFuture;
+    protected final Collection<SessionListener> sessionListeners = new CopyOnWriteArraySet<>();
+    protected final SessionListener sessionListenerProxy;
+    protected final Collection<ChannelListener> channelListeners = new CopyOnWriteArraySet<>();
+    protected final ChannelListener channelListenerProxy;
 
     protected AbstractFactoryManager() {
-        super();
+        ClassLoader loader = getClass().getClassLoader();
+        sessionListenerProxy = EventListenerUtils.proxyWrapper(SessionListener.class, loader, sessionListeners);
+        channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, loader, channelListeners);
     }
 
     @Override
@@ -271,16 +282,76 @@ public abstract class AbstractFactoryManager extends CloseableUtils.AbstractInne
         this.globalRequestHandlers = globalRequestHandlers;
     }
 
-    protected void setupSessionTimeout(final AbstractSessionFactory sessionFactory) {
+    @Override
+    public void addSessionListener(SessionListener listener) {
+        ValidateUtils.checkNotNull(listener, "addSessionListener(%s) null instance", this);
+        // avoid race conditions on notifications while manager is being closed
+        if (!isOpen()) {
+            log.warn("addSessionListener({})[{}] ignore registration while manager is closing", this, listener);
+            return;
+        }
+
+        if (this.sessionListeners.add(listener)) {
+            log.trace("addSessionListener({})[{}] registered", this, listener);
+        } else {
+            log.trace("addSessionListener({})[{}] ignored duplicate", this, listener);
+        }
+    }
+
+    @Override
+    public void removeSessionListener(SessionListener listener) {
+        if (this.sessionListeners.remove(listener)) {
+            log.trace("removeSessionListener({})[{}] removed", this, listener);
+        } else {
+            log.trace("removeSessionListener({})[{}] not registered", this, listener);
+        }
+    }
+
+    @Override
+    public SessionListener getSessionListenerProxy() {
+        return sessionListenerProxy;
+    }
+
+    @Override
+    public void addChannelListener(ChannelListener listener) {
+        ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null instance", this);
+        // avoid race conditions on notifications while manager is being closed
+        if (!isOpen()) {
+            log.warn("addChannelListener({})[{}] ignore registration while session is closing", this, listener);
+            return;
+        }
+
+        if (this.channelListeners.add(listener)) {
+            log.trace("addChannelListener({})[{}] registered", this, listener);
+        } else {
+            log.trace("addChannelListener({})[{}] ignored duplicate", this, listener);
+        }
+    }
+
+    @Override
+    public void removeChannelListener(ChannelListener listener) {
+        if (this.channelListeners.remove(listener)) {
+            log.trace("removeChannelListener({})[{}] removed", this, listener);
+        } else {
+            log.trace("removeChannelListener({})[{}] not registered", this, listener);
+        }
+    }
+
+    @Override
+    public ChannelListener getChannelListenerProxy() {
+        return channelListenerProxy;
+    }
+
+    protected void setupSessionTimeout(final AbstractSessionFactory<?, ?> sessionFactory) {
         // set up the the session timeout listener and schedule it
         sessionTimeoutListener = createSessionTimeoutListener();
-        sessionFactory.addListener(sessionTimeoutListener);
+        addSessionListener(sessionTimeoutListener);
 
         timeoutListenerFuture = getScheduledExecutorService()
                 .scheduleAtFixedRate(sessionTimeoutListener, 1, 1, TimeUnit.SECONDS);
     }
 
-    protected void removeSessionTimeout(final AbstractSessionFactory sessionFactory) {
+    protected void removeSessionTimeout(final AbstractSessionFactory<?, ?> sessionFactory) {
         stopSessionTimeoutListener(sessionFactory);
     }
 
@@ -288,19 +359,25 @@ public abstract class AbstractFactoryManager extends CloseableUtils.AbstractInne
         return new SessionTimeoutListener();
     }
 
-    protected void stopSessionTimeoutListener(final AbstractSessionFactory sessionFactory) {
+    protected void stopSessionTimeoutListener(final AbstractSessionFactory<?, ?> sessionFactory) {
         // cancel the timeout monitoring task
         if (timeoutListenerFuture != null) {
-            timeoutListenerFuture.cancel(true);
-            timeoutListenerFuture = null;
+            try {
+                timeoutListenerFuture.cancel(true);
+            } finally {
+                timeoutListenerFuture = null;
+            }
         }
 
         // remove the sessionTimeoutListener completely; should the SSH server/client be restarted, a new one
         // will be created.
-        if (sessionFactory != null && sessionTimeoutListener != null) {
-            sessionFactory.removeListener(sessionTimeoutListener);
+        if (sessionTimeoutListener != null) {
+            try {
+                removeSessionListener(sessionTimeoutListener);
+            } finally {
+                sessionTimeoutListener = null;
+            }
         }
-        sessionTimeoutListener = null;
     }
 
     protected void checkConfig() {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
index 734fb6a..ee8f168 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.sshd.agent.SshAgentFactory;
 import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelListenerManager;
 import org.apache.sshd.common.channel.RequestHandler;
 import org.apache.sshd.common.cipher.Cipher;
 import org.apache.sshd.common.compression.Compression;
@@ -35,6 +36,7 @@ import org.apache.sshd.common.keyprovider.KeyPairProvider;
 import org.apache.sshd.common.mac.Mac;
 import org.apache.sshd.common.random.Random;
 import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.session.SessionListenerManager;
 import org.apache.sshd.common.signature.Signature;
 import org.apache.sshd.server.forward.ForwardingFilter;
 
@@ -44,7 +46,7 @@ import org.apache.sshd.server.forward.ForwardingFilter;
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public interface FactoryManager {
+public interface FactoryManager extends SessionListenerManager, ChannelListenerManager {
 
     /**
      * Key used to retrieve the value of the window size in the

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index e468da4..04ed9ad 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,8 +39,10 @@ import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.CloseableUtils;
+import org.apache.sshd.common.util.EventListenerUtils;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.Int2IntFunction;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.BufferUtils;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
@@ -80,6 +83,11 @@ public abstract class AbstractChannel
     protected AtomicReference<GracefulState> gracefulState = new AtomicReference<GracefulState>(GracefulState.Opened);
     protected final DefaultCloseFuture gracefulFuture = new DefaultCloseFuture(lock);
     protected final List<RequestHandler<Channel>> handlers = new ArrayList<RequestHandler<Channel>>();
+    /**
+     * Channel listeners container
+     */
+    protected final Collection<ChannelListener> channelListeners = new CopyOnWriteArraySet<>();
+    protected final ChannelListener channelListenerProxy;
 
     protected AbstractChannel() {
         this("");
@@ -87,6 +95,7 @@ public abstract class AbstractChannel
 
     protected AbstractChannel(String discriminator) {
         super(discriminator);
+        channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, getClass().getClassLoader(), channelListeners);
     }
 
     public void addRequestHandler(RequestHandler<Channel> handler) {
@@ -195,10 +204,20 @@ public abstract class AbstractChannel
     }
 
     @Override
-    public void init(ConnectionService service, Session session, int id) {
+    public void init(ConnectionService service, Session session, int id) throws IOException {
         this.service = service;
         this.session = session;
         this.id = id;
+
+        ChannelListener listener = session.getChannelListenerProxy();
+        try {
+            listener.channelInitialized(this);
+        } catch (RuntimeException t) {
+            Throwable e = GenericUtils.peelException(t);
+            throw new IOException("Failed (" + e.getClass().getSimpleName() + ") to notify channel " + toString() + " initialization: " + e.getMessage(), e);
+        }
+        // delegate the rest of the notifications to the channel
+        addChannelListener(listener);
         configureWindow();
     }
 
@@ -209,6 +228,36 @@ public abstract class AbstractChannel
     }
 
     @Override
+    public void addChannelListener(ChannelListener listener) {
+        ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null instance", this);
+        // avoid race conditions on notifications while channel is being closed
+        if (!isOpen()) {
+            log.warn("addChannelListener({})[{}] ignore registration while channel is closing", this, listener);
+            return;
+        }
+
+        if (this.channelListeners.add(listener)) {
+            log.trace("addChannelListener({})[{}] registered", this, listener);
+        } else {
+            log.trace("addChannelListener({})[{}] ignored duplicate", this, listener);
+        }
+    }
+
+    @Override
+    public void removeChannelListener(ChannelListener listener) {
+        if (this.channelListeners.remove(listener)) {
+            log.trace("removeChannelListener({})[{}] removed", this, listener);
+        } else {
+            log.trace("removeChannelListener({})[{}] not registered", this, listener);
+        }
+    }
+
+    @Override
+    public ChannelListener getChannelListenerProxy() {
+        return channelListenerProxy;
+    }
+
+    @Override
     public void handleClose() throws IOException {
         log.debug("Received SSH_MSG_CHANNEL_CLOSE on channel {}", this);
         if (gracefulState.compareAndSet(GracefulState.Opened, GracefulState.CloseReceived)) {
@@ -252,8 +301,9 @@ public abstract class AbstractChannel
                 gracefulFuture.setClosed();
             } else if (!gracefulFuture.isClosed()) {
                 log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", AbstractChannel.this);
-                Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_CLOSE);
-                buffer.putInt(recipient);
+                Session s = getSession();
+                Buffer buffer = s.createBuffer(SshConstants.SSH_MSG_CHANNEL_CLOSE);
+                buffer.putInt(getRecipient());
                 try {
                     long timeout = FactoryManagerUtils.getLongProperty(getSession(), FactoryManager.CHANNEL_CLOSE_TIMEOUT, DEFAULT_CHANNEL_CLOSE_TIMEOUT);
                     session.writePacket(buffer, timeout, TimeUnit.MILLISECONDS).addListener(new SshFutureListener<IoWriteFuture>() {
@@ -292,6 +342,20 @@ public abstract class AbstractChannel
     }
 
     @Override
+    protected void preClose() {
+        ChannelListener listener = getChannelListenerProxy();
+        try {
+            listener.channelClosed(this);
+        } catch (RuntimeException t) {
+            Throwable e = GenericUtils.peelException(t);
+            log.warn(e.getClass().getSimpleName() + " while signal channel " + toString() + " closed: " + e.getMessage(), e);
+        } finally {
+            // clear the listeners since we are closing the channel (quicker GC)
+            this.channelListeners.clear();
+        }
+    }
+
+    @Override
     protected void doCloseImmediately() {
         if (service != null) {
             service.unregisterChannel(AbstractChannel.this);
@@ -301,9 +365,10 @@ public abstract class AbstractChannel
 
     protected void writePacket(Buffer buffer) throws IOException {
         if (!isClosing()) {
-            session.writePacket(buffer);
+            Session s = getSession();
+            s.writePacket(buffer);
         } else {
-            log.debug("Discarding output packet because channel is being closed");
+            log.debug("writePacket({}) Discarding output packet because channel is being closed", this);
         }
     }
 
@@ -326,8 +391,9 @@ public abstract class AbstractChannel
         // Only accept extended data for stderr
         if (ex != 1) {
             log.debug("Send SSH_MSG_CHANNEL_FAILURE on channel {}", this);
-            buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_FAILURE);
-            buffer.putInt(recipient);
+            Session s = getSession();
+            buffer = s.createBuffer(SshConstants.SSH_MSG_CHANNEL_FAILURE);
+            buffer.putInt(getRecipient());
             writePacket(buffer);
             return;
         }
@@ -376,27 +442,29 @@ public abstract class AbstractChannel
 
     protected void sendEof() throws IOException {
         log.debug("Send SSH_MSG_CHANNEL_EOF on channel {}", this);
-        Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_EOF);
-        buffer.putInt(recipient);
+        Session s = getSession();
+        Buffer buffer = s.createBuffer(SshConstants.SSH_MSG_CHANNEL_EOF);
+        buffer.putInt(getRecipient());
         writePacket(buffer);
     }
 
     protected void configureWindow() {
-        localWindow.init(session);
+        localWindow.init(getSession());
     }
 
     protected void sendWindowAdjust(int len) throws IOException {
         if (log.isDebugEnabled()) {
             log.debug("Send SSH_MSG_CHANNEL_WINDOW_ADJUST on channel {}", Integer.valueOf(id));
         }
-        Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_WINDOW_ADJUST);
-        buffer.putInt(recipient);
+        Session s = getSession();
+        Buffer buffer = s.createBuffer(SshConstants.SSH_MSG_CHANNEL_WINDOW_ADJUST);
+        buffer.putInt(getRecipient());
         buffer.putInt(len);
         writePacket(buffer);
     }
 
     @Override
     public String toString() {
-        return getClass().getSimpleName() + "[id=" + id + ", recipient=" + recipient + "]";
+        return getClass().getSimpleName() + "[id=" + getId() + ", recipient=" + getRecipient() + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
index 1d55d8a..e4b0635 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
@@ -28,11 +28,12 @@ import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.buffer.Buffer;
 
 /**
- * TODO Add javadoc
+ * Represents a channel opened over an SSH session - holds information that is
+ * common both to server and client channels.
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public interface Channel extends Closeable {
+public interface Channel extends ChannelListenerManager, Closeable {
 
     /**
      * @return Local channel identifier
@@ -109,5 +110,4 @@ public interface Channel extends Closeable {
      * @throws IOException If failed to handle the success
      */
     void handleOpenFailure(Buffer buffer) throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
index b1798be..eaef733 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
@@ -33,6 +33,9 @@ import org.apache.sshd.common.util.Readable;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
 public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable implements IoInputStream {
 
     private final Channel channel;
@@ -76,6 +79,7 @@ public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable im
                 }
             }
         }
+        super.preClose();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java
new file mode 100644
index 0000000..4a4078c
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.channel;
+
+import java.util.EventListener;
+
+/**
+ * Provides a simple listener for client / server channels being established
+ * or torn down. <B>Note:</B> for server-side listeners, some of the established
+ * channels may be <U>client</U> channels - especially where connection proxy or forwarding
+ * is concerned.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface ChannelListener extends EventListener {
+    /**
+     * Called to inform about initial setup of a channel via the
+     * {@link Channel#init(org.apache.sshd.common.session.ConnectionService, org.apache.sshd.common.session.Session, int)}
+     * method. <B>Note:</B> this method is guaranteed to be called
+     * before either {@link #channelOpenSuccess(Channel)} or
+     * {@link #channelOpenFailure(Channel, Throwable)} is called
+     *
+     * @param channel The initialized {@link Channel}
+     */
+    void channelInitialized(Channel channel);
+
+    /**
+     * Called to inform about a channel being successfully opened for a
+     * session. <B>Note:</B> when the call is made, the channel is known
+     * to be open but nothing beyond that.
+     *
+     * @param channel The newly opened {@link Channel}
+     */
+    void channelOpenSuccess(Channel channel);
+
+    /**
+     * Called to inform about the failure to open a channel
+     *
+     * @param channel The failed {@link Channel}
+     * @param reason The {@link Throwable} reason - <B>Note:</B> if the
+     * {@link #channelOpenSuccess(Channel)} notification throws an exception
+     * it will cause this method to be invoked
+     */
+    void channelOpenFailure(Channel channel, Throwable reason);
+
+    /**
+     * Called to inform about a channel being closed. <B>Note:</B> when the call
+     * is made there are no guarantees about the channel's actual state
+     * except that it either has been already closed or may be in the process
+     * of being closed. <B>Note:</B> this method is guaranteed to be called
+     * regardless of whether {@link #channelOpenSuccess(Channel)} or
+     * {@link #channelOpenFailure(Channel, Throwable)} have been called
+     *
+     * @param channel The referenced {@link Channel}
+     */
+    void channelClosed(Channel channel);
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListenerManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListenerManager.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListenerManager.java
new file mode 100644
index 0000000..511379b
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListenerManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.channel;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface ChannelListenerManager {
+    /**
+     * Add a channel listener
+     *
+     * @param listener The {@link ChannelListener} to add - not {@code null}
+     */
+    void addChannelListener(ChannelListener listener);
+
+    /**
+     * Remove a channel listener
+     *
+     * @param listener The {@link ChannelListener} to remove
+     */
+    void removeChannelListener(ChannelListener listener);
+
+    /**
+     * @return A (never {@code null}) proxy {@link ChannelListener} that represents
+     * all the currently registered listeners. Any method invocation on the proxy
+     * is replicated to the currently registered listeners
+     */
+    ChannelListener getChannelListenerProxy();
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
index a904bc2..3df0526 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.net.SocketException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -44,20 +45,20 @@ public class ChannelPipedInputStream extends InputStream implements ChannelPiped
     private final Window localWindow;
     private final Buffer buffer = new ByteArrayBuffer();
     private final byte[] b = new byte[1];
-    private boolean closed;
-    private boolean eofSent;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicBoolean eofSent = new AtomicBoolean(false);
 
     private final Lock lock = new ReentrantLock();
     private final Condition dataAvailable = lock.newCondition();
 
-    private long timeout;
-
     /**
      * {@link ChannelPipedOutputStream} is already closed and so we will not receive additional data.
      * This is different from the {@link #closed}, which indicates that the reader of this {@link InputStream}
      * will not be reading data any more.
      */
-    private boolean writerClosed;
+    private final AtomicBoolean writerClosed = new AtomicBoolean(false);
+
+    private long timeout;
 
     public ChannelPipedInputStream(Window localWindow) {
         this.localWindow = ValidateUtils.checkNotNull(localWindow, "No local window provided");
@@ -77,7 +78,7 @@ public class ChannelPipedInputStream extends InputStream implements ChannelPiped
         lock.lock();
         try {
             int avail = buffer.available();
-            if (avail == 0 && writerClosed) {
+            if ((avail == 0) && writerClosed.get()) {
                 return -1;
             }
             return avail;
@@ -103,14 +104,14 @@ public class ChannelPipedInputStream extends InputStream implements ChannelPiped
         lock.lock();
         try {
             for (int index = 0;; index++) {
-                if ((closed && writerClosed && eofSent) || (closed && !writerClosed)) {
+                if ((closed.get() && writerClosed.get() && eofSent.get()) || (closed.get() && (!writerClosed.get()))) {
                     throw new IOException("Pipe closed after " + index + " cycles");
                 }
                 if (buffer.available() > 0) {
                     break;
                 }
-                if (writerClosed) {
-                    eofSent = true;
+                if (writerClosed.get()) {
+                    eofSent.set(true);
                     return -1; // no more data to read
                 }
 
@@ -146,7 +147,7 @@ public class ChannelPipedInputStream extends InputStream implements ChannelPiped
     public void eof() {
         lock.lock();
         try {
-            writerClosed = true;
+            writerClosed.set(true);
             dataAvailable.signalAll();
         } finally {
             lock.unlock();
@@ -159,7 +160,7 @@ public class ChannelPipedInputStream extends InputStream implements ChannelPiped
         try {
             dataAvailable.signalAll();
         } finally {
-            closed = true;
+            closed.set(true);
             lock.unlock();
         }
     }
@@ -168,7 +169,7 @@ public class ChannelPipedInputStream extends InputStream implements ChannelPiped
     public void receive(byte[] bytes, int off, int len) throws IOException {
         lock.lock();
         try {
-            if (writerClosed || closed) {
+            if (writerClosed.get() || closed.get()) {
                 throw new IOException("Pipe closed");
             }
             buffer.putRawBytes(bytes, off, len);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
index 2b911e1..92c915c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
@@ -150,10 +150,19 @@ public class DefaultSshFuture<T extends SshFuture> extends AbstractLoggingBean i
         Class<?> actualType = value.getClass();
         if (expectedType.isAssignableFrom(actualType)) {
             return expectedType.cast(value);
-        } else if (IOException.class.isAssignableFrom(actualType)) {
-            throw (IOException) value;
-        } else if (Throwable.class.isAssignableFrom(actualType)) {
-            Throwable t = (Throwable) value;
+        }
+
+        if (Throwable.class.isAssignableFrom(actualType)) {
+            Throwable t = GenericUtils.peelException((Throwable) value);
+            if (t != value) {
+                value = t;
+                actualType = value.getClass();
+            }
+
+            if (IOException.class.isAssignableFrom(actualType)) {
+                throw (IOException) value;
+            }
+
             throw new SshException("Failed (" + t.getClass().getSimpleName() + ") to execute: " + t.getMessage(), GenericUtils.resolveExceptionCause(t));
         } else {    // what else can it be ????
             throw new StreamCorruptedException("Unknown result type: " + actualType.getName());

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index 9ccc2da..8b1b59d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -21,13 +21,13 @@ package org.apache.sshd.common.session;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Collection;
 import java.util.EnumMap;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -43,6 +43,7 @@ import org.apache.sshd.common.NamedResource;
 import org.apache.sshd.common.Service;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.cipher.Cipher;
 import org.apache.sshd.common.compression.Compression;
 import org.apache.sshd.common.digest.Digest;
@@ -65,21 +66,12 @@ import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.BufferUtils;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 
-import static org.apache.sshd.common.SshConstants.SSH_MSG_DEBUG;
-import static org.apache.sshd.common.SshConstants.SSH_MSG_DISCONNECT;
-import static org.apache.sshd.common.SshConstants.SSH_MSG_IGNORE;
-import static org.apache.sshd.common.SshConstants.SSH_MSG_KEXINIT;
-import static org.apache.sshd.common.SshConstants.SSH_MSG_NEWKEYS;
-import static org.apache.sshd.common.SshConstants.SSH_MSG_SERVICE_ACCEPT;
-import static org.apache.sshd.common.SshConstants.SSH_MSG_SERVICE_REQUEST;
-import static org.apache.sshd.common.SshConstants.SSH_MSG_UNIMPLEMENTED;
-
 /**
  * <P>
  * The AbstractSession handles all the basic SSH protocol such as key exchange, authentication,
  * encoding and decoding. Both server side and client side sessions should inherit from this
  * abstract class. Some basic packet processing methods are defined but the actual call to these
- * methods should be done from the {@link #handleMessage(org.apache.sshd.common.util.buffer.Buffer)}
+ * methods should be done from the {@link #handleMessage(Buffer)}
  * method, which is dependent on the state and side of this session.
  * </P>
  *
@@ -128,8 +120,15 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
     /**
      * Session listeners container
      */
-    protected final List<SessionListener> listeners = new CopyOnWriteArrayList<SessionListener>();
+    protected final Collection<SessionListener> sessionListeners = new CopyOnWriteArraySet<>();
     protected final SessionListener sessionListenerProxy;
+
+    /**
+     * Channel listeners container
+     */
+    protected final Collection<ChannelListener> channelListeners = new CopyOnWriteArraySet<>();
+    protected final ChannelListener channelListenerProxy;
+
     //
     // Key exchange support
     //
@@ -202,7 +201,11 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
         this.isServer = isServer;
         this.factoryManager = ValidateUtils.checkNotNull(factoryManager, "No factory manager provided", GenericUtils.EMPTY_OBJECT_ARRAY);
         this.ioSession = ioSession;
-        sessionListenerProxy = EventListenerUtils.proxyWrapper(SessionListener.class, getClass().getClassLoader(), listeners);
+
+        ClassLoader loader = getClass().getClassLoader();
+        sessionListenerProxy = EventListenerUtils.proxyWrapper(SessionListener.class, loader, sessionListeners);
+        channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, loader, channelListeners);
+
         random = factoryManager.getRandomFactory().create();
         authTimeoutMs = getLongProperty(FactoryManager.AUTH_TIMEOUT, authTimeoutMs);
         authTimeoutTimestamp = System.currentTimeMillis() + authTimeoutMs;
@@ -301,7 +304,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
     @Override
     public void setAuthenticated() throws IOException {
         this.authed = true;
-        sendEvent(SessionListener.Event.Authenticated);
+        sendSessionEvent(SessionListener.Event.Authenticated);
     }
 
     /**
@@ -353,22 +356,22 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
     protected void doHandleMessage(Buffer buffer) throws Exception {
         int cmd = buffer.getUByte();
         switch (cmd) {
-            case SSH_MSG_DISCONNECT: {
+            case SshConstants.SSH_MSG_DISCONNECT: {
                 handleDisconnect(buffer);
                 break;
             }
-            case SSH_MSG_IGNORE: {
+            case SshConstants.SSH_MSG_IGNORE: {
                 log.debug("Received SSH_MSG_IGNORE");
                 break;
             }
-            case SSH_MSG_UNIMPLEMENTED: {
+            case SshConstants.SSH_MSG_UNIMPLEMENTED: {
                 int code = buffer.getInt();
                 if (log.isDebugEnabled()) {
                     log.debug("Received SSH_MSG_UNIMPLEMENTED #{}", Integer.valueOf(code));
                 }
                 break;
             }
-            case SSH_MSG_DEBUG: {
+            case SshConstants.SSH_MSG_DEBUG: {
                 boolean display = buffer.getBoolean();
                 String msg = buffer.getString();
                 if (log.isDebugEnabled()) {
@@ -376,16 +379,16 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
                 }
                 break;
             }
-            case SSH_MSG_SERVICE_REQUEST:
+            case SshConstants.SSH_MSG_SERVICE_REQUEST:
                 handleServiceRequest(buffer);
                 break;
-            case SSH_MSG_SERVICE_ACCEPT:
+            case SshConstants.SSH_MSG_SERVICE_ACCEPT:
                 handleServiceAccept();
                 break;
-            case SSH_MSG_KEXINIT:
+            case SshConstants.SSH_MSG_KEXINIT:
                 handleKexInit(buffer);
                 break;
-            case SSH_MSG_NEWKEYS:
+            case SshConstants.SSH_MSG_NEWKEYS:
                 handleNewKeys(cmd);
                 break;
             default:
@@ -420,7 +423,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
     private void handleServiceRequest(Buffer buffer) throws IOException {
         String service = buffer.getString();
         log.debug("Received SSH_MSG_SERVICE_REQUEST '{}'", service);
-        validateKexState(SSH_MSG_SERVICE_REQUEST, KexState.DONE);
+        validateKexState(SshConstants.SSH_MSG_SERVICE_REQUEST, KexState.DONE);
         try {
             startService(service);
         } catch (Exception e) {
@@ -434,39 +437,39 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
         writePacket(response);
     }
 
-    private void handleServiceAccept() throws java.io.IOException {
+    private void handleServiceAccept() throws IOException {
         log.debug("Received SSH_MSG_SERVICE_ACCEPT");
-        validateKexState(SSH_MSG_SERVICE_ACCEPT, org.apache.sshd.common.kex.KexState.DONE);
+        validateKexState(SshConstants.SSH_MSG_SERVICE_ACCEPT, KexState.DONE);
         serviceAccept();
     }
 
-    private void handleKexInit(org.apache.sshd.common.util.buffer.Buffer buffer) throws Exception {
+    private void handleKexInit(Buffer buffer) throws Exception {
         log.debug("Received SSH_MSG_KEXINIT");
         receiveKexInit(buffer);
-        if (kexState.compareAndSet(org.apache.sshd.common.kex.KexState.DONE, org.apache.sshd.common.kex.KexState.RUN)) {
+        if (kexState.compareAndSet(KexState.DONE, KexState.RUN)) {
             sendKexInit();
-        } else if (!kexState.compareAndSet(org.apache.sshd.common.kex.KexState.INIT, org.apache.sshd.common.kex.KexState.RUN)) {
+        } else if (!kexState.compareAndSet(KexState.INIT, KexState.RUN)) {
             throw new IllegalStateException("Received SSH_MSG_KEXINIT while key exchange is running");
         }
 
-        java.util.Map<org.apache.sshd.common.kex.KexProposalOption, String> result = negotiate();
-        String kexAlgorithm = result.get(org.apache.sshd.common.kex.KexProposalOption.ALGORITHMS);
-        kex = org.apache.sshd.common.util.ValidateUtils.checkNotNull(org.apache.sshd.common.NamedFactory.Utils.create(factoryManager.getKeyExchangeFactories(), kexAlgorithm),
+        Map<KexProposalOption, String> result = negotiate();
+        String kexAlgorithm = result.get(KexProposalOption.ALGORITHMS);
+        kex = ValidateUtils.checkNotNull(NamedFactory.Utils.create(factoryManager.getKeyExchangeFactories(), kexAlgorithm),
                 "Unknown negotiated KEX algorithm: %s",
                 kexAlgorithm);
-        kex.init(this, serverVersion.getBytes(java.nio.charset.StandardCharsets.UTF_8), clientVersion.getBytes(java.nio.charset.StandardCharsets.UTF_8), i_s, i_c);
+        kex.init(this, serverVersion.getBytes(StandardCharsets.UTF_8), clientVersion.getBytes(StandardCharsets.UTF_8), i_s, i_c);
 
-        sendEvent(org.apache.sshd.common.session.SessionListener.Event.KexCompleted);
+        sendSessionEvent(SessionListener.Event.KexCompleted);
     }
 
     private void handleNewKeys(int cmd) throws Exception {
         log.debug("Received SSH_MSG_NEWKEYS");
-        validateKexState(cmd, org.apache.sshd.common.kex.KexState.KEYS);
+        validateKexState(cmd, KexState.KEYS);
         receiveNewKeys();
         if (reexchangeFuture != null) {
             reexchangeFuture.setValue(Boolean.TRUE);
         }
-        sendEvent(org.apache.sshd.common.session.SessionListener.Event.KeyEstablished);
+        sendSessionEvent(SessionListener.Event.KeyEstablished);
         synchronized (pendingPackets) {
             if (!pendingPackets.isEmpty()) {
                 log.debug("Dequeing pending packets");
@@ -477,7 +480,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
                     }
                 }
             }
-            kexState.set(org.apache.sshd.common.kex.KexState.DONE);
+            kexState.set(KexState.DONE);
         }
         synchronized (lock) {
             lock.notifyAll();
@@ -495,7 +498,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
      * Handle any exceptions that occurred on this session.
      * The session will be closed and a disconnect packet will be
      * sent before if the given exception is an
-     * {@link org.apache.sshd.common.SshException}.
+     * {@link SshException}.
      *
      * @param t the exception to process
      */
@@ -534,10 +537,19 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
     }
 
     @Override
-    protected void doCloseImmediately() {
-        super.doCloseImmediately();
+    protected void preClose() {
         // Fire 'close' event
-        sessionListenerProxy.sessionClosed(this);
+        SessionListener listener = getSessionListenerProxy();
+        try {
+            listener.sessionClosed(this);
+        } catch (RuntimeException t) {
+            Throwable e = GenericUtils.peelException(t);
+            log.warn(e.getClass().getSimpleName() + " while signal session " + toString() + " closed: " + e.getMessage(), e);
+        } finally {
+            // clear the listeners since we are closing the session (quicker GC)
+            this.sessionListeners.clear();
+            this.channelListeners.clear();
+        }
     }
 
     protected Service[] getServices() {
@@ -561,7 +573,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
      *
      * @param buffer the buffer to encode and send
      * @return a future that can be used to check when the packet has actually been sent
-     * @throws java.io.IOException if an error occured when encoding sending the packet
+     * @throws IOException if an error occurred when encoding or sending the packet
      */
     @Override
     public IoWriteFuture writePacket(Buffer buffer) throws IOException {
@@ -630,7 +642,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
      *
      * @param buffer the buffer containing the global request
      * @return <code>true</code> if the request was successful, <code>false</code> otherwise.
-     * @throws java.io.IOException if an error occured when encoding sending the packet
+     * @throws IOException if an error occurred when encoding or sending the packet
      */
     @Override
     public Buffer request(Buffer buffer) throws IOException {
@@ -873,7 +885,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
     /**
      * Read the other side identification.
      * This method is specific to the client or server side, but both should call
-     * {@link #doReadIdentification(org.apache.sshd.common.util.buffer.Buffer, boolean)} and
+     * {@link #doReadIdentification(Buffer, boolean)} and
      * store the result in the needed property.
      *
      * @param buffer the buffer containing the remote identification
@@ -1397,18 +1409,68 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
     }
 
     @Override
-    public void addListener(SessionListener listener) {
-        ValidateUtils.checkNotNull(listener, "addListener(%s) null instance", this);
-        this.listeners.add(listener);
+    public void addSessionListener(SessionListener listener) {
+        ValidateUtils.checkNotNull(listener, "addSessionListener(%s) null instance", this);
+        // avoid race conditions on notifications while session is being closed
+        if (!isOpen()) {
+            log.warn("addSessionListener({})[{}] ignore registration while session is closing", this, listener);
+            return;
+        }
+
+        if (this.sessionListeners.add(listener)) {
+            log.trace("addSessionListener({})[{}] registered", this, listener);
+        } else {
+            log.trace("addSessionListener({})[{}] ignored duplicate", this, listener);
+        }
+    }
+
+    @Override
+    public void removeSessionListener(SessionListener listener) {
+        if (this.sessionListeners.remove(listener)) {
+            log.trace("removeSessionListener({})[{}] removed", this, listener);
+        } else {
+            log.trace("removeSessionListener({})[{}] not registered", this, listener);
+        }
+    }
+
+    @Override
+    public SessionListener getSessionListenerProxy() {
+        return sessionListenerProxy;
+    }
+
+    @Override
+    public void addChannelListener(ChannelListener listener) {
+        ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null instance", this);
+        // avoid race conditions on notifications while session is being closed
+        if (!isOpen()) {
+            log.warn("addChannelListener({})[{}] ignore registration while session is closing", this, listener);
+            return;
+        }
+
+        if (this.channelListeners.add(listener)) {
+            log.trace("addChannelListener({})[{}] registered", this, listener);
+        } else {
+            log.trace("addChannelListener({})[{}] ignored duplicate", this, listener);
+        }
+    }
+
+    @Override
+    public void removeChannelListener(ChannelListener listener) {
+        if (this.channelListeners.remove(listener)) {
+            log.trace("removeChannelListener({})[{}] removed", this, listener);
+        } else {
+            log.trace("removeChannelListener({})[{}] not registered", this, listener);
+        }
     }
 
     @Override
-    public void removeListener(SessionListener listener) {
-        this.listeners.remove(listener);
+    public ChannelListener getChannelListenerProxy() {
+        return channelListenerProxy;
     }
 
-    protected void sendEvent(SessionListener.Event event) throws IOException {
-        sessionListenerProxy.sessionEvent(this, event);
+    protected void sendSessionEvent(SessionListener.Event event) throws IOException {
+        SessionListener listener = getSessionListenerProxy();
+        listener.sessionEvent(this, event);
     }
 
     @Override
@@ -1509,11 +1571,11 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
     protected void checkForTimeouts() throws IOException {
         if (!isClosing()) {
             long now = System.currentTimeMillis();
-            if (!authed && authTimeoutMs > 0 && now > authTimeoutTimestamp) {
+            if ((!authed) && (authTimeoutMs > 0L) && (now > authTimeoutTimestamp)) {
                 timeoutStatus.set(TimeoutStatus.AuthTimeout);
                 disconnect(SshConstants.SSH2_DISCONNECT_PROTOCOL_ERROR, "Session has timed out waiting for authentication after " + authTimeoutMs + " ms.");
             }
-            if (idleTimeoutMs > 0 && idleTimeoutTimestamp > 0 && now > idleTimeoutTimestamp) {
+            if ((idleTimeoutMs > 0) && (idleTimeoutTimestamp > 0L) && (now > idleTimeoutTimestamp)) {
                 timeoutStatus.set(TimeoutStatus.AuthTimeout);
                 disconnect(SshConstants.SSH2_DISCONNECT_PROTOCOL_ERROR, "User session has timed out idling after " + idleTimeoutMs + " ms.");
             }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
index eb8636b..37fa761 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
@@ -18,9 +18,7 @@
  */
 package org.apache.sshd.common.session;
 
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
+import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.util.ValidateUtils;
 
@@ -29,45 +27,32 @@ import org.apache.sshd.common.util.ValidateUtils;
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public abstract class AbstractSessionFactory extends AbstractSessionIoHandler {
+public abstract class AbstractSessionFactory<M extends FactoryManager, S extends AbstractSession> extends AbstractSessionIoHandler {
+    private final M manager;
 
-    protected final List<SessionListener> listeners = new CopyOnWriteArrayList<>();
+    protected AbstractSessionFactory(M manager) {
+        this.manager = ValidateUtils.checkNotNull(manager, "No factory manager instance");
+    }
 
-    protected AbstractSessionFactory() {
-        super();
+    public M getFactoryManager() {
+        return manager;
     }
 
     @Override
-    protected AbstractSession createSession(IoSession ioSession) throws Exception {
-        AbstractSession session = doCreateSession(ioSession);
-
-        for (SessionListener l : listeners) {
-            l.sessionCreated(session);
-            session.addListener(l);
-        }
-
-        return session;
+    protected S createSession(IoSession ioSession) throws Exception {
+        return setupSession(doCreateSession(ioSession));
     }
 
-    protected abstract AbstractSession doCreateSession(IoSession ioSession) throws Exception;
+    protected abstract S doCreateSession(IoSession ioSession) throws Exception;
 
-    /**
-     * Add a session |listener|.
-     *
-     * @param listener the session listener to add
-     */
-    public void addListener(SessionListener listener) {
-        ValidateUtils.checkNotNull(listener, "addListener(%s) no listener", this);
-        this.listeners.add(listener);
-    }
-
-    /**
-     * Remove a session |listener|.
-     *
-     * @param listener the session listener to remove
-     */
-    public void removeListener(SessionListener listener) {
-        this.listeners.remove(listener);
+    protected S setupSession(S session) throws Exception {
+        FactoryManager listenersManager = getFactoryManager();
+        SessionListener sessionListener = listenersManager.getSessionListenerProxy();
+        // Inform the listener of the newly created session
+        sessionListener.sessionCreated(session);
+        // Delegate the task of further notifications to the session
+        session.addSessionListener(sessionListener);
+        session.addChannelListener(listenersManager.getChannelListenerProxy());
+        return session;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
index 66616af..ec35195 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.Service;
+import org.apache.sshd.common.channel.ChannelListenerManager;
 import org.apache.sshd.common.future.SshFuture;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.io.IoWriteFuture;
@@ -36,7 +37,7 @@ import org.apache.sshd.common.util.buffer.Buffer;
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public interface Session extends Closeable {
+public interface Session extends SessionListenerManager, ChannelListenerManager, Closeable {
 
     /**
      * Timeout status.
@@ -181,20 +182,6 @@ public interface Session extends Closeable {
     void exceptionCaught(Throwable t);
 
     /**
-     * Add a session |listener|.
-     *
-     * @param listener the session listener to add
-     */
-    void addListener(SessionListener listener);
-
-    /**
-     * Remove a session |listener|.
-     *
-     * @param listener the session listener to remove
-     */
-    void removeListener(SessionListener listener);
-
-    /**
      * Initiate a new key exchange.
      *
      * @return An {@link SshFuture} for awaiting the completion of the exchange

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListenerManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListenerManager.java b/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListenerManager.java
new file mode 100644
index 0000000..eda580a
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListenerManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.session;
+
+/**
+ * Interface for classes that allow adding/removing session listeners.
+ * <B>Note:</B> if adding/removing listeners while connections are being
+ * established and/or torn down there are no guarantees as to the order of
+ * the calls to the recently added/removed listener's methods in the interim.
+ * The correct order is guaranteed only as of the <U>next</U> session after
+ * the listener has been added/removed.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface SessionListenerManager {
+    /**
+     * Add a session listener.
+     *
+     * @param listener The {@link SessionListener} to add - not {@code null}
+     */
+    void addSessionListener(SessionListener listener);
+
+    /**
+     * Remove a session listener.
+     *
+     * @param listener The {@link SessionListener} to remove
+     */
+    void removeSessionListener(SessionListener listener);
+
+    /**
+     * @return A (never {@code null}) proxy {@link SessionListener} that represents
+     * all the currently registered listeners. Any method invocation on the proxy
+     * is replicated to the currently registered listeners
+     */
+    SessionListener getSessionListenerProxy();
+}