You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2016/07/30 14:15:45 UTC

[1/2] mina-sshd git commit: [SSHD-682] Provide PortForwardingEventListener support

Repository: mina-sshd
Updated Branches:
  refs/heads/master cac64e4f9 -> 549ac4e2a


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java b/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java
index 36f9eb9..70d8775 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java
@@ -26,6 +26,8 @@ import java.net.Proxy;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.service.IoAcceptor;
@@ -37,6 +39,7 @@ import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.client.session.forward.DynamicPortForwardingTracker;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.PropertyResolverUtils;
+import org.apache.sshd.common.forward.PortForwardingEventListener;
 import org.apache.sshd.common.util.net.SshdSocketAddress;
 import org.apache.sshd.server.SshServer;
 import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
@@ -99,13 +102,78 @@ public class ProxyTest extends BaseTestSupport {
 
     @Test
     public void testSocksProxy() throws Exception {
-        try (ClientSession session = createNativeSession()) {
+        final AtomicReference<SshdSocketAddress> localAddressHolder = new AtomicReference<>();
+        final AtomicReference<SshdSocketAddress> boundAddressHolder = new AtomicReference<>();
+        final AtomicInteger tearDownSignal = new AtomicInteger(0);
+        @SuppressWarnings("checkstyle:anoninnerlength")
+        PortForwardingEventListener listener = new PortForwardingEventListener() {
+            @Override
+            public void tornDownExplicitTunnel(
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding, Throwable reason)
+                            throws IOException {
+                throw new UnsupportedOperationException("Unexpected explicit tunnel torn down indication: session=" + session + ", address=" + address);
+            }
+
+            @Override
+            public void tornDownDynamicTunnel(
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress address, Throwable reason) throws IOException {
+                assertNotNull("Establishment (local) indication not invoked for address=" + address, localAddressHolder.get());
+                assertNotNull("Establishment (bound) indication not invoked for address=" + address, boundAddressHolder.get());
+                assertEquals("No tear down indication", 1, tearDownSignal.get());
+            }
+
+            @Override
+            public void tearingDownExplicitTunnel(
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding)
+                            throws IOException {
+                throw new UnsupportedOperationException("Unexpected explicit tunnel tear down indication: session=" + session + ", address=" + address);
+            }
+
+            @Override
+            public void tearingDownDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address)
+                    throws IOException {
+                assertNotNull("Establishment (local) indication not invoked for address=" + address, localAddressHolder.get());
+                assertNotNull("Establishment (bound) indication not invoked for address=" + address, boundAddressHolder.get());
+                assertEquals("Multiple tearing down indications", 1, tearDownSignal.incrementAndGet());
+            }
+
+            @Override
+            public void establishingExplicitTunnel(
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding)
+                            throws IOException {
+                throw new UnsupportedOperationException("Unexpected explicit tunnel establishment indication: session=" + session + ", address=" + local);
+            }
+
+            @Override
+            public void establishingDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local)
+                    throws IOException {
+                assertNull("Multiple calls to establishment indicator", localAddressHolder.getAndSet(local));
+            }
+
+            @Override
+            public void establishedExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local,
+                    SshdSocketAddress remote, boolean localForwarding, SshdSocketAddress boundAddress, Throwable reason)
+                            throws IOException {
+                throw new UnsupportedOperationException("Unexpected explicit tunnel established indication: session=" + session + ", address=" + boundAddress);
+            }
+
+            @Override
+            public void establishedDynamicTunnel(
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason)
+                            throws IOException {
+                assertSame("Establishment indication not invoked", local, localAddressHolder.get());
+                assertNull("Multiple calls to establishment indicator", boundAddressHolder.getAndSet(boundAddress));
+            }
+        };
+
+        try (ClientSession session = createNativeSession(listener)) {
             String expected = getCurrentTestName();
             byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
             byte[] buf = new byte[bytes.length + Long.SIZE];
 
             SshdSocketAddress dynamic;
-            try (DynamicPortForwardingTracker tracker = session.createDynamicPortForwardingTracker(new SshdSocketAddress(TEST_LOCALHOST, 0))) {
+            try (DynamicPortForwardingTracker tracker =
+                    session.createDynamicPortForwardingTracker(new SshdSocketAddress(TEST_LOCALHOST, 0))) {
                 dynamic = tracker.getBoundAddress();
                 assertTrue("Tracker not marked as open", tracker.isOpen());
 
@@ -128,8 +196,13 @@ public class ProxyTest extends BaseTestSupport {
 
                 tracker.close();
                 assertFalse("Tracker not marked as closed", tracker.isOpen());
+            } finally {
+                client.removePortForwardingEventListener(listener);
             }
 
+            assertNotNull("Local tunnel address not indicated", localAddressHolder.getAndSet(null));
+            assertNotNull("Bound tunnel address not indicated", boundAddressHolder.getAndSet(null));
+
             try {
                 try (Socket s = new Socket(new Proxy(Proxy.Type.SOCKS, new InetSocketAddress(TEST_LOCALHOST, dynamic.getPort())))) {
                     s.connect(new InetSocketAddress(TEST_LOCALHOST, echoPort));
@@ -143,11 +216,14 @@ public class ProxyTest extends BaseTestSupport {
         }
     }
 
-    protected ClientSession createNativeSession() throws Exception {
+    protected ClientSession createNativeSession(PortForwardingEventListener listener) throws Exception {
         client = setupTestClient();
         PropertyResolverUtils.updateProperty(client, FactoryManager.WINDOW_SIZE, 2048);
         PropertyResolverUtils.updateProperty(client, FactoryManager.MAX_PACKET_SIZE, 256);
         client.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
+        if (listener != null) {
+            client.addPortForwardingEventListener(listener);
+        }
         client.start();
 
         ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, sshPort).verify(7L, TimeUnit.SECONDS).getSession();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java b/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java
index 0d08497..f42704a 100644
--- a/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java
@@ -41,6 +41,7 @@ import org.apache.sshd.common.NamedResource;
 import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.forward.DefaultTcpipForwarderFactory;
+import org.apache.sshd.common.forward.PortForwardingEventListener;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.keyprovider.KeyPairProvider;
@@ -50,6 +51,7 @@ import org.apache.sshd.common.random.SingletonRandomFactory;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.session.SessionListener;
 import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.net.SshdSocketAddress;
 import org.apache.sshd.util.test.BaseTestSupport;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
@@ -284,6 +286,52 @@ public class ClientAuthenticationManagerTest extends BaseTestSupport {
                 // ignored
             }
         });
+        Mockito.when(client.getPortForwardingEventListenerProxy()).thenReturn(new PortForwardingEventListener() {
+            @Override
+            public void tornDownExplicitTunnel(Session session, SshdSocketAddress address, boolean localForwarding,
+                    Throwable reason) throws IOException {
+                // ignored
+            }
+
+            @Override
+            public void tornDownDynamicTunnel(Session session, SshdSocketAddress address, Throwable reason) throws IOException {
+                // ignored
+            }
+
+            @Override
+            public void tearingDownExplicitTunnel(Session session, SshdSocketAddress address, boolean localForwarding)
+                    throws IOException {
+                // ignored
+            }
+
+            @Override
+            public void tearingDownDynamicTunnel(Session session, SshdSocketAddress address) throws IOException {
+                // ignored
+            }
+
+            @Override
+            public void establishingExplicitTunnel(Session session, SshdSocketAddress local, SshdSocketAddress remote,
+                    boolean localForwarding) throws IOException {
+                // ignored
+            }
+
+            @Override
+            public void establishingDynamicTunnel(Session session, SshdSocketAddress local) throws IOException {
+                // ignored
+            }
+
+            @Override
+            public void establishedExplicitTunnel(Session session, SshdSocketAddress local, SshdSocketAddress remote,
+                    boolean localForwarding, SshdSocketAddress boundAddress, Throwable reason) throws IOException {
+                // ignored
+            }
+
+            @Override
+            public void establishedDynamicTunnel(Session session, SshdSocketAddress local, SshdSocketAddress boundAddress,
+                    Throwable reason) throws IOException {
+                // ignored
+            }
+        });
         Factory<Random> randomFactory = new SingletonRandomFactory(JceRandomFactory.INSTANCE);
         Mockito.when(client.getRandomFactory()).thenReturn(randomFactory);
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
index 74d9b91..26480a9 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.sshd.common.forward;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.reflect.Field;
@@ -35,6 +36,8 @@ import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.jcraft.jsch.JSch;
 import com.jcraft.jsch.JSchException;
@@ -267,7 +270,7 @@ public class PortForwardingTest extends BaseTestSupport {
 
     @Test
     public void testRemoteForwardingNative() throws Exception {
-        try (ClientSession session = createNativeSession()) {
+        try (ClientSession session = createNativeSession(null)) {
             SshdSocketAddress remote = new SshdSocketAddress("", 0);
             SshdSocketAddress local = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
             SshdSocketAddress bound = session.startRemotePortForwarding(remote, local);
@@ -295,9 +298,76 @@ public class PortForwardingTest extends BaseTestSupport {
 
     @Test
     public void testRemoteForwardingNativeBigPayload() throws Exception {
-        try (ClientSession session = createNativeSession();
+        final AtomicReference<SshdSocketAddress> localAddressHolder = new AtomicReference<>();
+        final AtomicReference<SshdSocketAddress> remoteAddressHolder = new AtomicReference<>();
+        final AtomicReference<SshdSocketAddress> boundAddressHolder = new AtomicReference<>();
+        final AtomicInteger tearDownSignal = new AtomicInteger(0);
+        @SuppressWarnings("checkstyle:anoninnerlength")
+        PortForwardingEventListener listener = new PortForwardingEventListener() {
+            @Override
+            public void tornDownExplicitTunnel(
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding, Throwable reason)
+                            throws IOException {
+                assertFalse("Unexpected local tunnel has been torn down: address=" + address, localForwarding);
+                assertEquals("Tear down indication not invoked", 1, tearDownSignal.get());
+            }
+
+            @Override
+            public void tornDownDynamicTunnel(
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress address, Throwable reason) throws IOException {
+                throw new UnsupportedOperationException("Unexpected dynamic tunnel torn down indication: session=" + session + ", address=" + address);
+            }
+
+            @Override
+            public void tearingDownExplicitTunnel(
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding)
+                            throws IOException {
+                assertFalse("Unexpected local tunnel being torn down: address=" + address, localForwarding);
+                assertEquals("Duplicate tear down signalling", 1, tearDownSignal.incrementAndGet());
+            }
+
+            @Override
+            public void tearingDownDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address)
+                    throws IOException {
+                throw new UnsupportedOperationException("Unexpected dynamic tunnel tearing down indication: session=" + session + ", address=" + address);
+            }
+
+            @Override
+            public void establishingExplicitTunnel(
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding)
+                            throws IOException {
+                assertFalse("Unexpected local tunnel being established: local=" + local + ", remote=" + remote, localForwarding);
+                assertNull("Duplicate establishment indication call for local address=" + local, localAddressHolder.getAndSet(local));
+                assertNull("Duplicate establishment indication call for remote address=" + remote, remoteAddressHolder.getAndSet(remote));
+            }
+
+            @Override
+            public void establishingDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local)
+                    throws IOException {
+                throw new UnsupportedOperationException("Unexpected dynamic tunnel establishing indication: session=" + session + ", address=" + local);
+            }
+
+            @Override
+            public void establishedExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local,
+                    SshdSocketAddress remote, boolean localForwarding, SshdSocketAddress boundAddress, Throwable reason)
+                            throws IOException {
+                assertFalse("Unexpected local tunnel has been established: local=" + local + ", remote=" + remote + ", bound=" + boundAddress, localForwarding);
+                assertSame("Mismatched established tunnel local address", local, localAddressHolder.get());
+                assertSame("Mismatched established tunnel remote address", remote, remoteAddressHolder.get());
+                assertNull("Duplicate establishment indication call for bound address=" + boundAddress, boundAddressHolder.getAndSet(boundAddress));
+            }
+
+            @Override
+            public void establishedDynamicTunnel(
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason)
+                            throws IOException {
+                throw new UnsupportedOperationException("Unexpected dynamic tunnel established indication: session=" + session + ", address=" + boundAddress);
+            }
+        };
+
+        try (ClientSession session = createNativeSession(listener);
              ExplicitPortForwardingTracker tracker =
-                     session.createRemotePortForwardingTracker(new SshdSocketAddress(TEST_LOCALHOST, echoPort), new SshdSocketAddress("", 0))) {
+                     session.createRemotePortForwardingTracker(new SshdSocketAddress("", 0), new SshdSocketAddress(TEST_LOCALHOST, echoPort))) {
             assertTrue("Tracker not marked as open", tracker.isOpen());
             assertFalse("Tracker not marked as remote", tracker.isLocalForwarding());
 
@@ -324,7 +394,13 @@ public class PortForwardingTest extends BaseTestSupport {
                 tracker.close();
             }
             assertFalse("Tracker not marked as closed", tracker.isOpen());
+        } finally {
+            client.removePortForwardingEventListener(listener);
         }
+
+        assertNotNull("Local tunnel address not indicated", localAddressHolder.getAndSet(null));
+        assertNotNull("Remote tunnel address not indicated", remoteAddressHolder.getAndSet(null));
+        assertNotNull("Bound tunnel address not indicated", boundAddressHolder.getAndSet(null));
     }
 
     @Test
@@ -360,7 +436,74 @@ public class PortForwardingTest extends BaseTestSupport {
 
     @Test
     public void testLocalForwardingNative() throws Exception {
-        try (ClientSession session = createNativeSession();
+        final AtomicReference<SshdSocketAddress> localAddressHolder = new AtomicReference<>();
+        final AtomicReference<SshdSocketAddress> remoteAddressHolder = new AtomicReference<>();
+        final AtomicReference<SshdSocketAddress> boundAddressHolder = new AtomicReference<>();
+        final AtomicInteger tearDownSignal = new AtomicInteger(0);
+        @SuppressWarnings("checkstyle:anoninnerlength")
+        PortForwardingEventListener listener = new PortForwardingEventListener() {
+            @Override
+            public void tornDownExplicitTunnel(
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding, Throwable reason)
+                            throws IOException {
+                assertTrue("Unexpected remote tunnel has been torn down: address=" + address, localForwarding);
+                assertEquals("Tear down indication not invoked", 1, tearDownSignal.get());
+            }
+
+            @Override
+            public void tornDownDynamicTunnel(
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress address, Throwable reason) throws IOException {
+                throw new UnsupportedOperationException("Unexpected dynamic tunnel torn down indication: session=" + session + ", address=" + address);
+            }
+
+            @Override
+            public void tearingDownExplicitTunnel(
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding)
+                            throws IOException {
+                assertTrue("Unexpected remote tunnel being torn down: address=" + address, localForwarding);
+                assertEquals("Duplicate tear down signalling", 1, tearDownSignal.incrementAndGet());
+            }
+
+            @Override
+            public void tearingDownDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address)
+                    throws IOException {
+                throw new UnsupportedOperationException("Unexpected dynamic tunnel tearing down indication: session=" + session + ", address=" + address);
+            }
+
+            @Override
+            public void establishingExplicitTunnel(
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding)
+                            throws IOException {
+                assertTrue("Unexpected remote tunnel being established: local=" + local + ", remote=" + remote, localForwarding);
+                assertNull("Duplicate establishment indication call for local address=" + local, localAddressHolder.getAndSet(local));
+                assertNull("Duplicate establishment indication call for remote address=" + remote, remoteAddressHolder.getAndSet(remote));
+            }
+
+            @Override
+            public void establishingDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local)
+                    throws IOException {
+                throw new UnsupportedOperationException("Unexpected dynamic tunnel establishing indication: session=" + session + ", address=" + local);
+            }
+
+            @Override
+            public void establishedExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local,
+                    SshdSocketAddress remote, boolean localForwarding, SshdSocketAddress boundAddress, Throwable reason)
+                            throws IOException {
+                assertTrue("Unexpected remote tunnel has been established: local=" + local + ", remote=" + remote + ", bound=" + boundAddress, localForwarding);
+                assertSame("Mismatched established tunnel local address", local, localAddressHolder.get());
+                assertSame("Mismatched established tunnel remote address", remote, remoteAddressHolder.get());
+                assertNull("Duplicate establishment indication call for bound address=" + boundAddress, boundAddressHolder.getAndSet(boundAddress));
+            }
+
+            @Override
+            public void establishedDynamicTunnel(
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason)
+                            throws IOException {
+                throw new UnsupportedOperationException("Unexpected dynamic tunnel established indication: session=" + session + ", address=" + boundAddress);
+            }
+        };
+
+        try (ClientSession session = createNativeSession(listener);
              ExplicitPortForwardingTracker tracker =
                  session.createLocalPortForwardingTracker(new SshdSocketAddress("", 0), new SshdSocketAddress(TEST_LOCALHOST, echoPort))) {
             assertTrue("Tracker not marked as open", tracker.isOpen());
@@ -387,12 +530,18 @@ public class PortForwardingTest extends BaseTestSupport {
                 tracker.close();
             }
             assertFalse("Tracker not marked as closed", tracker.isOpen());
+        } finally {
+            client.removePortForwardingEventListener(listener);
         }
+
+        assertNotNull("Local tunnel address not indicated", localAddressHolder.getAndSet(null));
+        assertNotNull("Remote tunnel address not indicated", remoteAddressHolder.getAndSet(null));
+        assertNotNull("Bound tunnel address not indicated", boundAddressHolder.getAndSet(null));
     }
 
     @Test
     public void testLocalForwardingNativeReuse() throws Exception {
-        try (ClientSession session = createNativeSession()) {
+        try (ClientSession session = createNativeSession(null)) {
             SshdSocketAddress local = new SshdSocketAddress("", 0);
             SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
             SshdSocketAddress bound = session.startLocalPortForwarding(local, remote);
@@ -406,7 +555,7 @@ public class PortForwardingTest extends BaseTestSupport {
 
     @Test
     public void testLocalForwardingNativeBigPayload() throws Exception {
-        try (ClientSession session = createNativeSession()) {
+        try (ClientSession session = createNativeSession(null)) {
             String expected = getCurrentTestName();
             byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
             byte[] buf = new byte[bytes.length + Long.SIZE];
@@ -436,7 +585,7 @@ public class PortForwardingTest extends BaseTestSupport {
 
     @Test
     public void testForwardingChannel() throws Exception {
-        try (ClientSession session = createNativeSession()) {
+        try (ClientSession session = createNativeSession(null)) {
             SshdSocketAddress local = new SshdSocketAddress("", 0);
             SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
 
@@ -594,11 +743,14 @@ public class PortForwardingTest extends BaseTestSupport {
         return session;
     }
 
-    protected ClientSession createNativeSession() throws Exception {
+    protected ClientSession createNativeSession(PortForwardingEventListener listener) throws Exception {
         client = setupTestClient();
         PropertyResolverUtils.updateProperty(client, FactoryManager.WINDOW_SIZE, 2048);
         PropertyResolverUtils.updateProperty(client, FactoryManager.MAX_PACKET_SIZE, 256);
         client.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
+        if (listener != null) {
+            client.addPortForwardingEventListener(listener);
+        }
         client.start();
 
         ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, sshPort).verify(7L, TimeUnit.SECONDS).getSession();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/test/java/org/apache/sshd/common/util/EventListenerUtilsTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/util/EventListenerUtilsTest.java b/sshd-core/src/test/java/org/apache/sshd/common/util/EventListenerUtilsTest.java
index fdf7656..03d8d20 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/util/EventListenerUtilsTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/util/EventListenerUtilsTest.java
@@ -20,8 +20,11 @@
 package org.apache.sshd.common.util;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.EventListener;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.sshd.util.test.BaseTestSupport;
 import org.junit.FixMethodOrder;
@@ -39,7 +42,7 @@ public class EventListenerUtilsTest extends BaseTestSupport {
 
     @Test
     public void testProxyWrapper() {
-        List<ProxyListenerImpl> impls = new ArrayList<ProxyListenerImpl>();
+        List<ProxyListenerImpl> impls = new ArrayList<>();
         for (int index = 0; index < Byte.SIZE; index++) {
             impls.add(new ProxyListenerImpl());
         }
@@ -57,6 +60,57 @@ public class EventListenerUtilsTest extends BaseTestSupport {
         }
     }
 
+    @Test
+    public void testListenerInstanceComparatorOnProxy() {
+        Comparator<? super EventListener> comparator = EventListenerUtils.LISTENER_INSTANCE_COMPARATOR;
+        ProxyListener p1 = EventListenerUtils.proxyWrapper(ProxyListener.class, Collections.singletonList(new ProxyListenerImpl()));
+        assertEquals("Mismatched self reference comparison", 0, comparator.compare(p1, p1));
+
+        EventListener l = new EventListener() { /* nothing extra */ };
+        assertEquals("Mismatched proxy vs. non-proxy result", 1, Integer.signum(comparator.compare(p1, l)));
+        assertEquals("Mismatched non-proxy vs. proxy result", -1, Integer.signum(comparator.compare(l, p1)));
+
+        ProxyListener p2 = EventListenerUtils.proxyWrapper(ProxyListener.class, Collections.singletonList(new ProxyListenerImpl()));
+        int p1vsp2 = Integer.signum(comparator.compare(p1, p2));
+        assertNotEquals("Mismatched p1 vs. p2 comparison", 0, p1vsp2);
+        assertEquals("Mismatched p2 vs. p1 comparison result", 0 - p1vsp2, Integer.signum(comparator.compare(p2, p1)));
+    }
+
+    @Test
+    public void testListenerInstanceComparatorOnNonProxy() {
+        Comparator<? super EventListener> comparator = EventListenerUtils.LISTENER_INSTANCE_COMPARATOR;
+        EventListener l1 = new EventListener() { /* nothing extra */ };
+        assertEquals("Mismatched self reference comparison", 0, comparator.compare(l1, l1));
+
+        EventListener l2 = new EventListener() { /* nothing extra */ };
+        int l1vsl2 = Integer.signum(comparator.compare(l1, l2));
+        assertNotEquals("Mismatched l1 vs. l2 comparison result", 0, l1vsl2);
+        assertEquals("Mismatched l2 vs. l1 comparison result", 0 - l1vsl2, Integer.signum(comparator.compare(l2, l1)));
+    }
+
+    @Test
+    public void testSynchronizedListenersSetOnProxies() {
+        ProxyListener p1 = EventListenerUtils.proxyWrapper(ProxyListener.class, Collections.singletonList(new ProxyListenerImpl()));
+        Set<ProxyListener> s = EventListenerUtils.<ProxyListener>synchronizedListenersSet();
+        for (int index = 1; index <= Byte.SIZE; index++) {
+            boolean modified = s.add(p1);
+            assertEquals("Mismatched p1 modification indicator at attempt #" + index, index == 1, modified);
+            assertEquals("Mismatched p1 set size at attempt #" + index, 1, s.size());
+        }
+
+        ProxyListener p2 = EventListenerUtils.proxyWrapper(ProxyListener.class, Collections.singletonList(new ProxyListenerImpl()));
+        for (int index = 1; index <= Byte.SIZE; index++) {
+            boolean modified = s.add(p2);
+            assertEquals("Mismatched p2 modification indicator at attempt #" + index, index == 1, modified);
+            assertEquals("Mismatched p2 set size at attempt #" + index, 2, s.size());
+        }
+
+        assertTrue("Failed to remove p1", s.remove(p1));
+        assertEquals("Mismatched post p1-remove size", 1, s.size());
+        assertTrue("Failed to remove p2", s.remove(p2));
+        assertEquals("Mismatched post p2-remove size", 0, s.size());
+    }
+
     interface ProxyListener extends EventListener {
         void callMeWithString(String s);
 


[2/2] mina-sshd git commit: [SSHD-682] Provide PortForwardingEventListener support

Posted by lg...@apache.org.
[SSHD-682] Provide PortForwardingEventListener support


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/549ac4e2
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/549ac4e2
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/549ac4e2

Branch: refs/heads/master
Commit: 549ac4e2a197afebbd9d66ccc1cd8506c0a515fb
Parents: cac64e4
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Sat Jul 30 17:17:09 2016 +0300
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Sat Jul 30 17:17:09 2016 +0300

----------------------------------------------------------------------
 .../session/ClientConnectionServiceFactory.java |  20 +-
 .../org/apache/sshd/common/FactoryManager.java  |   2 +
 .../sshd/common/channel/AbstractChannel.java    |   8 +-
 .../common/forward/DefaultTcpipForwarder.java   | 265 ++++++++++++++-----
 .../forward/DefaultTcpipForwarderFactory.java   |  47 +++-
 .../forward/PortForwardingEventListener.java    | 133 ++++++++++
 .../PortForwardingEventListenerManager.java     |  52 ++++
 .../sshd/common/forward/TcpipForwarder.java     |   2 +-
 .../common/forward/TcpipForwarderFactory.java   |   1 -
 .../common/helpers/AbstractFactoryManager.java  |  46 +++-
 .../AbstractConnectionServiceFactory.java       |  60 +++++
 .../sshd/common/session/ConnectionService.java  |   3 +-
 .../org/apache/sshd/common/session/Session.java |   2 +
 .../helpers/AbstractConnectionService.java      |  36 ++-
 .../common/session/helpers/AbstractSession.java |  57 +++-
 .../sshd/common/util/EventListenerUtils.java    |  90 +++++++
 .../org/apache/sshd/server/SignalListener.java  |   4 +-
 .../apache/sshd/server/StandardEnvironment.java |  19 +-
 .../sshd/server/scp/ScpCommandFactory.java      |   8 +-
 .../server/session/ServerConnectionService.java |   4 +-
 .../session/ServerConnectionServiceFactory.java |  20 +-
 .../sftp/AbstractSftpEventListenerManager.java  |   4 +-
 .../server/subsystem/sftp/SftpSubsystem.java    |  12 +-
 .../test/java/org/apache/sshd/ProxyTest.java    |  82 +++++-
 .../client/ClientAuthenticationManagerTest.java |  48 ++++
 .../sshd/common/forward/PortForwardingTest.java | 168 +++++++++++-
 .../common/util/EventListenerUtilsTest.java     |  56 +++-
 27 files changed, 1124 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java
index 2efe7b7..d71ce1c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java
@@ -22,14 +22,26 @@ import java.io.IOException;
 
 import org.apache.sshd.common.Service;
 import org.apache.sshd.common.ServiceFactory;
+import org.apache.sshd.common.forward.PortForwardingEventListener;
+import org.apache.sshd.common.session.AbstractConnectionServiceFactory;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.ValidateUtils;
 
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public class ClientConnectionServiceFactory implements ServiceFactory {
-    public static final ClientConnectionServiceFactory INSTANCE = new ClientConnectionServiceFactory();
+public class ClientConnectionServiceFactory extends AbstractConnectionServiceFactory implements ServiceFactory {
+    public static final ClientConnectionServiceFactory INSTANCE = new ClientConnectionServiceFactory() {
+        @Override
+        public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+            throw new UnsupportedOperationException("addPortForwardingListener(" + listener + ") N/A on default instance");
+        }
+
+        @Override
+        public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+            throw new UnsupportedOperationException("removePortForwardingEventListener(" + listener + ") N/A on default instance");
+        }
+    };
 
     public ClientConnectionServiceFactory() {
         super();
@@ -43,6 +55,8 @@ public class ClientConnectionServiceFactory implements ServiceFactory {
     @Override
     public Service create(Session session) throws IOException {
         ValidateUtils.checkTrue(session instanceof AbstractClientSession, "Not a client sesssion: %s", session);
-        return new ClientConnectionService((AbstractClientSession) session);
+        ClientConnectionService service = new ClientConnectionService((AbstractClientSession) session);
+        service.addPortForwardingEventListener(getPortForwardingEventListenerProxy());
+        return service;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
index 516da0b..ea3ad86 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
@@ -27,6 +27,7 @@ import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.ChannelListenerManager;
 import org.apache.sshd.common.channel.RequestHandler;
 import org.apache.sshd.common.file.FileSystemFactory;
+import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
 import org.apache.sshd.common.forward.TcpipForwarderFactory;
 import org.apache.sshd.common.io.IoServiceFactory;
 import org.apache.sshd.common.kex.KexFactoryManager;
@@ -47,6 +48,7 @@ public interface FactoryManager
                 SessionListenerManager,
                 ReservedSessionMessagesManager,
                 ChannelListenerManager,
+                PortForwardingEventListenerManager,
                 AttributeStore,
                 PropertyResolver {
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 50022ff..f6f5ddf 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -27,7 +27,6 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -79,13 +78,14 @@ public abstract class AbstractChannel
     protected final AtomicBoolean initialized = new AtomicBoolean(false);
     protected final AtomicBoolean eofReceived = new AtomicBoolean(false);
     protected final AtomicBoolean eofSent = new AtomicBoolean(false);
-    protected AtomicReference<GracefulState> gracefulState = new AtomicReference<GracefulState>(GracefulState.Opened);
+    protected AtomicReference<GracefulState> gracefulState = new AtomicReference<>(GracefulState.Opened);
     protected final DefaultCloseFuture gracefulFuture = new DefaultCloseFuture(lock);
-    protected final List<RequestHandler<Channel>> handlers = new ArrayList<RequestHandler<Channel>>();
+    protected final List<RequestHandler<Channel>> handlers = new ArrayList<>();
     /**
      * Channel events listener
      */
-    protected final Collection<ChannelListener> channelListeners = new CopyOnWriteArraySet<>();
+    protected final Collection<ChannelListener> channelListeners =
+            EventListenerUtils.<ChannelListener>synchronizedListenersSet();
     protected final ChannelListener channelListenerProxy;
 
     private int id = -1;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
index b8aa438..aba9317 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
@@ -28,6 +28,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -49,6 +50,7 @@ import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.session.SessionHolder;
+import org.apache.sshd.common.util.EventListenerUtils;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.Readable;
 import org.apache.sshd.common.util.ValidateUtils;
@@ -65,7 +67,7 @@ import org.apache.sshd.server.forward.ForwardingFilter;
  */
 public class DefaultTcpipForwarder
         extends AbstractInnerCloseable
-        implements TcpipForwarder, SessionHolder<Session> {
+        implements TcpipForwarder, SessionHolder<Session>, PortForwardingEventListenerManager {
 
     /**
      * Used to configure the timeout (milliseconds) for receiving a response
@@ -101,11 +103,35 @@ public class DefaultTcpipForwarder
             return new StaticIoHandler();
         }
     };
+    private final Collection<PortForwardingEventListener> listeners =
+            EventListenerUtils.<PortForwardingEventListener>synchronizedListenersSet();
+    private final PortForwardingEventListener listenerProxy;
+
     private IoAcceptor acceptor;
 
     public DefaultTcpipForwarder(ConnectionService service) {
         this.service = ValidateUtils.checkNotNull(service, "No connection service");
         this.sessionInstance = ValidateUtils.checkNotNull(service.getSession(), "No session");
+        this.listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, getClass().getClassLoader(), listeners);
+    }
+
+    @Override
+    public PortForwardingEventListener getPortForwardingEventListenerProxy() {
+        return listenerProxy;
+    }
+
+    @Override
+    public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+        listeners.add(Objects.requireNonNull(listener, "No listener to add"));
+    }
+
+    @Override
+    public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+        if (listener == null) {
+            return;
+        }
+
+        listeners.remove(listener);
     }
 
     @Override
@@ -134,22 +160,42 @@ public class DefaultTcpipForwarder
             throw new IllegalStateException("TcpipForwarder is closing");
         }
 
-        InetSocketAddress bound = doBind(local, staticIoHandlerFactory);
-        int port = bound.getPort();
-        SshdSocketAddress prev;
-        synchronized (localToRemote) {
-            prev = localToRemote.put(port, remote);
-        }
+        InetSocketAddress bound;
+        int port;
+        PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+        listener.establishingExplicitTunnel(getSession(), local, remote, true);
+        try {
+            bound = doBind(local, staticIoHandlerFactory);
+            port = bound.getPort();
+            SshdSocketAddress prev;
+            synchronized (localToRemote) {
+                prev = localToRemote.put(port, remote);
+            }
 
-        if (prev != null) {
-            throw new IOException("Multiple local port forwarding bindings on port=" + port + ": current=" + remote + ", previous=" + prev);
+            if (prev != null) {
+                throw new IOException("Multiple local port forwarding bindings on port=" + port + ": current=" + remote + ", previous=" + prev);
+            }
+        } catch (IOException | RuntimeException e) {
+            try {
+                stopLocalPortForwarding(local);
+            } catch (IOException | RuntimeException err) {
+                e.addSuppressed(err);
+            }
+            listener.establishedExplicitTunnel(getSession(), local, remote, true, null, e);
+            throw e;
         }
 
-        SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), port);
-        if (log.isDebugEnabled()) {
-            log.debug("startLocalPortForwarding(" + local + " -> " + remote + "): " + result);
+        try {
+            SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), port);
+            if (log.isDebugEnabled()) {
+                log.debug("startLocalPortForwarding(" + local + " -> " + remote + "): " + result);
+            }
+            listener.establishedExplicitTunnel(getSession(), local, remote, true, result, null);
+            return result;
+        } catch (IOException | RuntimeException e) {
+            stopLocalPortForwarding(local);
+            throw e;
         }
-        return result;
     }
 
     @Override
@@ -165,7 +211,17 @@ public class DefaultTcpipForwarder
             if (log.isDebugEnabled()) {
                 log.debug("stopLocalPortForwarding(" + local + ") unbind " + bound);
             }
-            acceptor.unbind(bound.toInetSocketAddress());
+
+            PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+            listener.tearingDownExplicitTunnel(getSession(), bound, true);
+            try {
+                acceptor.unbind(bound.toInetSocketAddress());
+            } catch (RuntimeException e) {
+                listener.tornDownExplicitTunnel(getSession(), bound, true, e);
+                throw e;
+            }
+
+            listener.tornDownExplicitTunnel(getSession(), bound, true, null);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("stopLocalPortForwarding(" + local + ") no mapping/acceptor for " + bound);
@@ -188,27 +244,47 @@ public class DefaultTcpipForwarder
         buffer.putInt(remotePort);
 
         long timeout = PropertyResolverUtils.getLongProperty(session, FORWARD_REQUEST_TIMEOUT, DEFAULT_FORWARD_REQUEST_TIMEOUT);
-        Buffer result = session.request("tcpip-forward", buffer, timeout, TimeUnit.MILLISECONDS);
-        if (result == null) {
-            throw new SshException("Tcpip forwarding request denied by server");
-        }
-        int port = (remotePort == 0) ? result.getInt() : remote.getPort();
-        // TODO: Is it really safe to only store the local address after the request ?
-        SshdSocketAddress prev;
-        synchronized (remoteToLocal) {
-            prev = remoteToLocal.put(port, local);
-        }
+        Buffer result;
+        int port;
+        PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+        listener.establishingExplicitTunnel(getSession(), local, remote, false);
+        try {
+            result = session.request("tcpip-forward", buffer, timeout, TimeUnit.MILLISECONDS);
+            if (result == null) {
+                throw new SshException("Tcpip forwarding request denied by server");
+            }
+            port = (remotePort == 0) ? result.getInt() : remote.getPort();
+            // TODO: Is it really safe to only store the local address after the request ?
+            SshdSocketAddress prev;
+            synchronized (remoteToLocal) {
+                prev = remoteToLocal.put(port, local);
+            }
 
-        if (prev != null) {
-            throw new IOException("Multiple remote port forwarding bindings on port=" + port + ": current=" + remote + ", previous=" + prev);
+            if (prev != null) {
+                throw new IOException("Multiple remote port forwarding bindings on port=" + port + ": current=" + remote + ", previous=" + prev);
+            }
+        } catch (IOException | RuntimeException e) {
+            try {
+                stopRemotePortForwarding(remote);
+            } catch (IOException | RuntimeException err) {
+                e.addSuppressed(err);
+            }
+            listener.establishedExplicitTunnel(session, local, remote, false, null, e);
+            throw e;
         }
 
-        SshdSocketAddress bound = new SshdSocketAddress(remoteHost, port);
-        if (log.isDebugEnabled()) {
-            log.debug("startRemotePortForwarding(" + remote + " -> " + local + "): " + bound);
-        }
+        try {
+            SshdSocketAddress bound = new SshdSocketAddress(remoteHost, port);
+            if (log.isDebugEnabled()) {
+                log.debug("startRemotePortForwarding(" + remote + " -> " + local + "): " + bound);
+            }
 
-        return bound;
+            listener.establishedExplicitTunnel(getSession(), local, remote, false, bound, null);
+            return bound;
+        } catch (IOException | RuntimeException e) {
+            stopRemotePortForwarding(remote);
+            throw e;
+        }
     }
 
     @Override
@@ -230,7 +306,17 @@ public class DefaultTcpipForwarder
             buffer.putBoolean(false);   // want reply
             buffer.putString(remoteHost);
             buffer.putInt(remote.getPort());
-            session.writePacket(buffer);
+
+            PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+            listener.tearingDownExplicitTunnel(getSession(), bound, false);
+            try {
+                session.writePacket(buffer);
+            } catch (IOException | RuntimeException e) {
+                listener.tornDownExplicitTunnel(getSession(), bound, false, e);
+                throw e;
+            }
+
+            listener.tornDownExplicitTunnel(getSession(), bound, false, null);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("stopRemotePortForwarding(" + remote + ") no binding found");
@@ -252,27 +338,47 @@ public class DefaultTcpipForwarder
 
         SocksProxy socksProxy = new SocksProxy(service);
         SocksProxy prev;
-        InetSocketAddress bound = doBind(local, socksProxyIoHandlerFactory);
-        int port = bound.getPort();
-        synchronized (dynamicLocal) {
-            prev = dynamicLocal.put(port, socksProxy);
-        }
+        InetSocketAddress bound;
+        int port;
+        PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+        listener.establishingDynamicTunnel(getSession(), local);
+        try {
+            bound = doBind(local, socksProxyIoHandlerFactory);
+            port = bound.getPort();
+            synchronized (dynamicLocal) {
+                prev = dynamicLocal.put(port, socksProxy);
+            }
 
-        if (prev != null) {
-            throw new IOException("Multiple dynamic port mappings found for port=" + port + ": current=" + socksProxy + ", previous=" + prev);
+            if (prev != null) {
+                throw new IOException("Multiple dynamic port mappings found for port=" + port + ": current=" + socksProxy + ", previous=" + prev);
+            }
+        } catch (IOException | RuntimeException e) {
+            try {
+                stopDynamicPortForwarding(local);
+            } catch (IOException | RuntimeException err) {
+                e.addSuppressed(err);
+            }
+            listener.establishedDynamicTunnel(getSession(), local, null, e);
+            throw e;
         }
 
-        SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), port);
-        if (log.isDebugEnabled()) {
-            log.debug("startDynamicPortForwarding(" + local + "): " + result);
-        }
+        try {
+            SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), port);
+            if (log.isDebugEnabled()) {
+                log.debug("startDynamicPortForwarding(" + local + "): " + result);
+            }
 
-        return result;
+            listener.establishedDynamicTunnel(getSession(), local, result, null);
+            return result;
+        } catch (IOException | RuntimeException e) {
+            stopDynamicPortForwarding(local);
+            throw e;
+        }
     }
 
     @Override
     public synchronized void stopDynamicPortForwarding(SshdSocketAddress local) throws IOException {
-        Closeable obj;
+        SocksProxy obj;
         synchronized (dynamicLocal) {
             obj = dynamicLocal.remove(local.getPort());
         }
@@ -281,8 +387,18 @@ public class DefaultTcpipForwarder
             if (log.isDebugEnabled()) {
                 log.debug("stopDynamicPortForwarding(" + local + ") unbinding");
             }
-            obj.close(true);
-            acceptor.unbind(local.toInetSocketAddress());
+
+            PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+            listener.tearingDownDynamicTunnel(sessionInstance, local);
+            try {
+                obj.close(true);
+                acceptor.unbind(local.toInetSocketAddress());
+            } catch (RuntimeException e) {
+                listener.tornDownDynamicTunnel(getSession(), local, e);
+                throw e;
+            }
+
+            listener.tornDownDynamicTunnel(getSession(), local, null);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("stopDynamicPortForwarding(" + local + ") no binding found");
@@ -321,22 +437,41 @@ public class DefaultTcpipForwarder
             throw new RuntimeSshException(e);
         }
 
-        InetSocketAddress bound = doBind(local, staticIoHandlerFactory);
-        SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), bound.getPort());
-        if (log.isDebugEnabled()) {
-            log.debug("localPortForwardingRequested(" + local + "): " + result);
-        }
+        PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+        listener.establishingExplicitTunnel(getSession(), local, null, true);
+        SshdSocketAddress result;
+        try {
+            InetSocketAddress bound = doBind(local, staticIoHandlerFactory);
+            result = new SshdSocketAddress(bound.getHostString(), bound.getPort());
+            if (log.isDebugEnabled()) {
+                log.debug("localPortForwardingRequested(" + local + "): " + result);
+            }
 
-        boolean added;
-        synchronized (localForwards) {
-            // NOTE !!! it is crucial to use the bound address host name first
-            added = localForwards.add(new LocalForwardingEntry(result.getHostName(), local.getHostName(), result.getPort()));
+            boolean added;
+            synchronized (localForwards) {
+                // NOTE !!! it is crucial to use the bound address host name first
+                added = localForwards.add(new LocalForwardingEntry(result.getHostName(), local.getHostName(), result.getPort()));
+            }
+
+            if (!added) {
+                throw new IOException("Failed to add local port forwarding entry for " + local + " -> " + result);
+            }
+        } catch (IOException | RuntimeException e) {
+            try {
+                localPortForwardingCancelled(local);
+            } catch (IOException | RuntimeException err) {
+                e.addSuppressed(e);
+            }
+            listener.establishedExplicitTunnel(getSession(), local, null, true, null, e);
+            throw e;
         }
 
-        if (!added) {
-            throw new IOException("Failed to add local port forwarding entry for " + local + " -> " + result);
+        try {
+            listener.establishedExplicitTunnel(getSession(), local, null, true, result, null);
+            return result;
+        } catch (IOException | RuntimeException e) {
+            throw e;
         }
-        return result;
     }
 
     @Override
@@ -353,7 +488,17 @@ public class DefaultTcpipForwarder
             if (log.isDebugEnabled()) {
                 log.debug("localPortForwardingCancelled(" + local + ") unbind " + entry);
             }
-            acceptor.unbind(entry.toInetSocketAddress());
+
+            PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+            listener.tearingDownExplicitTunnel(getSession(), entry, true);
+            try {
+                acceptor.unbind(entry.toInetSocketAddress());
+            } catch (RuntimeException e) {
+                listener.tornDownExplicitTunnel(getSession(), entry, true, e);
+                throw e;
+            }
+
+            listener.tornDownExplicitTunnel(getSession(), entry, true, null);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("localPortForwardingCancelled(" + local + ") no match/acceptor: " + entry);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java
index d38bf43..e7de9c3 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java
@@ -18,22 +18,61 @@
  */
 package org.apache.sshd.common.forward;
 
+import java.util.Collection;
+import java.util.Objects;
+
 import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.util.EventListenerUtils;
 
 /**
  * The default {@link TcpipForwarderFactory} implementation.
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public class DefaultTcpipForwarderFactory implements TcpipForwarderFactory {
-    public static final DefaultTcpipForwarderFactory INSTANCE = new DefaultTcpipForwarderFactory();
+public class DefaultTcpipForwarderFactory implements TcpipForwarderFactory, PortForwardingEventListenerManager {
+    public static final DefaultTcpipForwarderFactory INSTANCE = new DefaultTcpipForwarderFactory() {
+        @Override
+        public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+            throw new UnsupportedOperationException("addPortForwardingListener(" + listener + ") N/A on default instance");
+        }
+
+        @Override
+        public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+            throw new UnsupportedOperationException("removePortForwardingEventListener(" + listener + ") N/A on default instance");
+        }
+    };
+
+    private final Collection<PortForwardingEventListener> listeners =
+            EventListenerUtils.<PortForwardingEventListener>synchronizedListenersSet();
+    private final PortForwardingEventListener listenerProxy;
 
     public DefaultTcpipForwarderFactory() {
-        super();
+        listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, getClass().getClassLoader(), listeners);
+    }
+
+    @Override
+    public PortForwardingEventListener getPortForwardingEventListenerProxy() {
+        return listenerProxy;
+    }
+
+    @Override
+    public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+        listeners.add(Objects.requireNonNull(listener, "No listener to add"));
+    }
+
+    @Override
+    public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+        if (listener == null) {
+            return;
+        }
+
+        listeners.remove(listener);
     }
 
     @Override
     public TcpipForwarder create(ConnectionService service) {
-        return new DefaultTcpipForwarder(service);
+        TcpipForwarder forwarder = new DefaultTcpipForwarder(service);
+        forwarder.addPortForwardingEventListener(getPortForwardingEventListenerProxy());
+        return forwarder;
     }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java
new file mode 100644
index 0000000..d03dbd9
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.forward;
+
+import java.io.IOException;
+import java.util.EventListener;
+
+import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.net.SshdSocketAddress;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface PortForwardingEventListener extends EventListener {
+    /**
+     * Signals the attempt to establish a local/remote port forwarding
+     *
+     * @param session The {@link Session} through which the attempt is made
+     * @param local The local address - may be {@code null} on the receiver side
+     * @param remote The remote address - may be {@code null} on the receiver side
+     * @param localForwarding Local/remote port forwarding indicator
+     * @throws IOException If failed to handle the event - in which case
+     * the attempt is aborted and the exception re-thrown to the caller
+     */
+    void establishingExplicitTunnel(
+            Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding)
+                    throws IOException;
+
+    /**
+     * Signals a successful/failed attempt to establish a local/remote port forwarding
+     *
+     * @param session The {@link Session} through which the attempt was made
+     * @param local The local address - may be {@code null} on the receiver side
+     * @param remote The remote address - may be {@code null} on the receiver side
+     * @param localForwarding Local/remote port forwarding indicator
+     * @param boundAddress The bound address - non-{@code null} if successful
+     * @param reason Reason for failure - {@code null} if successful
+     * @throws IOException If failed to handle the event - in which case
+     * the established tunnel is aborted
+     */
+    void establishedExplicitTunnel(
+            Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding,
+            SshdSocketAddress boundAddress, Throwable reason)
+                    throws IOException;
+
+    /**
+     * Signals a request to tear down a local/remote port forwarding
+     *
+     * @param session The {@link Session} through which the request is made
+     * @param address The (bound) address - local/remote according to the forwarding type
+     * @param localForwarding Local/remote port forwarding indicator
+     * @throws IOException If failed to handle the event - in which case
+     * the request is aborted
+     */
+    void tearingDownExplicitTunnel(Session session, SshdSocketAddress address, boolean localForwarding) throws IOException;
+
+    /**
+     * Signals a successful/failed request to tear down a local/remote port forwarding
+     *
+     * @param session The {@link Session} through which the request is made
+     * @param address The (bound) address - local/remote according to the forwarding type
+     * @param localForwarding Local/remote port forwarding indicator
+     * @param reason Reason for failure - {@code null} if successful
+     * @throws IOException If failed to handle the event - <B>Note:</B>
+     * the exception is propagated, but the port forwarding may have
+     * been torn down - no rollback
+     */
+    void tornDownExplicitTunnel(Session session, SshdSocketAddress address, boolean localForwarding, Throwable reason) throws IOException;
+
+    /**
+     * Signals the attempt to establish a dynamic port forwarding
+     *
+     * @param session The {@link Session} through which the attempt is made
+     * @param local The local address
+     * @throws IOException If failed to handle the event - in which case
+     * the attempt is aborted and the exception re-thrown to the caller
+     */
+    void establishingDynamicTunnel(Session session, SshdSocketAddress local) throws IOException;
+
+    /**
+     * Signals a successful/failed attempt to establish a dynamic port forwarding
+     *
+     * @param session The {@link Session} through which the attempt is made
+     * @param local The local address
+     * @param boundAddress The bound address - non-{@code null} if successful
+     * @param reason Reason for failure - {@code null} if successful
+     * @throws IOException If failed to handle the event - in which case
+     * the established tunnel is aborted
+     */
+    void establishedDynamicTunnel(
+            Session session, SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason)
+                    throws IOException;
+
+    /**
+     * Signals a request to tear down a dynamic forwarding
+     *
+     * @param session The {@link Session} through which the request is made
+     * @param address The (bound) address - local/remote according to the forwarding type
+     * @throws IOException If failed to handle the event - in which case
+     * the request is aborted
+     */
+    void tearingDownDynamicTunnel(Session session, SshdSocketAddress address) throws IOException;
+
+    /**
+     * Signals a successful/failed request to tear down a dynamic port forwarding
+     *
+     * @param session The {@link Session} through which the request is made
+     * @param address The (bound) address - local/remote according to the forwarding type
+     * @param reason Reason for failure - {@code null} if successful
+     * @throws IOException If failed to handle the event - <B>Note:</B>
+     * the exception is propagated, but the port forwarding may have
+     * been torn down - no rollback
+     */
+    void tornDownDynamicTunnel(Session session, SshdSocketAddress address, Throwable reason) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManager.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManager.java
new file mode 100644
index 0000000..2e02e48
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManager.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.forward;
+
+/**
+ * Marker interface for classes that allow to add/remove port forwarding
+ * listeners. <B>Note:</B> if adding/removing listeners while tunnels are
+ * being established and/or torn down there are no guarantees as to the order
+ * of the calls to the recently added/removed listener's methods in the interim.
+ * The correct order is guaranteed only as of the <U>next</U> tunnel after
+ * the listener has been added/removed.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface PortForwardingEventListenerManager {
+    /**
+     * Add a port forwarding listener
+     *
+     * @param listener The {@link PortForwardingEventListener} to add - never {@code null}
+     */
+    void addPortForwardingEventListener(PortForwardingEventListener listener);
+
+    /**
+     * Remove a port forwarding listener
+     *
+     * @param listener The {@link PortForwardingEventListener} to remove - ignored if {@code null}
+     */
+    void removePortForwardingEventListener(PortForwardingEventListener listener);
+
+    /**
+     * @return A proxy listener representing all the currently registered listener
+     * through this manager
+     */
+    PortForwardingEventListener getPortForwardingEventListenerProxy();
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
index ec0329a..07f4405 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
@@ -24,7 +24,7 @@ import java.io.IOException;
 import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.util.net.SshdSocketAddress;
 
-public interface TcpipForwarder extends PortForwardingManager, Closeable {
+public interface TcpipForwarder extends PortForwardingManager, PortForwardingEventListenerManager, Closeable {
     /**
      * @param remotePort The remote port
      * @return The local {@link SshdSocketAddress} that the remote port is forwarded to

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
index beb64cd..70cd098 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
@@ -33,5 +33,4 @@ public interface TcpipForwarderFactory {
      * @return the {@link TcpipForwarder} that will listen for connections and set up forwarding
      */
     TcpipForwarder create(ConnectionService service);
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
index 51cb5e2..48a78f2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -41,6 +40,7 @@ import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.channel.RequestHandler;
 import org.apache.sshd.common.config.VersionProperties;
 import org.apache.sshd.common.file.FileSystemFactory;
+import org.apache.sshd.common.forward.PortForwardingEventListener;
 import org.apache.sshd.common.forward.TcpipForwarderFactory;
 import org.apache.sshd.common.io.DefaultIoServiceFactoryFactory;
 import org.apache.sshd.common.io.IoServiceFactory;
@@ -78,10 +78,15 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i
     protected List<RequestHandler<ConnectionService>> globalRequestHandlers;
     protected SessionTimeoutListener sessionTimeoutListener;
     protected ScheduledFuture<?> timeoutListenerFuture;
-    protected final Collection<SessionListener> sessionListeners = new CopyOnWriteArraySet<>();
+    protected final Collection<SessionListener> sessionListeners =
+            EventListenerUtils.<SessionListener>synchronizedListenersSet();
     protected final SessionListener sessionListenerProxy;
-    protected final Collection<ChannelListener> channelListeners = new CopyOnWriteArraySet<>();
+    protected final Collection<ChannelListener> channelListeners =
+            EventListenerUtils.<ChannelListener>synchronizedListenersSet();
     protected final ChannelListener channelListenerProxy;
+    protected final Collection<PortForwardingEventListener> tunnelListeners =
+            EventListenerUtils.<PortForwardingEventListener>synchronizedListenersSet();
+    protected final PortForwardingEventListener tunnelListenerProxy;
 
     private final Map<String, Object> properties = new ConcurrentHashMap<>();
     private final Map<AttributeKey<?>, Object> attributes = new ConcurrentHashMap<>();
@@ -92,6 +97,7 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i
         ClassLoader loader = getClass().getClassLoader();
         sessionListenerProxy = EventListenerUtils.proxyWrapper(SessionListener.class, loader, sessionListeners);
         channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, loader, channelListeners);
+        tunnelListenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, loader, tunnelListeners);
     }
 
     @Override
@@ -329,6 +335,40 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i
         return channelListenerProxy;
     }
 
+    @Override
+    public PortForwardingEventListener getPortForwardingEventListenerProxy() {
+        return tunnelListenerProxy;
+    }
+
+    @Override
+    public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+        ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null instance", this);
+        // avoid race conditions on notifications while session is being closed
+        if (!isOpen()) {
+            log.warn("addPortForwardingEventListener({})[{}] ignore registration while session is closing", this, listener);
+            return;
+        }
+
+        if (this.tunnelListeners.add(listener)) {
+            log.trace("addPortForwardingEventListener({})[{}] registered", this, listener);
+        } else {
+            log.trace("addPortForwardingEventListener({})[{}] ignored duplicate", this, listener);
+        }
+    }
+
+    @Override
+    public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+        if (listener == null) {
+            return;
+        }
+
+        if (this.tunnelListeners.remove(listener)) {
+            log.trace("removePortForwardingEventListener({})[{}] removed", this, listener);
+        } else {
+            log.trace("removePortForwardingEventListener({})[{}] not registered", this, listener);
+        }
+    }
+
     protected void setupSessionTimeout(final AbstractSessionFactory<?, ?> sessionFactory) {
         // set up the the session timeout listener and schedule it
         sessionTimeoutListener = createSessionTimeoutListener();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java
new file mode 100644
index 0000000..76e20c4
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.session;
+
+import java.util.Collection;
+import java.util.Objects;
+
+import org.apache.sshd.common.forward.PortForwardingEventListener;
+import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
+import org.apache.sshd.common.util.EventListenerUtils;
+import org.apache.sshd.common.util.logging.AbstractLoggingBean;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public abstract class AbstractConnectionServiceFactory extends AbstractLoggingBean implements PortForwardingEventListenerManager {
+    private final Collection<PortForwardingEventListener> listeners =
+            EventListenerUtils.<PortForwardingEventListener>synchronizedListenersSet();
+    private final PortForwardingEventListener listenerProxy;
+
+    protected AbstractConnectionServiceFactory() {
+        listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, getClass().getClassLoader(), listeners);
+    }
+
+    @Override
+    public PortForwardingEventListener getPortForwardingEventListenerProxy() {
+        return listenerProxy;
+    }
+
+    @Override
+    public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+        listeners.add(Objects.requireNonNull(listener, "No listener to add"));
+    }
+
+    @Override
+    public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+        if (listener == null) {
+            return;
+        }
+
+        listeners.remove(listener);
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java
index c0b695c..a07046e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.sshd.agent.common.AgentForwardSupport;
 import org.apache.sshd.common.Service;
 import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
 import org.apache.sshd.common.forward.TcpipForwarder;
 import org.apache.sshd.server.x11.X11ForwardSupport;
 
@@ -31,7 +32,7 @@ import org.apache.sshd.server.x11.X11ForwardSupport;
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public interface ConnectionService extends Service {
+public interface ConnectionService extends Service, PortForwardingEventListenerManager {
     /**
      * Register a newly created channel with a new unique identifier
      *

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
index 4cf9bf8..b5fe252 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
@@ -30,6 +30,7 @@ import org.apache.sshd.common.auth.MutableUserHolder;
 import org.apache.sshd.common.channel.ChannelListenerManager;
 import org.apache.sshd.common.cipher.CipherInformation;
 import org.apache.sshd.common.compression.CompressionInformation;
+import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
 import org.apache.sshd.common.future.KeyExchangeFuture;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.io.IoWriteFuture;
@@ -50,6 +51,7 @@ public interface Session
                 SessionListenerManager,
                 ReservedSessionMessagesManager,
                 ChannelListenerManager,
+                PortForwardingEventListenerManager,
                 FactoryManagerHolder,
                 PropertyResolver,
                 AttributeStore,

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
index 98caf03..8e60a67 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -42,6 +43,7 @@ import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.channel.OpenChannelException;
 import org.apache.sshd.common.channel.RequestHandler;
 import org.apache.sshd.common.channel.Window;
+import org.apache.sshd.common.forward.PortForwardingEventListener;
 import org.apache.sshd.common.forward.TcpipForwarder;
 import org.apache.sshd.common.forward.TcpipForwarderFactory;
 import org.apache.sshd.common.future.SshFutureListener;
@@ -49,6 +51,7 @@ import org.apache.sshd.common.io.AbstractIoWriteFuture;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.EventListenerUtils;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.Int2IntFunction;
 import org.apache.sshd.common.util.ValidateUtils;
@@ -63,7 +66,9 @@ import org.apache.sshd.server.x11.X11ForwardSupport;
  * @param <S> Type of {@link AbstractSession} being used
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public abstract class AbstractConnectionService<S extends AbstractSession> extends AbstractInnerCloseable implements ConnectionService {
+public abstract class AbstractConnectionService<S extends AbstractSession>
+                extends AbstractInnerCloseable
+                implements ConnectionService {
     /**
      * Property that can be used to configure max. allowed concurrent active channels
      *
@@ -94,11 +99,34 @@ public abstract class AbstractConnectionService<S extends AbstractSession> exten
     private final AtomicReference<X11ForwardSupport> x11ForwardHolder = new AtomicReference<>();
     private final AtomicReference<TcpipForwarder> tcpipForwarderHolder = new AtomicReference<>();
     private final AtomicBoolean allowMoreSessions = new AtomicBoolean(true);
+    private final Collection<PortForwardingEventListener> listeners =
+            EventListenerUtils.<PortForwardingEventListener>synchronizedListenersSet();
+    private final PortForwardingEventListener listenerProxy;
 
     private final S sessionInstance;
 
     protected AbstractConnectionService(S session) {
         sessionInstance = ValidateUtils.checkNotNull(session, "No session");
+        listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, getClass().getClassLoader(), listeners);
+    }
+
+    @Override
+    public PortForwardingEventListener getPortForwardingEventListenerProxy() {
+        return listenerProxy;
+    }
+
+    @Override
+    public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+        listeners.add(Objects.requireNonNull(listener, "No listener to add"));
+    }
+
+    @Override
+    public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+        if (listener == null) {
+            return;
+        }
+
+        listeners.remove(listener);
     }
 
     public Collection<Channel> getChannels() {
@@ -140,7 +168,10 @@ public abstract class AbstractConnectionService<S extends AbstractSession> exten
                 ValidateUtils.checkNotNull(session.getFactoryManager(), "No factory manager");
         TcpipForwarderFactory factory =
                 ValidateUtils.checkNotNull(manager.getTcpipForwarderFactory(), "No forwarder factory");
-        return factory.create(this);
+        TcpipForwarder forwarder = factory.create(this);
+        forwarder.addPortForwardingEventListener(getPortForwardingEventListenerProxy());
+        forwarder.addPortForwardingEventListener(session.getPortForwardingEventListenerProxy());
+        return forwarder;
     }
 
     @Override
@@ -184,6 +215,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> exten
         if (log.isDebugEnabled()) {
             log.debug("getAgentForwardSupport({}) created instance", session);
         }
+
         return agentForward;
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index 824b9dd..4b2af70 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -34,7 +34,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -60,6 +59,7 @@ import org.apache.sshd.common.cipher.CipherInformation;
 import org.apache.sshd.common.compression.Compression;
 import org.apache.sshd.common.compression.CompressionInformation;
 import org.apache.sshd.common.digest.Digest;
+import org.apache.sshd.common.forward.PortForwardingEventListener;
 import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
 import org.apache.sshd.common.future.DefaultSshFuture;
 import org.apache.sshd.common.future.KeyExchangeFuture;
@@ -134,15 +134,24 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
     /**
      * Session listeners container
      */
-    protected final Collection<SessionListener> sessionListeners = new CopyOnWriteArraySet<>();
+    protected final Collection<SessionListener> sessionListeners =
+            EventListenerUtils.<SessionListener>synchronizedListenersSet();
     protected final SessionListener sessionListenerProxy;
 
     /**
      * Channel events listener container
      */
-    protected final Collection<ChannelListener> channelListeners = new CopyOnWriteArraySet<>();
+    protected final Collection<ChannelListener> channelListeners =
+            EventListenerUtils.<ChannelListener>synchronizedListenersSet();
     protected final ChannelListener channelListenerProxy;
 
+    /**
+     * Port forwarding events listener container
+     */
+    protected final Collection<PortForwardingEventListener> tunnelListeners =
+            EventListenerUtils.<PortForwardingEventListener>synchronizedListenersSet();
+    protected final PortForwardingEventListener tunnelListenerProxy;
+
     /*
      * Key exchange support
      */
@@ -256,10 +265,12 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         ClassLoader loader = getClass().getClassLoader();
         sessionListenerProxy = EventListenerUtils.proxyWrapper(SessionListener.class, loader, sessionListeners);
         channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, loader, channelListeners);
+        tunnelListenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, loader, tunnelListeners);
 
         // Delegate the task of further notifications to the session
         addSessionListener(factoryManager.getSessionListenerProxy());
         addChannelListener(factoryManager.getChannelListenerProxy());
+        addPortForwardingEventListener(factoryManager.getPortForwardingEventListenerProxy());
     }
 
     /**
@@ -2052,6 +2063,40 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         return channelListenerProxy;
     }
 
+    @Override
+    public PortForwardingEventListener getPortForwardingEventListenerProxy() {
+        return tunnelListenerProxy;
+    }
+
+    @Override
+    public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+        ValidateUtils.checkNotNull(listener, "addPortForwardingEventListener(%s) null instance", this);
+        // avoid race conditions on notifications while session is being closed
+        if (!isOpen()) {
+            log.warn("addPortForwardingEventListener({})[{}] ignore registration while session is closing", this, listener);
+            return;
+        }
+
+        if (this.tunnelListeners.add(listener)) {
+            log.trace("addPortForwardingEventListener({})[{}] registered", this, listener);
+        } else {
+            log.trace("addPortForwardingEventListener({})[{}] ignored duplicate", this, listener);
+        }
+    }
+
+    @Override
+    public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+        if (listener == null) {
+            return;
+        }
+
+        if (this.tunnelListeners.remove(listener)) {
+            log.trace("removePortForwardingEventListener({})[{}] removed", this, listener);
+        } else {
+            log.trace("removePortForwardingEventListener({})[{}] not registered", this, listener);
+        }
+    }
+
     /**
      * Sends a session event to all currently registered session listeners
      *
@@ -2261,7 +2306,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
     protected abstract void checkKeys() throws IOException;
 
     protected void receiveKexInit(Buffer buffer) throws IOException {
-        Map<KexProposalOption, String> proposal = new EnumMap<KexProposalOption, String>(KexProposalOption.class);
+        Map<KexProposalOption, String> proposal = new EnumMap<>(KexProposalOption.class);
         byte[] seed = receiveKexInit(buffer, proposal);
         receiveKexInit(proposal, seed);
     }
@@ -2336,7 +2381,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
     protected Pair<TimeoutStatus, String> checkAuthenticationTimeout(long now, long authTimeoutMs) {
         long authDiff = now - authTimeoutStart;
         if ((!authed) && (authTimeoutMs > 0L) && (authDiff > authTimeoutMs)) {
-            return new Pair<TimeoutStatus, String>(TimeoutStatus.AuthTimeout, "Session has timed out waiting for authentication after " + authTimeoutMs + " ms.");
+            return new Pair<>(TimeoutStatus.AuthTimeout, "Session has timed out waiting for authentication after " + authTimeoutMs + " ms.");
         } else {
             return null;
         }
@@ -2356,7 +2401,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
     protected Pair<TimeoutStatus, String> checkIdleTimeout(long now, long idleTimeoutMs) {
         long idleDiff = now - idleTimeoutStart;
         if ((idleTimeoutMs > 0L) && (idleDiff > idleTimeoutMs)) {
-            return new Pair<TimeoutStatus, String>(TimeoutStatus.IdleTimeout, "User session has timed out idling after " + idleTimeoutMs + " ms.");
+            return new Pair<>(TimeoutStatus.IdleTimeout, "User session has timed out idling after " + idleTimeoutMs + " ms.");
         } else {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
index f1003cf..51f7dd5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
@@ -22,18 +22,108 @@ package org.apache.sshd.common.util;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.EventListener;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
 
 
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public final class EventListenerUtils {
+    /**
+     * A special &quot;comparator&quot; whose only purpose is to ensure
+     * there are no same references in a listener's set
+     */
+    @SuppressWarnings("checkstyle:anoninnerlength")
+    public static final Comparator<EventListener> LISTENER_INSTANCE_COMPARATOR =
+        new Comparator<EventListener>() {
+            @Override
+            public int compare(EventListener l1, EventListener l2) {
+                if (l1 == l2) {
+                    return 0;
+                } else if (l1 == null) {
+                    return 1;
+                } else if (l2 == null) {
+                    return -1;
+                }
+
+                Class<?> c1 = l1.getClass();
+                Class<?> c2 = l2.getClass();
+                boolean checkHashCodes = true;
+                if (Proxy.isProxyClass(c1)) {
+                    if (Proxy.isProxyClass(c2)) {
+                        checkHashCodes = false; // cannot call hashCode on a proxy
+                    } else {
+                        return 1;
+                    }
+                } else if (Proxy.isProxyClass(c2)) {
+                    return -1;
+                }
+
+                if (checkHashCodes) {
+                    int nRes = Integer.compare(l1.hashCode(), l2.hashCode());
+                    if (nRes != 0) {
+                        return nRes;
+                    }
+                }
+
+                int nRes = Integer.compare(System.identityHashCode(l1), System.identityHashCode(l2));
+                if (nRes != 0) {
+                    return nRes;
+                }
+
+                if (c1 != c2) {
+                    return c1.getName().compareTo(c2.getName());
+                }
+
+                String s1 = Objects.toString(l1.toString(), "");
+                String s2 = Objects.toString(l2.toString(), "");
+                nRes = s1.compareTo(s2);
+                if (nRes != 0) {
+                    return nRes;
+                }
+                throw new UnsupportedOperationException("Ran out of options to compare instance of " + s1 + " vs. " + s2);
+            }
+    };
+
     private EventListenerUtils() {
         throw new UnsupportedOperationException("No instance");
     }
 
     /**
+     * @param <L> Type of {@link EventListener} contained in the set
+     * @param listeners The listeners to pre-add to the create set - ignored
+     * if (@code null}/empty
+     * @return A (synchronized) {@link Set} for containing the listeners ensuring
+     * that if same listener instance is added repeatedly only <U>one</U>
+     * instance is actually contained
+     */
+    public static <L extends EventListener> Set<L> synchronizedListenersSet(Collection<? extends L> listeners) {
+        Set<L> s = EventListenerUtils.<L>synchronizedListenersSet();
+        if (GenericUtils.size(listeners) > 0) {
+            s.addAll(listeners);
+        }
+
+        return s;
+    }
+
+    /**
+     * @param <L> Type of {@link EventListener} contained in the set
+     * @return A (synchronized) {@link Set} for containing the listeners ensuring
+     * that if same listener instance is added repeatedly only <U>one</U>
+     * instance is actually contained
+     * @see #LISTENER_INSTANCE_COMPARATOR
+     */
+    public static <L extends EventListener> Set<L> synchronizedListenersSet() {
+        return Collections.synchronizedSet(new TreeSet<L>(LISTENER_INSTANCE_COMPARATOR));
+    }
+
+    /**
      * Provides proxy wrapper around an {@link Iterable} container of listener
      * interface implementation. <b>Note:</b> a listener interface is one whose
      * invoked methods return <u>only</u> {@code void}.

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java b/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java
index 88da0c3..f16dbfd 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java
@@ -18,10 +18,12 @@
  */
 package org.apache.sshd.server;
 
+import java.util.EventListener;
+
 /**
  * Define a listener to receive signals
  */
-public interface SignalListener {
+public interface SignalListener extends EventListener {
 
     /**
      * @param signal The received {@link Signal}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java b/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java
index 7a5b1c5..0b4adeb 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java
@@ -21,11 +21,10 @@ package org.apache.sshd.server;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.apache.sshd.common.channel.PtyMode;
+import org.apache.sshd.common.util.EventListenerUtils;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.logging.AbstractLoggingBean;
@@ -34,7 +33,7 @@ import org.apache.sshd.common.util.logging.AbstractLoggingBean;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public class StandardEnvironment extends AbstractLoggingBean implements Environment {
-    private final Map<Signal, Set<SignalListener>> listeners;
+    private final Map<Signal, Collection<SignalListener>> listeners;
     private final Map<String, String> env;
     private final Map<PtyMode, Integer> ptyModes;
 
@@ -83,7 +82,7 @@ public class StandardEnvironment extends AbstractLoggingBean implements Environm
     public void removeSignalListener(SignalListener listener) {
         ValidateUtils.checkNotNull(listener, "No listener instance");
         for (Signal s : Signal.SIGNALS) {
-            Set<SignalListener> ls = getSignalListeners(s, false);
+            Collection<SignalListener> ls = getSignalListeners(s, false);
             if (ls != null) {
                 ls.remove(listener);
             }
@@ -91,7 +90,7 @@ public class StandardEnvironment extends AbstractLoggingBean implements Environm
     }
 
     public void signal(Signal signal) {
-        Set<SignalListener> ls = getSignalListeners(signal, false);
+        Collection<SignalListener> ls = getSignalListeners(signal, false);
         if (log.isDebugEnabled()) {
             log.debug("signal({}) - listeners={}", signal, ls);
         }
@@ -131,17 +130,17 @@ public class StandardEnvironment extends AbstractLoggingBean implements Environm
      *
      * @param signal The specified {@link Signal}
      * @param create If {@code true} and no current listeners are mapped then
-     *               creates a new {@link Set}
-     * @return The {@link Set} of listeners registered for the signal - may be
+     *               creates a new {@link Collection}
+     * @return The {@link Collection} of listeners registered for the signal - may be
      * {@code null} in case <tt>create</tt> is {@code false}
      */
-    protected Set<SignalListener> getSignalListeners(Signal signal, boolean create) {
-        Set<SignalListener> ls = listeners.get(signal);
+    protected Collection<SignalListener> getSignalListeners(Signal signal, boolean create) {
+        Collection<SignalListener> ls = listeners.get(signal);
         if ((ls == null) && create) {
             synchronized (listeners) {
                 ls = listeners.get(signal);
                 if (ls == null) {
-                    ls = new CopyOnWriteArraySet<>();
+                    ls =  EventListenerUtils.<SignalListener>synchronizedListenersSet();
                     listeners.put(signal, ls);
                 }
             }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java b/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java
index d515046..0961e1d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java
@@ -19,7 +19,6 @@
 package org.apache.sshd.server.scp;
 
 import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.sshd.common.scp.ScpFileOpener;
@@ -107,7 +106,8 @@ public class ScpCommandFactory implements ScpFileOpenerHolder, CommandFactory, C
     private ScpFileOpener fileOpener;
     private int sendBufferSize = ScpHelper.MIN_SEND_BUFFER_SIZE;
     private int receiveBufferSize = ScpHelper.MIN_RECEIVE_BUFFER_SIZE;
-    private Collection<ScpTransferEventListener> listeners = new CopyOnWriteArraySet<>();
+    private Collection<ScpTransferEventListener> listeners =
+            EventListenerUtils.<ScpTransferEventListener>synchronizedListenersSet();
     private ScpTransferEventListener listenerProxy;
 
     public ScpCommandFactory() {
@@ -258,9 +258,7 @@ public class ScpCommandFactory implements ScpFileOpenerHolder, CommandFactory, C
         try {
             ScpCommandFactory other = getClass().cast(super.clone());
             // clone the listeners set as well
-            other.listeners = this.listeners.isEmpty()
-                    ? new CopyOnWriteArraySet<ScpTransferEventListener>()
-                    : new CopyOnWriteArraySet<>(this.listeners);
+            other.listeners = EventListenerUtils.<ScpTransferEventListener>synchronizedListenersSet(this.listeners);
             other.listenerProxy = EventListenerUtils.proxyWrapper(ScpTransferEventListener.class, getClass().getClassLoader(), other.listeners);
             return other;
         } catch (CloneNotSupportedException e) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionService.java
index 9ed7146..b76bfa3 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionService.java
@@ -26,7 +26,9 @@ import org.apache.sshd.common.session.helpers.AbstractConnectionService;
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public class ServerConnectionService extends AbstractConnectionService<AbstractServerSession> implements ServerSessionHolder {
+public class ServerConnectionService
+        extends AbstractConnectionService<AbstractServerSession>
+        implements ServerSessionHolder {
     protected ServerConnectionService(AbstractServerSession s) throws SshException {
         super(s);
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java
index 1607e32..7568c81 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java
@@ -22,14 +22,26 @@ import java.io.IOException;
 
 import org.apache.sshd.common.Service;
 import org.apache.sshd.common.ServiceFactory;
+import org.apache.sshd.common.forward.PortForwardingEventListener;
+import org.apache.sshd.common.session.AbstractConnectionServiceFactory;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.ValidateUtils;
 
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public class ServerConnectionServiceFactory implements ServiceFactory {
-    public static final ServerConnectionServiceFactory INSTANCE = new ServerConnectionServiceFactory();
+public class ServerConnectionServiceFactory extends AbstractConnectionServiceFactory implements ServiceFactory {
+    public static final ServerConnectionServiceFactory INSTANCE = new ServerConnectionServiceFactory() {
+        @Override
+        public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+            throw new UnsupportedOperationException("addPortForwardingListener(" + listener + ") N/A on default instance");
+        }
+
+        @Override
+        public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+            throw new UnsupportedOperationException("removePortForwardingEventListener(" + listener + ") N/A on default instance");
+        }
+    };
 
     public ServerConnectionServiceFactory() {
         super();
@@ -43,6 +55,8 @@ public class ServerConnectionServiceFactory implements ServiceFactory {
     @Override
     public Service create(Session session) throws IOException {
         ValidateUtils.checkTrue(session instanceof AbstractServerSession, "Not a server session: %s", session);
-        return new ServerConnectionService((AbstractServerSession) session);
+        ServerConnectionService service = new ServerConnectionService((AbstractServerSession) session);
+        service.addPortForwardingEventListener(getPortForwardingEventListenerProxy());
+        return service;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java
index b6d9e28..aa2c01c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java
@@ -20,7 +20,6 @@
 package org.apache.sshd.server.subsystem.sftp;
 
 import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.apache.sshd.common.util.EventListenerUtils;
 import org.apache.sshd.common.util.ValidateUtils;
@@ -29,7 +28,8 @@ import org.apache.sshd.common.util.ValidateUtils;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public abstract class AbstractSftpEventListenerManager implements SftpEventListenerManager {
-    private final Collection<SftpEventListener> sftpEventListeners = new CopyOnWriteArraySet<>();
+    private final Collection<SftpEventListener> sftpEventListeners =
+            EventListenerUtils.<SftpEventListener>synchronizedListenersSet();
     private final SftpEventListener sftpEventListenerProxy;
 
     protected AbstractSftpEventListenerManager() {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
index 8fe54d2..8b53016 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
@@ -68,7 +68,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -239,7 +238,7 @@ public class SftpSubsystem
     public static final String ACL_SUPPORTED_MASK_PROP = "sftp-acl-supported-mask";
     public static final Set<Integer> DEFAULT_ACL_SUPPORTED_MASK =
             Collections.unmodifiableSet(
-                    new HashSet<Integer>(Arrays.asList(
+                    new HashSet<>(Arrays.asList(
                             SftpConstants.SSH_ACL_CAP_ALLOW,
                             SftpConstants.SSH_ACL_CAP_DENY,
                             SftpConstants.SSH_ACL_CAP_AUDIT,
@@ -303,7 +302,8 @@ public class SftpSubsystem
 
     private ServerSession serverSession;
     private final AtomicBoolean closed = new AtomicBoolean(false);
-    private final Collection<SftpEventListener> sftpEventListeners = new CopyOnWriteArraySet<>();
+    private final Collection<SftpEventListener> sftpEventListeners =
+            EventListenerUtils.<SftpEventListener>synchronizedListenersSet();
     private final SftpEventListener sftpEventListenerProxy;
 
     /**
@@ -2297,7 +2297,7 @@ public class SftpSubsystem
 
         Map<String, OptionalFeature> extensions = getSupportedClientExtensions();
         int numExtensions = GenericUtils.size(extensions);
-        List<String> extras = (numExtensions <= 0) ? Collections.<String>emptyList() : new ArrayList<String>(numExtensions);
+        List<String> extras = (numExtensions <= 0) ? Collections.<String>emptyList() : new ArrayList<>(numExtensions);
         if (numExtensions > 0) {
             for (Map.Entry<String, OptionalFeature> ee : extensions.entrySet()) {
                 String name = ee.getKey();
@@ -2354,7 +2354,7 @@ public class SftpSubsystem
         }
 
         String[] names = GenericUtils.split(override, ',');
-        Set<Integer> maskValues = new HashSet<Integer>(names.length);
+        Set<Integer> maskValues = new HashSet<>(names.length);
         for (String n : names) {
             Integer v = ValidateUtils.checkNotNull(
                     AclSupportedParser.AclCapabilities.getAclCapabilityValue(n), "Unknown ACL capability: %s", n);
@@ -3004,7 +3004,7 @@ public class SftpSubsystem
     }
 
     protected void setFileAttributes(Path file, Map<String, ?> attributes, LinkOption ... options) throws IOException {
-        Set<String> unsupported = new TreeSet<String>(String.CASE_INSENSITIVE_ORDER);
+        Set<String> unsupported = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
         for (Map.Entry<String, ?> ae : attributes.entrySet()) {
             String attribute = ae.getKey();
             Object value = ae.getValue();