You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2016/07/30 14:15:45 UTC
[1/2] mina-sshd git commit: [SSHD-682] Provide
PortForwardingEventListener support
Repository: mina-sshd
Updated Branches:
refs/heads/master cac64e4f9 -> 549ac4e2a
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java b/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java
index 36f9eb9..70d8775 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ProxyTest.java
@@ -26,6 +26,8 @@ import java.net.Proxy;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoAcceptor;
@@ -37,6 +39,7 @@ import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.client.session.forward.DynamicPortForwardingTracker;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.PropertyResolverUtils;
+import org.apache.sshd.common.forward.PortForwardingEventListener;
import org.apache.sshd.common.util.net.SshdSocketAddress;
import org.apache.sshd.server.SshServer;
import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
@@ -99,13 +102,78 @@ public class ProxyTest extends BaseTestSupport {
@Test
public void testSocksProxy() throws Exception {
- try (ClientSession session = createNativeSession()) {
+ final AtomicReference<SshdSocketAddress> localAddressHolder = new AtomicReference<>();
+ final AtomicReference<SshdSocketAddress> boundAddressHolder = new AtomicReference<>();
+ final AtomicInteger tearDownSignal = new AtomicInteger(0);
+ @SuppressWarnings("checkstyle:anoninnerlength")
+ PortForwardingEventListener listener = new PortForwardingEventListener() {
+ @Override
+ public void tornDownExplicitTunnel(
+ org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding, Throwable reason)
+ throws IOException {
+ throw new UnsupportedOperationException("Unexpected explicit tunnel torn down indication: session=" + session + ", address=" + address);
+ }
+
+ @Override
+ public void tornDownDynamicTunnel(
+ org.apache.sshd.common.session.Session session, SshdSocketAddress address, Throwable reason) throws IOException {
+ assertNotNull("Establishment (local) indication not invoked for address=" + address, localAddressHolder.get());
+ assertNotNull("Establishment (bound) indication not invoked for address=" + address, boundAddressHolder.get());
+ assertEquals("No tear down indication", 1, tearDownSignal.get());
+ }
+
+ @Override
+ public void tearingDownExplicitTunnel(
+ org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding)
+ throws IOException {
+ throw new UnsupportedOperationException("Unexpected explicit tunnel tear down indication: session=" + session + ", address=" + address);
+ }
+
+ @Override
+ public void tearingDownDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address)
+ throws IOException {
+ assertNotNull("Establishment (local) indication not invoked for address=" + address, localAddressHolder.get());
+ assertNotNull("Establishment (bound) indication not invoked for address=" + address, boundAddressHolder.get());
+ assertEquals("Multiple tearing down indications", 1, tearDownSignal.incrementAndGet());
+ }
+
+ @Override
+ public void establishingExplicitTunnel(
+ org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding)
+ throws IOException {
+ throw new UnsupportedOperationException("Unexpected explicit tunnel establishment indication: session=" + session + ", address=" + local);
+ }
+
+ @Override
+ public void establishingDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local)
+ throws IOException {
+ assertNull("Multiple calls to establishment indicator", localAddressHolder.getAndSet(local));
+ }
+
+ @Override
+ public void establishedExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local,
+ SshdSocketAddress remote, boolean localForwarding, SshdSocketAddress boundAddress, Throwable reason)
+ throws IOException {
+ throw new UnsupportedOperationException("Unexpected explicit tunnel established indication: session=" + session + ", address=" + boundAddress);
+ }
+
+ @Override
+ public void establishedDynamicTunnel(
+ org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason)
+ throws IOException {
+ assertSame("Establishment indication not invoked", local, localAddressHolder.get());
+ assertNull("Multiple calls to establishment indicator", boundAddressHolder.getAndSet(boundAddress));
+ }
+ };
+
+ try (ClientSession session = createNativeSession(listener)) {
String expected = getCurrentTestName();
byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
byte[] buf = new byte[bytes.length + Long.SIZE];
SshdSocketAddress dynamic;
- try (DynamicPortForwardingTracker tracker = session.createDynamicPortForwardingTracker(new SshdSocketAddress(TEST_LOCALHOST, 0))) {
+ try (DynamicPortForwardingTracker tracker =
+ session.createDynamicPortForwardingTracker(new SshdSocketAddress(TEST_LOCALHOST, 0))) {
dynamic = tracker.getBoundAddress();
assertTrue("Tracker not marked as open", tracker.isOpen());
@@ -128,8 +196,13 @@ public class ProxyTest extends BaseTestSupport {
tracker.close();
assertFalse("Tracker not marked as closed", tracker.isOpen());
+ } finally {
+ client.removePortForwardingEventListener(listener);
}
+ assertNotNull("Local tunnel address not indicated", localAddressHolder.getAndSet(null));
+ assertNotNull("Bound tunnel address not indicated", boundAddressHolder.getAndSet(null));
+
try {
try (Socket s = new Socket(new Proxy(Proxy.Type.SOCKS, new InetSocketAddress(TEST_LOCALHOST, dynamic.getPort())))) {
s.connect(new InetSocketAddress(TEST_LOCALHOST, echoPort));
@@ -143,11 +216,14 @@ public class ProxyTest extends BaseTestSupport {
}
}
- protected ClientSession createNativeSession() throws Exception {
+ protected ClientSession createNativeSession(PortForwardingEventListener listener) throws Exception {
client = setupTestClient();
PropertyResolverUtils.updateProperty(client, FactoryManager.WINDOW_SIZE, 2048);
PropertyResolverUtils.updateProperty(client, FactoryManager.MAX_PACKET_SIZE, 256);
client.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
+ if (listener != null) {
+ client.addPortForwardingEventListener(listener);
+ }
client.start();
ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, sshPort).verify(7L, TimeUnit.SECONDS).getSession();
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java b/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java
index 0d08497..f42704a 100644
--- a/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java
@@ -41,6 +41,7 @@ import org.apache.sshd.common.NamedResource;
import org.apache.sshd.common.channel.Channel;
import org.apache.sshd.common.channel.ChannelListener;
import org.apache.sshd.common.forward.DefaultTcpipForwarderFactory;
+import org.apache.sshd.common.forward.PortForwardingEventListener;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.keyprovider.KeyPairProvider;
@@ -50,6 +51,7 @@ import org.apache.sshd.common.random.SingletonRandomFactory;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.SessionListener;
import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.net.SshdSocketAddress;
import org.apache.sshd.util.test.BaseTestSupport;
import org.junit.FixMethodOrder;
import org.junit.Test;
@@ -284,6 +286,52 @@ public class ClientAuthenticationManagerTest extends BaseTestSupport {
// ignored
}
});
+ Mockito.when(client.getPortForwardingEventListenerProxy()).thenReturn(new PortForwardingEventListener() {
+ @Override
+ public void tornDownExplicitTunnel(Session session, SshdSocketAddress address, boolean localForwarding,
+ Throwable reason) throws IOException {
+ // ignored
+ }
+
+ @Override
+ public void tornDownDynamicTunnel(Session session, SshdSocketAddress address, Throwable reason) throws IOException {
+ // ignored
+ }
+
+ @Override
+ public void tearingDownExplicitTunnel(Session session, SshdSocketAddress address, boolean localForwarding)
+ throws IOException {
+ // ignored
+ }
+
+ @Override
+ public void tearingDownDynamicTunnel(Session session, SshdSocketAddress address) throws IOException {
+ // ignored
+ }
+
+ @Override
+ public void establishingExplicitTunnel(Session session, SshdSocketAddress local, SshdSocketAddress remote,
+ boolean localForwarding) throws IOException {
+ // ignored
+ }
+
+ @Override
+ public void establishingDynamicTunnel(Session session, SshdSocketAddress local) throws IOException {
+ // ignored
+ }
+
+ @Override
+ public void establishedExplicitTunnel(Session session, SshdSocketAddress local, SshdSocketAddress remote,
+ boolean localForwarding, SshdSocketAddress boundAddress, Throwable reason) throws IOException {
+ // ignored
+ }
+
+ @Override
+ public void establishedDynamicTunnel(Session session, SshdSocketAddress local, SshdSocketAddress boundAddress,
+ Throwable reason) throws IOException {
+ // ignored
+ }
+ });
Factory<Random> randomFactory = new SingletonRandomFactory(JceRandomFactory.INSTANCE);
Mockito.when(client.getRandomFactory()).thenReturn(randomFactory);
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
index 74d9b91..26480a9 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.sshd.common.forward;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
@@ -35,6 +36,8 @@ import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
@@ -267,7 +270,7 @@ public class PortForwardingTest extends BaseTestSupport {
@Test
public void testRemoteForwardingNative() throws Exception {
- try (ClientSession session = createNativeSession()) {
+ try (ClientSession session = createNativeSession(null)) {
SshdSocketAddress remote = new SshdSocketAddress("", 0);
SshdSocketAddress local = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
SshdSocketAddress bound = session.startRemotePortForwarding(remote, local);
@@ -295,9 +298,76 @@ public class PortForwardingTest extends BaseTestSupport {
@Test
public void testRemoteForwardingNativeBigPayload() throws Exception {
- try (ClientSession session = createNativeSession();
+ final AtomicReference<SshdSocketAddress> localAddressHolder = new AtomicReference<>();
+ final AtomicReference<SshdSocketAddress> remoteAddressHolder = new AtomicReference<>();
+ final AtomicReference<SshdSocketAddress> boundAddressHolder = new AtomicReference<>();
+ final AtomicInteger tearDownSignal = new AtomicInteger(0);
+ @SuppressWarnings("checkstyle:anoninnerlength")
+ PortForwardingEventListener listener = new PortForwardingEventListener() {
+ @Override
+ public void tornDownExplicitTunnel(
+ org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding, Throwable reason)
+ throws IOException {
+ assertFalse("Unexpected local tunnel has been torn down: address=" + address, localForwarding);
+ assertEquals("Tear down indication not invoked", 1, tearDownSignal.get());
+ }
+
+ @Override
+ public void tornDownDynamicTunnel(
+ org.apache.sshd.common.session.Session session, SshdSocketAddress address, Throwable reason) throws IOException {
+ throw new UnsupportedOperationException("Unexpected dynamic tunnel torn down indication: session=" + session + ", address=" + address);
+ }
+
+ @Override
+ public void tearingDownExplicitTunnel(
+ org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding)
+ throws IOException {
+ assertFalse("Unexpected local tunnel being torn down: address=" + address, localForwarding);
+ assertEquals("Duplicate tear down signalling", 1, tearDownSignal.incrementAndGet());
+ }
+
+ @Override
+ public void tearingDownDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address)
+ throws IOException {
+ throw new UnsupportedOperationException("Unexpected dynamic tunnel tearing down indication: session=" + session + ", address=" + address);
+ }
+
+ @Override
+ public void establishingExplicitTunnel(
+ org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding)
+ throws IOException {
+ assertFalse("Unexpected local tunnel being established: local=" + local + ", remote=" + remote, localForwarding);
+ assertNull("Duplicate establishment indication call for local address=" + local, localAddressHolder.getAndSet(local));
+ assertNull("Duplicate establishment indication call for remote address=" + remote, remoteAddressHolder.getAndSet(remote));
+ }
+
+ @Override
+ public void establishingDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local)
+ throws IOException {
+ throw new UnsupportedOperationException("Unexpected dynamic tunnel establishing indication: session=" + session + ", address=" + local);
+ }
+
+ @Override
+ public void establishedExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local,
+ SshdSocketAddress remote, boolean localForwarding, SshdSocketAddress boundAddress, Throwable reason)
+ throws IOException {
+ assertFalse("Unexpected local tunnel has been established: local=" + local + ", remote=" + remote + ", bound=" + boundAddress, localForwarding);
+ assertSame("Mismatched established tunnel local address", local, localAddressHolder.get());
+ assertSame("Mismatched established tunnel remote address", remote, remoteAddressHolder.get());
+ assertNull("Duplicate establishment indication call for bound address=" + boundAddress, boundAddressHolder.getAndSet(boundAddress));
+ }
+
+ @Override
+ public void establishedDynamicTunnel(
+ org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason)
+ throws IOException {
+ throw new UnsupportedOperationException("Unexpected dynamic tunnel established indication: session=" + session + ", address=" + boundAddress);
+ }
+ };
+
+ try (ClientSession session = createNativeSession(listener);
ExplicitPortForwardingTracker tracker =
- session.createRemotePortForwardingTracker(new SshdSocketAddress(TEST_LOCALHOST, echoPort), new SshdSocketAddress("", 0))) {
+ session.createRemotePortForwardingTracker(new SshdSocketAddress("", 0), new SshdSocketAddress(TEST_LOCALHOST, echoPort))) {
assertTrue("Tracker not marked as open", tracker.isOpen());
assertFalse("Tracker not marked as remote", tracker.isLocalForwarding());
@@ -324,7 +394,13 @@ public class PortForwardingTest extends BaseTestSupport {
tracker.close();
}
assertFalse("Tracker not marked as closed", tracker.isOpen());
+ } finally {
+ client.removePortForwardingEventListener(listener);
}
+
+ assertNotNull("Local tunnel address not indicated", localAddressHolder.getAndSet(null));
+ assertNotNull("Remote tunnel address not indicated", remoteAddressHolder.getAndSet(null));
+ assertNotNull("Bound tunnel address not indicated", boundAddressHolder.getAndSet(null));
}
@Test
@@ -360,7 +436,74 @@ public class PortForwardingTest extends BaseTestSupport {
@Test
public void testLocalForwardingNative() throws Exception {
- try (ClientSession session = createNativeSession();
+ final AtomicReference<SshdSocketAddress> localAddressHolder = new AtomicReference<>();
+ final AtomicReference<SshdSocketAddress> remoteAddressHolder = new AtomicReference<>();
+ final AtomicReference<SshdSocketAddress> boundAddressHolder = new AtomicReference<>();
+ final AtomicInteger tearDownSignal = new AtomicInteger(0);
+ @SuppressWarnings("checkstyle:anoninnerlength")
+ PortForwardingEventListener listener = new PortForwardingEventListener() {
+ @Override
+ public void tornDownExplicitTunnel(
+ org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding, Throwable reason)
+ throws IOException {
+ assertTrue("Unexpected remote tunnel has been torn down: address=" + address, localForwarding);
+ assertEquals("Tear down indication not invoked", 1, tearDownSignal.get());
+ }
+
+ @Override
+ public void tornDownDynamicTunnel(
+ org.apache.sshd.common.session.Session session, SshdSocketAddress address, Throwable reason) throws IOException {
+ throw new UnsupportedOperationException("Unexpected dynamic tunnel torn down indication: session=" + session + ", address=" + address);
+ }
+
+ @Override
+ public void tearingDownExplicitTunnel(
+ org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding)
+ throws IOException {
+ assertTrue("Unexpected remote tunnel being torn down: address=" + address, localForwarding);
+ assertEquals("Duplicate tear down signalling", 1, tearDownSignal.incrementAndGet());
+ }
+
+ @Override
+ public void tearingDownDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address)
+ throws IOException {
+ throw new UnsupportedOperationException("Unexpected dynamic tunnel tearing down indication: session=" + session + ", address=" + address);
+ }
+
+ @Override
+ public void establishingExplicitTunnel(
+ org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding)
+ throws IOException {
+ assertTrue("Unexpected remote tunnel being established: local=" + local + ", remote=" + remote, localForwarding);
+ assertNull("Duplicate establishment indication call for local address=" + local, localAddressHolder.getAndSet(local));
+ assertNull("Duplicate establishment indication call for remote address=" + remote, remoteAddressHolder.getAndSet(remote));
+ }
+
+ @Override
+ public void establishingDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local)
+ throws IOException {
+ throw new UnsupportedOperationException("Unexpected dynamic tunnel establishing indication: session=" + session + ", address=" + local);
+ }
+
+ @Override
+ public void establishedExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local,
+ SshdSocketAddress remote, boolean localForwarding, SshdSocketAddress boundAddress, Throwable reason)
+ throws IOException {
+ assertTrue("Unexpected remote tunnel has been established: local=" + local + ", remote=" + remote + ", bound=" + boundAddress, localForwarding);
+ assertSame("Mismatched established tunnel local address", local, localAddressHolder.get());
+ assertSame("Mismatched established tunnel remote address", remote, remoteAddressHolder.get());
+ assertNull("Duplicate establishment indication call for bound address=" + boundAddress, boundAddressHolder.getAndSet(boundAddress));
+ }
+
+ @Override
+ public void establishedDynamicTunnel(
+ org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason)
+ throws IOException {
+ throw new UnsupportedOperationException("Unexpected dynamic tunnel established indication: session=" + session + ", address=" + boundAddress);
+ }
+ };
+
+ try (ClientSession session = createNativeSession(listener);
ExplicitPortForwardingTracker tracker =
session.createLocalPortForwardingTracker(new SshdSocketAddress("", 0), new SshdSocketAddress(TEST_LOCALHOST, echoPort))) {
assertTrue("Tracker not marked as open", tracker.isOpen());
@@ -387,12 +530,18 @@ public class PortForwardingTest extends BaseTestSupport {
tracker.close();
}
assertFalse("Tracker not marked as closed", tracker.isOpen());
+ } finally {
+ client.removePortForwardingEventListener(listener);
}
+
+ assertNotNull("Local tunnel address not indicated", localAddressHolder.getAndSet(null));
+ assertNotNull("Remote tunnel address not indicated", remoteAddressHolder.getAndSet(null));
+ assertNotNull("Bound tunnel address not indicated", boundAddressHolder.getAndSet(null));
}
@Test
public void testLocalForwardingNativeReuse() throws Exception {
- try (ClientSession session = createNativeSession()) {
+ try (ClientSession session = createNativeSession(null)) {
SshdSocketAddress local = new SshdSocketAddress("", 0);
SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
SshdSocketAddress bound = session.startLocalPortForwarding(local, remote);
@@ -406,7 +555,7 @@ public class PortForwardingTest extends BaseTestSupport {
@Test
public void testLocalForwardingNativeBigPayload() throws Exception {
- try (ClientSession session = createNativeSession()) {
+ try (ClientSession session = createNativeSession(null)) {
String expected = getCurrentTestName();
byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
byte[] buf = new byte[bytes.length + Long.SIZE];
@@ -436,7 +585,7 @@ public class PortForwardingTest extends BaseTestSupport {
@Test
public void testForwardingChannel() throws Exception {
- try (ClientSession session = createNativeSession()) {
+ try (ClientSession session = createNativeSession(null)) {
SshdSocketAddress local = new SshdSocketAddress("", 0);
SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
@@ -594,11 +743,14 @@ public class PortForwardingTest extends BaseTestSupport {
return session;
}
- protected ClientSession createNativeSession() throws Exception {
+ protected ClientSession createNativeSession(PortForwardingEventListener listener) throws Exception {
client = setupTestClient();
PropertyResolverUtils.updateProperty(client, FactoryManager.WINDOW_SIZE, 2048);
PropertyResolverUtils.updateProperty(client, FactoryManager.MAX_PACKET_SIZE, 256);
client.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
+ if (listener != null) {
+ client.addPortForwardingEventListener(listener);
+ }
client.start();
ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, sshPort).verify(7L, TimeUnit.SECONDS).getSession();
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/test/java/org/apache/sshd/common/util/EventListenerUtilsTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/util/EventListenerUtilsTest.java b/sshd-core/src/test/java/org/apache/sshd/common/util/EventListenerUtilsTest.java
index fdf7656..03d8d20 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/util/EventListenerUtilsTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/util/EventListenerUtilsTest.java
@@ -20,8 +20,11 @@
package org.apache.sshd.common.util;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.EventListener;
import java.util.List;
+import java.util.Set;
import org.apache.sshd.util.test.BaseTestSupport;
import org.junit.FixMethodOrder;
@@ -39,7 +42,7 @@ public class EventListenerUtilsTest extends BaseTestSupport {
@Test
public void testProxyWrapper() {
- List<ProxyListenerImpl> impls = new ArrayList<ProxyListenerImpl>();
+ List<ProxyListenerImpl> impls = new ArrayList<>();
for (int index = 0; index < Byte.SIZE; index++) {
impls.add(new ProxyListenerImpl());
}
@@ -57,6 +60,57 @@ public class EventListenerUtilsTest extends BaseTestSupport {
}
}
+ @Test
+ public void testListenerInstanceComparatorOnProxy() {
+ Comparator<? super EventListener> comparator = EventListenerUtils.LISTENER_INSTANCE_COMPARATOR;
+ ProxyListener p1 = EventListenerUtils.proxyWrapper(ProxyListener.class, Collections.singletonList(new ProxyListenerImpl()));
+ assertEquals("Mismatched self reference comparison", 0, comparator.compare(p1, p1));
+
+ EventListener l = new EventListener() { /* nothing extra */ };
+ assertEquals("Mismatched proxy vs. non-proxy result", 1, Integer.signum(comparator.compare(p1, l)));
+ assertEquals("Mismatched non-proxy vs. proxy result", -1, Integer.signum(comparator.compare(l, p1)));
+
+ ProxyListener p2 = EventListenerUtils.proxyWrapper(ProxyListener.class, Collections.singletonList(new ProxyListenerImpl()));
+ int p1vsp2 = Integer.signum(comparator.compare(p1, p2));
+ assertNotEquals("Mismatched p1 vs. p2 comparison", 0, p1vsp2);
+ assertEquals("Mismatched p2 vs. p1 comparison result", 0 - p1vsp2, Integer.signum(comparator.compare(p2, p1)));
+ }
+
+ @Test
+ public void testListenerInstanceComparatorOnNonProxy() {
+ Comparator<? super EventListener> comparator = EventListenerUtils.LISTENER_INSTANCE_COMPARATOR;
+ EventListener l1 = new EventListener() { /* nothing extra */ };
+ assertEquals("Mismatched self reference comparison", 0, comparator.compare(l1, l1));
+
+ EventListener l2 = new EventListener() { /* nothing extra */ };
+ int l1vsl2 = Integer.signum(comparator.compare(l1, l2));
+ assertNotEquals("Mismatched l1 vs. l2 comparison result", 0, l1vsl2);
+ assertEquals("Mismatched l2 vs. l1 comparison result", 0 - l1vsl2, Integer.signum(comparator.compare(l2, l1)));
+ }
+
+ @Test
+ public void testSynchronizedListenersSetOnProxies() {
+ ProxyListener p1 = EventListenerUtils.proxyWrapper(ProxyListener.class, Collections.singletonList(new ProxyListenerImpl()));
+ Set<ProxyListener> s = EventListenerUtils.<ProxyListener>synchronizedListenersSet();
+ for (int index = 1; index <= Byte.SIZE; index++) {
+ boolean modified = s.add(p1);
+ assertEquals("Mismatched p1 modification indicator at attempt #" + index, index == 1, modified);
+ assertEquals("Mismatched p1 set size at attempt #" + index, 1, s.size());
+ }
+
+ ProxyListener p2 = EventListenerUtils.proxyWrapper(ProxyListener.class, Collections.singletonList(new ProxyListenerImpl()));
+ for (int index = 1; index <= Byte.SIZE; index++) {
+ boolean modified = s.add(p2);
+ assertEquals("Mismatched p2 modification indicator at attempt #" + index, index == 1, modified);
+ assertEquals("Mismatched p2 set size at attempt #" + index, 2, s.size());
+ }
+
+ assertTrue("Failed to remove p1", s.remove(p1));
+ assertEquals("Mismatched post p1-remove size", 1, s.size());
+ assertTrue("Failed to remove p2", s.remove(p2));
+ assertEquals("Mismatched post p2-remove size", 0, s.size());
+ }
+
interface ProxyListener extends EventListener {
void callMeWithString(String s);
[2/2] mina-sshd git commit: [SSHD-682] Provide
PortForwardingEventListener support
Posted by lg...@apache.org.
[SSHD-682] Provide PortForwardingEventListener support
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/549ac4e2
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/549ac4e2
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/549ac4e2
Branch: refs/heads/master
Commit: 549ac4e2a197afebbd9d66ccc1cd8506c0a515fb
Parents: cac64e4
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Sat Jul 30 17:17:09 2016 +0300
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Sat Jul 30 17:17:09 2016 +0300
----------------------------------------------------------------------
.../session/ClientConnectionServiceFactory.java | 20 +-
.../org/apache/sshd/common/FactoryManager.java | 2 +
.../sshd/common/channel/AbstractChannel.java | 8 +-
.../common/forward/DefaultTcpipForwarder.java | 265 ++++++++++++++-----
.../forward/DefaultTcpipForwarderFactory.java | 47 +++-
.../forward/PortForwardingEventListener.java | 133 ++++++++++
.../PortForwardingEventListenerManager.java | 52 ++++
.../sshd/common/forward/TcpipForwarder.java | 2 +-
.../common/forward/TcpipForwarderFactory.java | 1 -
.../common/helpers/AbstractFactoryManager.java | 46 +++-
.../AbstractConnectionServiceFactory.java | 60 +++++
.../sshd/common/session/ConnectionService.java | 3 +-
.../org/apache/sshd/common/session/Session.java | 2 +
.../helpers/AbstractConnectionService.java | 36 ++-
.../common/session/helpers/AbstractSession.java | 57 +++-
.../sshd/common/util/EventListenerUtils.java | 90 +++++++
.../org/apache/sshd/server/SignalListener.java | 4 +-
.../apache/sshd/server/StandardEnvironment.java | 19 +-
.../sshd/server/scp/ScpCommandFactory.java | 8 +-
.../server/session/ServerConnectionService.java | 4 +-
.../session/ServerConnectionServiceFactory.java | 20 +-
.../sftp/AbstractSftpEventListenerManager.java | 4 +-
.../server/subsystem/sftp/SftpSubsystem.java | 12 +-
.../test/java/org/apache/sshd/ProxyTest.java | 82 +++++-
.../client/ClientAuthenticationManagerTest.java | 48 ++++
.../sshd/common/forward/PortForwardingTest.java | 168 +++++++++++-
.../common/util/EventListenerUtilsTest.java | 56 +++-
27 files changed, 1124 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java
index 2efe7b7..d71ce1c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionServiceFactory.java
@@ -22,14 +22,26 @@ import java.io.IOException;
import org.apache.sshd.common.Service;
import org.apache.sshd.common.ServiceFactory;
+import org.apache.sshd.common.forward.PortForwardingEventListener;
+import org.apache.sshd.common.session.AbstractConnectionServiceFactory;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.ValidateUtils;
/**
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public class ClientConnectionServiceFactory implements ServiceFactory {
- public static final ClientConnectionServiceFactory INSTANCE = new ClientConnectionServiceFactory();
+public class ClientConnectionServiceFactory extends AbstractConnectionServiceFactory implements ServiceFactory {
+ public static final ClientConnectionServiceFactory INSTANCE = new ClientConnectionServiceFactory() {
+ @Override
+ public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+ throw new UnsupportedOperationException("addPortForwardingListener(" + listener + ") N/A on default instance");
+ }
+
+ @Override
+ public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+ throw new UnsupportedOperationException("removePortForwardingEventListener(" + listener + ") N/A on default instance");
+ }
+ };
public ClientConnectionServiceFactory() {
super();
@@ -43,6 +55,8 @@ public class ClientConnectionServiceFactory implements ServiceFactory {
@Override
public Service create(Session session) throws IOException {
ValidateUtils.checkTrue(session instanceof AbstractClientSession, "Not a client sesssion: %s", session);
- return new ClientConnectionService((AbstractClientSession) session);
+ ClientConnectionService service = new ClientConnectionService((AbstractClientSession) session);
+ service.addPortForwardingEventListener(getPortForwardingEventListenerProxy());
+ return service;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
index 516da0b..ea3ad86 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
@@ -27,6 +27,7 @@ import org.apache.sshd.common.channel.Channel;
import org.apache.sshd.common.channel.ChannelListenerManager;
import org.apache.sshd.common.channel.RequestHandler;
import org.apache.sshd.common.file.FileSystemFactory;
+import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
import org.apache.sshd.common.forward.TcpipForwarderFactory;
import org.apache.sshd.common.io.IoServiceFactory;
import org.apache.sshd.common.kex.KexFactoryManager;
@@ -47,6 +48,7 @@ public interface FactoryManager
SessionListenerManager,
ReservedSessionMessagesManager,
ChannelListenerManager,
+ PortForwardingEventListenerManager,
AttributeStore,
PropertyResolver {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 50022ff..f6f5ddf 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -27,7 +27,6 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -79,13 +78,14 @@ public abstract class AbstractChannel
protected final AtomicBoolean initialized = new AtomicBoolean(false);
protected final AtomicBoolean eofReceived = new AtomicBoolean(false);
protected final AtomicBoolean eofSent = new AtomicBoolean(false);
- protected AtomicReference<GracefulState> gracefulState = new AtomicReference<GracefulState>(GracefulState.Opened);
+ protected AtomicReference<GracefulState> gracefulState = new AtomicReference<>(GracefulState.Opened);
protected final DefaultCloseFuture gracefulFuture = new DefaultCloseFuture(lock);
- protected final List<RequestHandler<Channel>> handlers = new ArrayList<RequestHandler<Channel>>();
+ protected final List<RequestHandler<Channel>> handlers = new ArrayList<>();
/**
* Channel events listener
*/
- protected final Collection<ChannelListener> channelListeners = new CopyOnWriteArraySet<>();
+ protected final Collection<ChannelListener> channelListeners =
+ EventListenerUtils.<ChannelListener>synchronizedListenersSet();
protected final ChannelListener channelListenerProxy;
private int id = -1;
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
index b8aa438..aba9317 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
@@ -28,6 +28,7 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -49,6 +50,7 @@ import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.SessionHolder;
+import org.apache.sshd.common.util.EventListenerUtils;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.Readable;
import org.apache.sshd.common.util.ValidateUtils;
@@ -65,7 +67,7 @@ import org.apache.sshd.server.forward.ForwardingFilter;
*/
public class DefaultTcpipForwarder
extends AbstractInnerCloseable
- implements TcpipForwarder, SessionHolder<Session> {
+ implements TcpipForwarder, SessionHolder<Session>, PortForwardingEventListenerManager {
/**
* Used to configure the timeout (milliseconds) for receiving a response
@@ -101,11 +103,35 @@ public class DefaultTcpipForwarder
return new StaticIoHandler();
}
};
+ private final Collection<PortForwardingEventListener> listeners =
+ EventListenerUtils.<PortForwardingEventListener>synchronizedListenersSet();
+ private final PortForwardingEventListener listenerProxy;
+
private IoAcceptor acceptor;
public DefaultTcpipForwarder(ConnectionService service) {
this.service = ValidateUtils.checkNotNull(service, "No connection service");
this.sessionInstance = ValidateUtils.checkNotNull(service.getSession(), "No session");
+ this.listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, getClass().getClassLoader(), listeners);
+ }
+
+ @Override
+ public PortForwardingEventListener getPortForwardingEventListenerProxy() {
+ return listenerProxy;
+ }
+
+ @Override
+ public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+ listeners.add(Objects.requireNonNull(listener, "No listener to add"));
+ }
+
+ @Override
+ public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+ if (listener == null) {
+ return;
+ }
+
+ listeners.remove(listener);
}
@Override
@@ -134,22 +160,42 @@ public class DefaultTcpipForwarder
throw new IllegalStateException("TcpipForwarder is closing");
}
- InetSocketAddress bound = doBind(local, staticIoHandlerFactory);
- int port = bound.getPort();
- SshdSocketAddress prev;
- synchronized (localToRemote) {
- prev = localToRemote.put(port, remote);
- }
+ InetSocketAddress bound;
+ int port;
+ PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+ listener.establishingExplicitTunnel(getSession(), local, remote, true);
+ try {
+ bound = doBind(local, staticIoHandlerFactory);
+ port = bound.getPort();
+ SshdSocketAddress prev;
+ synchronized (localToRemote) {
+ prev = localToRemote.put(port, remote);
+ }
- if (prev != null) {
- throw new IOException("Multiple local port forwarding bindings on port=" + port + ": current=" + remote + ", previous=" + prev);
+ if (prev != null) {
+ throw new IOException("Multiple local port forwarding bindings on port=" + port + ": current=" + remote + ", previous=" + prev);
+ }
+ } catch (IOException | RuntimeException e) {
+ try {
+ stopLocalPortForwarding(local);
+ } catch (IOException | RuntimeException err) {
+ e.addSuppressed(err);
+ }
+ listener.establishedExplicitTunnel(getSession(), local, remote, true, null, e);
+ throw e;
}
- SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), port);
- if (log.isDebugEnabled()) {
- log.debug("startLocalPortForwarding(" + local + " -> " + remote + "): " + result);
+ try {
+ SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), port);
+ if (log.isDebugEnabled()) {
+ log.debug("startLocalPortForwarding(" + local + " -> " + remote + "): " + result);
+ }
+ listener.establishedExplicitTunnel(getSession(), local, remote, true, result, null);
+ return result;
+ } catch (IOException | RuntimeException e) {
+ stopLocalPortForwarding(local);
+ throw e;
}
- return result;
}
@Override
@@ -165,7 +211,17 @@ public class DefaultTcpipForwarder
if (log.isDebugEnabled()) {
log.debug("stopLocalPortForwarding(" + local + ") unbind " + bound);
}
- acceptor.unbind(bound.toInetSocketAddress());
+
+ PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+ listener.tearingDownExplicitTunnel(getSession(), bound, true);
+ try {
+ acceptor.unbind(bound.toInetSocketAddress());
+ } catch (RuntimeException e) {
+ listener.tornDownExplicitTunnel(getSession(), bound, true, e);
+ throw e;
+ }
+
+ listener.tornDownExplicitTunnel(getSession(), bound, true, null);
} else {
if (log.isDebugEnabled()) {
log.debug("stopLocalPortForwarding(" + local + ") no mapping/acceptor for " + bound);
@@ -188,27 +244,47 @@ public class DefaultTcpipForwarder
buffer.putInt(remotePort);
long timeout = PropertyResolverUtils.getLongProperty(session, FORWARD_REQUEST_TIMEOUT, DEFAULT_FORWARD_REQUEST_TIMEOUT);
- Buffer result = session.request("tcpip-forward", buffer, timeout, TimeUnit.MILLISECONDS);
- if (result == null) {
- throw new SshException("Tcpip forwarding request denied by server");
- }
- int port = (remotePort == 0) ? result.getInt() : remote.getPort();
- // TODO: Is it really safe to only store the local address after the request ?
- SshdSocketAddress prev;
- synchronized (remoteToLocal) {
- prev = remoteToLocal.put(port, local);
- }
+ Buffer result;
+ int port;
+ PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+ listener.establishingExplicitTunnel(getSession(), local, remote, false);
+ try {
+ result = session.request("tcpip-forward", buffer, timeout, TimeUnit.MILLISECONDS);
+ if (result == null) {
+ throw new SshException("Tcpip forwarding request denied by server");
+ }
+ port = (remotePort == 0) ? result.getInt() : remote.getPort();
+ // TODO: Is it really safe to only store the local address after the request ?
+ SshdSocketAddress prev;
+ synchronized (remoteToLocal) {
+ prev = remoteToLocal.put(port, local);
+ }
- if (prev != null) {
- throw new IOException("Multiple remote port forwarding bindings on port=" + port + ": current=" + remote + ", previous=" + prev);
+ if (prev != null) {
+ throw new IOException("Multiple remote port forwarding bindings on port=" + port + ": current=" + remote + ", previous=" + prev);
+ }
+ } catch (IOException | RuntimeException e) {
+ try {
+ stopRemotePortForwarding(remote);
+ } catch (IOException | RuntimeException err) {
+ e.addSuppressed(err);
+ }
+ listener.establishedExplicitTunnel(session, local, remote, false, null, e);
+ throw e;
}
- SshdSocketAddress bound = new SshdSocketAddress(remoteHost, port);
- if (log.isDebugEnabled()) {
- log.debug("startRemotePortForwarding(" + remote + " -> " + local + "): " + bound);
- }
+ try {
+ SshdSocketAddress bound = new SshdSocketAddress(remoteHost, port);
+ if (log.isDebugEnabled()) {
+ log.debug("startRemotePortForwarding(" + remote + " -> " + local + "): " + bound);
+ }
- return bound;
+ listener.establishedExplicitTunnel(getSession(), local, remote, false, bound, null);
+ return bound;
+ } catch (IOException | RuntimeException e) {
+ stopRemotePortForwarding(remote);
+ throw e;
+ }
}
@Override
@@ -230,7 +306,17 @@ public class DefaultTcpipForwarder
buffer.putBoolean(false); // want reply
buffer.putString(remoteHost);
buffer.putInt(remote.getPort());
- session.writePacket(buffer);
+
+ PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+ listener.tearingDownExplicitTunnel(getSession(), bound, false);
+ try {
+ session.writePacket(buffer);
+ } catch (IOException | RuntimeException e) {
+ listener.tornDownExplicitTunnel(getSession(), bound, false, e);
+ throw e;
+ }
+
+ listener.tornDownExplicitTunnel(getSession(), bound, false, null);
} else {
if (log.isDebugEnabled()) {
log.debug("stopRemotePortForwarding(" + remote + ") no binding found");
@@ -252,27 +338,47 @@ public class DefaultTcpipForwarder
SocksProxy socksProxy = new SocksProxy(service);
SocksProxy prev;
- InetSocketAddress bound = doBind(local, socksProxyIoHandlerFactory);
- int port = bound.getPort();
- synchronized (dynamicLocal) {
- prev = dynamicLocal.put(port, socksProxy);
- }
+ InetSocketAddress bound;
+ int port;
+ PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+ listener.establishingDynamicTunnel(getSession(), local);
+ try {
+ bound = doBind(local, socksProxyIoHandlerFactory);
+ port = bound.getPort();
+ synchronized (dynamicLocal) {
+ prev = dynamicLocal.put(port, socksProxy);
+ }
- if (prev != null) {
- throw new IOException("Multiple dynamic port mappings found for port=" + port + ": current=" + socksProxy + ", previous=" + prev);
+ if (prev != null) {
+ throw new IOException("Multiple dynamic port mappings found for port=" + port + ": current=" + socksProxy + ", previous=" + prev);
+ }
+ } catch (IOException | RuntimeException e) {
+ try {
+ stopDynamicPortForwarding(local);
+ } catch (IOException | RuntimeException err) {
+ e.addSuppressed(err);
+ }
+ listener.establishedDynamicTunnel(getSession(), local, null, e);
+ throw e;
}
- SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), port);
- if (log.isDebugEnabled()) {
- log.debug("startDynamicPortForwarding(" + local + "): " + result);
- }
+ try {
+ SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), port);
+ if (log.isDebugEnabled()) {
+ log.debug("startDynamicPortForwarding(" + local + "): " + result);
+ }
- return result;
+ listener.establishedDynamicTunnel(getSession(), local, result, null);
+ return result;
+ } catch (IOException | RuntimeException e) {
+ stopDynamicPortForwarding(local);
+ throw e;
+ }
}
@Override
public synchronized void stopDynamicPortForwarding(SshdSocketAddress local) throws IOException {
- Closeable obj;
+ SocksProxy obj;
synchronized (dynamicLocal) {
obj = dynamicLocal.remove(local.getPort());
}
@@ -281,8 +387,18 @@ public class DefaultTcpipForwarder
if (log.isDebugEnabled()) {
log.debug("stopDynamicPortForwarding(" + local + ") unbinding");
}
- obj.close(true);
- acceptor.unbind(local.toInetSocketAddress());
+
+ PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+ listener.tearingDownDynamicTunnel(sessionInstance, local);
+ try {
+ obj.close(true);
+ acceptor.unbind(local.toInetSocketAddress());
+ } catch (RuntimeException e) {
+ listener.tornDownDynamicTunnel(getSession(), local, e);
+ throw e;
+ }
+
+ listener.tornDownDynamicTunnel(getSession(), local, null);
} else {
if (log.isDebugEnabled()) {
log.debug("stopDynamicPortForwarding(" + local + ") no binding found");
@@ -321,22 +437,41 @@ public class DefaultTcpipForwarder
throw new RuntimeSshException(e);
}
- InetSocketAddress bound = doBind(local, staticIoHandlerFactory);
- SshdSocketAddress result = new SshdSocketAddress(bound.getHostString(), bound.getPort());
- if (log.isDebugEnabled()) {
- log.debug("localPortForwardingRequested(" + local + "): " + result);
- }
+ PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+ listener.establishingExplicitTunnel(getSession(), local, null, true);
+ SshdSocketAddress result;
+ try {
+ InetSocketAddress bound = doBind(local, staticIoHandlerFactory);
+ result = new SshdSocketAddress(bound.getHostString(), bound.getPort());
+ if (log.isDebugEnabled()) {
+ log.debug("localPortForwardingRequested(" + local + "): " + result);
+ }
- boolean added;
- synchronized (localForwards) {
- // NOTE !!! it is crucial to use the bound address host name first
- added = localForwards.add(new LocalForwardingEntry(result.getHostName(), local.getHostName(), result.getPort()));
+ boolean added;
+ synchronized (localForwards) {
+ // NOTE !!! it is crucial to use the bound address host name first
+ added = localForwards.add(new LocalForwardingEntry(result.getHostName(), local.getHostName(), result.getPort()));
+ }
+
+ if (!added) {
+ throw new IOException("Failed to add local port forwarding entry for " + local + " -> " + result);
+ }
+ } catch (IOException | RuntimeException e) {
+ try {
+ localPortForwardingCancelled(local);
+ } catch (IOException | RuntimeException err) {
+ e.addSuppressed(e);
+ }
+ listener.establishedExplicitTunnel(getSession(), local, null, true, null, e);
+ throw e;
}
- if (!added) {
- throw new IOException("Failed to add local port forwarding entry for " + local + " -> " + result);
+ try {
+ listener.establishedExplicitTunnel(getSession(), local, null, true, result, null);
+ return result;
+ } catch (IOException | RuntimeException e) {
+ throw e;
}
- return result;
}
@Override
@@ -353,7 +488,17 @@ public class DefaultTcpipForwarder
if (log.isDebugEnabled()) {
log.debug("localPortForwardingCancelled(" + local + ") unbind " + entry);
}
- acceptor.unbind(entry.toInetSocketAddress());
+
+ PortForwardingEventListener listener = getPortForwardingEventListenerProxy();
+ listener.tearingDownExplicitTunnel(getSession(), entry, true);
+ try {
+ acceptor.unbind(entry.toInetSocketAddress());
+ } catch (RuntimeException e) {
+ listener.tornDownExplicitTunnel(getSession(), entry, true, e);
+ throw e;
+ }
+
+ listener.tornDownExplicitTunnel(getSession(), entry, true, null);
} else {
if (log.isDebugEnabled()) {
log.debug("localPortForwardingCancelled(" + local + ") no match/acceptor: " + entry);
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java
index d38bf43..e7de9c3 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarderFactory.java
@@ -18,22 +18,61 @@
*/
package org.apache.sshd.common.forward;
+import java.util.Collection;
+import java.util.Objects;
+
import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.util.EventListenerUtils;
/**
* The default {@link TcpipForwarderFactory} implementation.
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public class DefaultTcpipForwarderFactory implements TcpipForwarderFactory {
- public static final DefaultTcpipForwarderFactory INSTANCE = new DefaultTcpipForwarderFactory();
+public class DefaultTcpipForwarderFactory implements TcpipForwarderFactory, PortForwardingEventListenerManager {
+ public static final DefaultTcpipForwarderFactory INSTANCE = new DefaultTcpipForwarderFactory() {
+ @Override
+ public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+ throw new UnsupportedOperationException("addPortForwardingListener(" + listener + ") N/A on default instance");
+ }
+
+ @Override
+ public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+ throw new UnsupportedOperationException("removePortForwardingEventListener(" + listener + ") N/A on default instance");
+ }
+ };
+
+ private final Collection<PortForwardingEventListener> listeners =
+ EventListenerUtils.<PortForwardingEventListener>synchronizedListenersSet();
+ private final PortForwardingEventListener listenerProxy;
public DefaultTcpipForwarderFactory() {
- super();
+ listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, getClass().getClassLoader(), listeners);
+ }
+
+ @Override
+ public PortForwardingEventListener getPortForwardingEventListenerProxy() {
+ return listenerProxy;
+ }
+
+ @Override
+ public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+ listeners.add(Objects.requireNonNull(listener, "No listener to add"));
+ }
+
+ @Override
+ public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+ if (listener == null) {
+ return;
+ }
+
+ listeners.remove(listener);
}
@Override
public TcpipForwarder create(ConnectionService service) {
- return new DefaultTcpipForwarder(service);
+ TcpipForwarder forwarder = new DefaultTcpipForwarder(service);
+ forwarder.addPortForwardingEventListener(getPortForwardingEventListenerProxy());
+ return forwarder;
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java
new file mode 100644
index 0000000..d03dbd9
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListener.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.forward;
+
+import java.io.IOException;
+import java.util.EventListener;
+
+import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.net.SshdSocketAddress;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface PortForwardingEventListener extends EventListener {
+ /**
+ * Signals the attempt to establish a local/remote port forwarding
+ *
+ * @param session The {@link Session} through which the attempt is made
+ * @param local The local address - may be {@code null} on the receiver side
+ * @param remote The remote address - may be {@code null} on the receiver side
+ * @param localForwarding Local/remote port forwarding indicator
+ * @throws IOException If failed to handle the event - in which case
+ * the attempt is aborted and the exception re-thrown to the caller
+ */
+ void establishingExplicitTunnel(
+ Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding)
+ throws IOException;
+
+ /**
+ * Signals a successful/failed attempt to establish a local/remote port forwarding
+ *
+ * @param session The {@link Session} through which the attempt was made
+ * @param local The local address - may be {@code null} on the receiver side
+ * @param remote The remote address - may be {@code null} on the receiver side
+ * @param localForwarding Local/remote port forwarding indicator
+ * @param boundAddress The bound address - non-{@code null} if successful
+ * @param reason Reason for failure - {@code null} if successful
+ * @throws IOException If failed to handle the event - in which case
+ * the established tunnel is aborted
+ */
+ void establishedExplicitTunnel(
+ Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding,
+ SshdSocketAddress boundAddress, Throwable reason)
+ throws IOException;
+
+ /**
+ * Signals a request to tear down a local/remote port forwarding
+ *
+ * @param session The {@link Session} through which the request is made
+ * @param address The (bound) address - local/remote according to the forwarding type
+ * @param localForwarding Local/remote port forwarding indicator
+ * @throws IOException If failed to handle the event - in which case
+ * the request is aborted
+ */
+ void tearingDownExplicitTunnel(Session session, SshdSocketAddress address, boolean localForwarding) throws IOException;
+
+ /**
+ * Signals a successful/failed request to tear down a local/remote port forwarding
+ *
+ * @param session The {@link Session} through which the request is made
+ * @param address The (bound) address - local/remote according to the forwarding type
+ * @param localForwarding Local/remote port forwarding indicator
+ * @param reason Reason for failure - {@code null} if successful
+ * @throws IOException If failed to handle the event - <B>Note:</B>
+ * the exception is propagated, but the port forwarding may have
+ * been torn down - no rollback
+ */
+ void tornDownExplicitTunnel(Session session, SshdSocketAddress address, boolean localForwarding, Throwable reason) throws IOException;
+
+ /**
+ * Signals the attempt to establish a dynamic port forwarding
+ *
+ * @param session The {@link Session} through which the attempt is made
+ * @param local The local address
+ * @throws IOException If failed to handle the event - in which case
+ * the attempt is aborted and the exception re-thrown to the caller
+ */
+ void establishingDynamicTunnel(Session session, SshdSocketAddress local) throws IOException;
+
+ /**
+ * Signals a successful/failed attempt to establish a dynamic port forwarding
+ *
+ * @param session The {@link Session} through which the attempt is made
+ * @param local The local address
+ * @param boundAddress The bound address - non-{@code null} if successful
+ * @param reason Reason for failure - {@code null} if successful
+ * @throws IOException If failed to handle the event - in which case
+ * the established tunnel is aborted
+ */
+ void establishedDynamicTunnel(
+ Session session, SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason)
+ throws IOException;
+
+ /**
+ * Signals a request to tear down a dynamic forwarding
+ *
+ * @param session The {@link Session} through which the request is made
+ * @param address The (bound) address - local/remote according to the forwarding type
+ * @throws IOException If failed to handle the event - in which case
+ * the request is aborted
+ */
+ void tearingDownDynamicTunnel(Session session, SshdSocketAddress address) throws IOException;
+
+ /**
+ * Signals a successful/failed request to tear down a dynamic port forwarding
+ *
+ * @param session The {@link Session} through which the request is made
+ * @param address The (bound) address - local/remote according to the forwarding type
+ * @param reason Reason for failure - {@code null} if successful
+ * @throws IOException If failed to handle the event - <B>Note:</B>
+ * the exception is propagated, but the port forwarding may have
+ * been torn down - no rollback
+ */
+ void tornDownDynamicTunnel(Session session, SshdSocketAddress address, Throwable reason) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManager.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManager.java
new file mode 100644
index 0000000..2e02e48
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/PortForwardingEventListenerManager.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.forward;
+
+/**
+ * Marker interface for classes that allow to add/remove port forwarding
+ * listeners. <B>Note:</B> if adding/removing listeners while tunnels are
+ * being established and/or torn down there are no guarantees as to the order
+ * of the calls to the recently added/removed listener's methods in the interim.
+ * The correct order is guaranteed only as of the <U>next</U> tunnel after
+ * the listener has been added/removed.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface PortForwardingEventListenerManager {
+ /**
+ * Add a port forwarding listener
+ *
+ * @param listener The {@link PortForwardingEventListener} to add - never {@code null}
+ */
+ void addPortForwardingEventListener(PortForwardingEventListener listener);
+
+ /**
+ * Remove a port forwarding listener
+ *
+ * @param listener The {@link PortForwardingEventListener} to remove - ignored if {@code null}
+ */
+ void removePortForwardingEventListener(PortForwardingEventListener listener);
+
+ /**
+ * @return A proxy listener representing all the currently registered listener
+ * through this manager
+ */
+ PortForwardingEventListener getPortForwardingEventListenerProxy();
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
index ec0329a..07f4405 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
@@ -24,7 +24,7 @@ import java.io.IOException;
import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.util.net.SshdSocketAddress;
-public interface TcpipForwarder extends PortForwardingManager, Closeable {
+public interface TcpipForwarder extends PortForwardingManager, PortForwardingEventListenerManager, Closeable {
/**
* @param remotePort The remote port
* @return The local {@link SshdSocketAddress} that the remote port is forwarded to
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
index beb64cd..70cd098 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
@@ -33,5 +33,4 @@ public interface TcpipForwarderFactory {
* @return the {@link TcpipForwarder} that will listen for connections and set up forwarding
*/
TcpipForwarder create(ConnectionService service);
-
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
index 51cb5e2..48a78f2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -41,6 +40,7 @@ import org.apache.sshd.common.channel.ChannelListener;
import org.apache.sshd.common.channel.RequestHandler;
import org.apache.sshd.common.config.VersionProperties;
import org.apache.sshd.common.file.FileSystemFactory;
+import org.apache.sshd.common.forward.PortForwardingEventListener;
import org.apache.sshd.common.forward.TcpipForwarderFactory;
import org.apache.sshd.common.io.DefaultIoServiceFactoryFactory;
import org.apache.sshd.common.io.IoServiceFactory;
@@ -78,10 +78,15 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i
protected List<RequestHandler<ConnectionService>> globalRequestHandlers;
protected SessionTimeoutListener sessionTimeoutListener;
protected ScheduledFuture<?> timeoutListenerFuture;
- protected final Collection<SessionListener> sessionListeners = new CopyOnWriteArraySet<>();
+ protected final Collection<SessionListener> sessionListeners =
+ EventListenerUtils.<SessionListener>synchronizedListenersSet();
protected final SessionListener sessionListenerProxy;
- protected final Collection<ChannelListener> channelListeners = new CopyOnWriteArraySet<>();
+ protected final Collection<ChannelListener> channelListeners =
+ EventListenerUtils.<ChannelListener>synchronizedListenersSet();
protected final ChannelListener channelListenerProxy;
+ protected final Collection<PortForwardingEventListener> tunnelListeners =
+ EventListenerUtils.<PortForwardingEventListener>synchronizedListenersSet();
+ protected final PortForwardingEventListener tunnelListenerProxy;
private final Map<String, Object> properties = new ConcurrentHashMap<>();
private final Map<AttributeKey<?>, Object> attributes = new ConcurrentHashMap<>();
@@ -92,6 +97,7 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i
ClassLoader loader = getClass().getClassLoader();
sessionListenerProxy = EventListenerUtils.proxyWrapper(SessionListener.class, loader, sessionListeners);
channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, loader, channelListeners);
+ tunnelListenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, loader, tunnelListeners);
}
@Override
@@ -329,6 +335,40 @@ public abstract class AbstractFactoryManager extends AbstractKexFactoryManager i
return channelListenerProxy;
}
+ @Override
+ public PortForwardingEventListener getPortForwardingEventListenerProxy() {
+ return tunnelListenerProxy;
+ }
+
+ @Override
+ public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+ ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null instance", this);
+ // avoid race conditions on notifications while session is being closed
+ if (!isOpen()) {
+ log.warn("addPortForwardingEventListener({})[{}] ignore registration while session is closing", this, listener);
+ return;
+ }
+
+ if (this.tunnelListeners.add(listener)) {
+ log.trace("addPortForwardingEventListener({})[{}] registered", this, listener);
+ } else {
+ log.trace("addPortForwardingEventListener({})[{}] ignored duplicate", this, listener);
+ }
+ }
+
+ @Override
+ public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+ if (listener == null) {
+ return;
+ }
+
+ if (this.tunnelListeners.remove(listener)) {
+ log.trace("removePortForwardingEventListener({})[{}] removed", this, listener);
+ } else {
+ log.trace("removePortForwardingEventListener({})[{}] not registered", this, listener);
+ }
+ }
+
protected void setupSessionTimeout(final AbstractSessionFactory<?, ?> sessionFactory) {
// set up the the session timeout listener and schedule it
sessionTimeoutListener = createSessionTimeoutListener();
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java
new file mode 100644
index 0000000..76e20c4
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionServiceFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.session;
+
+import java.util.Collection;
+import java.util.Objects;
+
+import org.apache.sshd.common.forward.PortForwardingEventListener;
+import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
+import org.apache.sshd.common.util.EventListenerUtils;
+import org.apache.sshd.common.util.logging.AbstractLoggingBean;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public abstract class AbstractConnectionServiceFactory extends AbstractLoggingBean implements PortForwardingEventListenerManager {
+ private final Collection<PortForwardingEventListener> listeners =
+ EventListenerUtils.<PortForwardingEventListener>synchronizedListenersSet();
+ private final PortForwardingEventListener listenerProxy;
+
+ protected AbstractConnectionServiceFactory() {
+ listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, getClass().getClassLoader(), listeners);
+ }
+
+ @Override
+ public PortForwardingEventListener getPortForwardingEventListenerProxy() {
+ return listenerProxy;
+ }
+
+ @Override
+ public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+ listeners.add(Objects.requireNonNull(listener, "No listener to add"));
+ }
+
+ @Override
+ public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+ if (listener == null) {
+ return;
+ }
+
+ listeners.remove(listener);
+ }
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java
index c0b695c..a07046e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.sshd.agent.common.AgentForwardSupport;
import org.apache.sshd.common.Service;
import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
import org.apache.sshd.common.forward.TcpipForwarder;
import org.apache.sshd.server.x11.X11ForwardSupport;
@@ -31,7 +32,7 @@ import org.apache.sshd.server.x11.X11ForwardSupport;
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public interface ConnectionService extends Service {
+public interface ConnectionService extends Service, PortForwardingEventListenerManager {
/**
* Register a newly created channel with a new unique identifier
*
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
index 4cf9bf8..b5fe252 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
@@ -30,6 +30,7 @@ import org.apache.sshd.common.auth.MutableUserHolder;
import org.apache.sshd.common.channel.ChannelListenerManager;
import org.apache.sshd.common.cipher.CipherInformation;
import org.apache.sshd.common.compression.CompressionInformation;
+import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
import org.apache.sshd.common.future.KeyExchangeFuture;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.io.IoWriteFuture;
@@ -50,6 +51,7 @@ public interface Session
SessionListenerManager,
ReservedSessionMessagesManager,
ChannelListenerManager,
+ PortForwardingEventListenerManager,
FactoryManagerHolder,
PropertyResolver,
AttributeStore,
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
index 98caf03..8e60a67 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,6 +43,7 @@ import org.apache.sshd.common.channel.ChannelListener;
import org.apache.sshd.common.channel.OpenChannelException;
import org.apache.sshd.common.channel.RequestHandler;
import org.apache.sshd.common.channel.Window;
+import org.apache.sshd.common.forward.PortForwardingEventListener;
import org.apache.sshd.common.forward.TcpipForwarder;
import org.apache.sshd.common.forward.TcpipForwarderFactory;
import org.apache.sshd.common.future.SshFutureListener;
@@ -49,6 +51,7 @@ import org.apache.sshd.common.io.AbstractIoWriteFuture;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.EventListenerUtils;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.Int2IntFunction;
import org.apache.sshd.common.util.ValidateUtils;
@@ -63,7 +66,9 @@ import org.apache.sshd.server.x11.X11ForwardSupport;
* @param <S> Type of {@link AbstractSession} being used
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public abstract class AbstractConnectionService<S extends AbstractSession> extends AbstractInnerCloseable implements ConnectionService {
+public abstract class AbstractConnectionService<S extends AbstractSession>
+ extends AbstractInnerCloseable
+ implements ConnectionService {
/**
* Property that can be used to configure max. allowed concurrent active channels
*
@@ -94,11 +99,34 @@ public abstract class AbstractConnectionService<S extends AbstractSession> exten
private final AtomicReference<X11ForwardSupport> x11ForwardHolder = new AtomicReference<>();
private final AtomicReference<TcpipForwarder> tcpipForwarderHolder = new AtomicReference<>();
private final AtomicBoolean allowMoreSessions = new AtomicBoolean(true);
+ private final Collection<PortForwardingEventListener> listeners =
+ EventListenerUtils.<PortForwardingEventListener>synchronizedListenersSet();
+ private final PortForwardingEventListener listenerProxy;
private final S sessionInstance;
protected AbstractConnectionService(S session) {
sessionInstance = ValidateUtils.checkNotNull(session, "No session");
+ listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, getClass().getClassLoader(), listeners);
+ }
+
+ @Override
+ public PortForwardingEventListener getPortForwardingEventListenerProxy() {
+ return listenerProxy;
+ }
+
+ @Override
+ public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+ listeners.add(Objects.requireNonNull(listener, "No listener to add"));
+ }
+
+ @Override
+ public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+ if (listener == null) {
+ return;
+ }
+
+ listeners.remove(listener);
}
public Collection<Channel> getChannels() {
@@ -140,7 +168,10 @@ public abstract class AbstractConnectionService<S extends AbstractSession> exten
ValidateUtils.checkNotNull(session.getFactoryManager(), "No factory manager");
TcpipForwarderFactory factory =
ValidateUtils.checkNotNull(manager.getTcpipForwarderFactory(), "No forwarder factory");
- return factory.create(this);
+ TcpipForwarder forwarder = factory.create(this);
+ forwarder.addPortForwardingEventListener(getPortForwardingEventListenerProxy());
+ forwarder.addPortForwardingEventListener(session.getPortForwardingEventListenerProxy());
+ return forwarder;
}
@Override
@@ -184,6 +215,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> exten
if (log.isDebugEnabled()) {
log.debug("getAgentForwardSupport({}) created instance", session);
}
+
return agentForward;
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index 824b9dd..4b2af70 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -34,7 +34,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -60,6 +59,7 @@ import org.apache.sshd.common.cipher.CipherInformation;
import org.apache.sshd.common.compression.Compression;
import org.apache.sshd.common.compression.CompressionInformation;
import org.apache.sshd.common.digest.Digest;
+import org.apache.sshd.common.forward.PortForwardingEventListener;
import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
import org.apache.sshd.common.future.DefaultSshFuture;
import org.apache.sshd.common.future.KeyExchangeFuture;
@@ -134,15 +134,24 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
/**
* Session listeners container
*/
- protected final Collection<SessionListener> sessionListeners = new CopyOnWriteArraySet<>();
+ protected final Collection<SessionListener> sessionListeners =
+ EventListenerUtils.<SessionListener>synchronizedListenersSet();
protected final SessionListener sessionListenerProxy;
/**
* Channel events listener container
*/
- protected final Collection<ChannelListener> channelListeners = new CopyOnWriteArraySet<>();
+ protected final Collection<ChannelListener> channelListeners =
+ EventListenerUtils.<ChannelListener>synchronizedListenersSet();
protected final ChannelListener channelListenerProxy;
+ /**
+ * Port forwarding events listener container
+ */
+ protected final Collection<PortForwardingEventListener> tunnelListeners =
+ EventListenerUtils.<PortForwardingEventListener>synchronizedListenersSet();
+ protected final PortForwardingEventListener tunnelListenerProxy;
+
/*
* Key exchange support
*/
@@ -256,10 +265,12 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
ClassLoader loader = getClass().getClassLoader();
sessionListenerProxy = EventListenerUtils.proxyWrapper(SessionListener.class, loader, sessionListeners);
channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, loader, channelListeners);
+ tunnelListenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, loader, tunnelListeners);
// Delegate the task of further notifications to the session
addSessionListener(factoryManager.getSessionListenerProxy());
addChannelListener(factoryManager.getChannelListenerProxy());
+ addPortForwardingEventListener(factoryManager.getPortForwardingEventListenerProxy());
}
/**
@@ -2052,6 +2063,40 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
return channelListenerProxy;
}
+ @Override
+ public PortForwardingEventListener getPortForwardingEventListenerProxy() {
+ return tunnelListenerProxy;
+ }
+
+ @Override
+ public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+ ValidateUtils.checkNotNull(listener, "addPortForwardingEventListener(%s) null instance", this);
+ // avoid race conditions on notifications while session is being closed
+ if (!isOpen()) {
+ log.warn("addPortForwardingEventListener({})[{}] ignore registration while session is closing", this, listener);
+ return;
+ }
+
+ if (this.tunnelListeners.add(listener)) {
+ log.trace("addPortForwardingEventListener({})[{}] registered", this, listener);
+ } else {
+ log.trace("addPortForwardingEventListener({})[{}] ignored duplicate", this, listener);
+ }
+ }
+
+ @Override
+ public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+ if (listener == null) {
+ return;
+ }
+
+ if (this.tunnelListeners.remove(listener)) {
+ log.trace("removePortForwardingEventListener({})[{}] removed", this, listener);
+ } else {
+ log.trace("removePortForwardingEventListener({})[{}] not registered", this, listener);
+ }
+ }
+
/**
* Sends a session event to all currently registered session listeners
*
@@ -2261,7 +2306,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
protected abstract void checkKeys() throws IOException;
protected void receiveKexInit(Buffer buffer) throws IOException {
- Map<KexProposalOption, String> proposal = new EnumMap<KexProposalOption, String>(KexProposalOption.class);
+ Map<KexProposalOption, String> proposal = new EnumMap<>(KexProposalOption.class);
byte[] seed = receiveKexInit(buffer, proposal);
receiveKexInit(proposal, seed);
}
@@ -2336,7 +2381,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
protected Pair<TimeoutStatus, String> checkAuthenticationTimeout(long now, long authTimeoutMs) {
long authDiff = now - authTimeoutStart;
if ((!authed) && (authTimeoutMs > 0L) && (authDiff > authTimeoutMs)) {
- return new Pair<TimeoutStatus, String>(TimeoutStatus.AuthTimeout, "Session has timed out waiting for authentication after " + authTimeoutMs + " ms.");
+ return new Pair<>(TimeoutStatus.AuthTimeout, "Session has timed out waiting for authentication after " + authTimeoutMs + " ms.");
} else {
return null;
}
@@ -2356,7 +2401,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
protected Pair<TimeoutStatus, String> checkIdleTimeout(long now, long idleTimeoutMs) {
long idleDiff = now - idleTimeoutStart;
if ((idleTimeoutMs > 0L) && (idleDiff > idleTimeoutMs)) {
- return new Pair<TimeoutStatus, String>(TimeoutStatus.IdleTimeout, "User session has timed out idling after " + idleTimeoutMs + " ms.");
+ return new Pair<>(TimeoutStatus.IdleTimeout, "User session has timed out idling after " + idleTimeoutMs + " ms.");
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
index f1003cf..51f7dd5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
@@ -22,18 +22,108 @@ package org.apache.sshd.common.util;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.EventListener;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
/**
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public final class EventListenerUtils {
+ /**
+ * A special "comparator" whose only purpose is to ensure
+ * there are no same references in a listener's set
+ */
+ @SuppressWarnings("checkstyle:anoninnerlength")
+ public static final Comparator<EventListener> LISTENER_INSTANCE_COMPARATOR =
+ new Comparator<EventListener>() {
+ @Override
+ public int compare(EventListener l1, EventListener l2) {
+ if (l1 == l2) {
+ return 0;
+ } else if (l1 == null) {
+ return 1;
+ } else if (l2 == null) {
+ return -1;
+ }
+
+ Class<?> c1 = l1.getClass();
+ Class<?> c2 = l2.getClass();
+ boolean checkHashCodes = true;
+ if (Proxy.isProxyClass(c1)) {
+ if (Proxy.isProxyClass(c2)) {
+ checkHashCodes = false; // cannot call hashCode on a proxy
+ } else {
+ return 1;
+ }
+ } else if (Proxy.isProxyClass(c2)) {
+ return -1;
+ }
+
+ if (checkHashCodes) {
+ int nRes = Integer.compare(l1.hashCode(), l2.hashCode());
+ if (nRes != 0) {
+ return nRes;
+ }
+ }
+
+ int nRes = Integer.compare(System.identityHashCode(l1), System.identityHashCode(l2));
+ if (nRes != 0) {
+ return nRes;
+ }
+
+ if (c1 != c2) {
+ return c1.getName().compareTo(c2.getName());
+ }
+
+ String s1 = Objects.toString(l1.toString(), "");
+ String s2 = Objects.toString(l2.toString(), "");
+ nRes = s1.compareTo(s2);
+ if (nRes != 0) {
+ return nRes;
+ }
+ throw new UnsupportedOperationException("Ran out of options to compare instance of " + s1 + " vs. " + s2);
+ }
+ };
+
private EventListenerUtils() {
throw new UnsupportedOperationException("No instance");
}
/**
+ * @param <L> Type of {@link EventListener} contained in the set
+ * @param listeners The listeners to pre-add to the create set - ignored
+ * if (@code null}/empty
+ * @return A (synchronized) {@link Set} for containing the listeners ensuring
+ * that if same listener instance is added repeatedly only <U>one</U>
+ * instance is actually contained
+ */
+ public static <L extends EventListener> Set<L> synchronizedListenersSet(Collection<? extends L> listeners) {
+ Set<L> s = EventListenerUtils.<L>synchronizedListenersSet();
+ if (GenericUtils.size(listeners) > 0) {
+ s.addAll(listeners);
+ }
+
+ return s;
+ }
+
+ /**
+ * @param <L> Type of {@link EventListener} contained in the set
+ * @return A (synchronized) {@link Set} for containing the listeners ensuring
+ * that if same listener instance is added repeatedly only <U>one</U>
+ * instance is actually contained
+ * @see #LISTENER_INSTANCE_COMPARATOR
+ */
+ public static <L extends EventListener> Set<L> synchronizedListenersSet() {
+ return Collections.synchronizedSet(new TreeSet<L>(LISTENER_INSTANCE_COMPARATOR));
+ }
+
+ /**
* Provides proxy wrapper around an {@link Iterable} container of listener
* interface implementation. <b>Note:</b> a listener interface is one whose
* invoked methods return <u>only</u> {@code void}.
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java b/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java
index 88da0c3..f16dbfd 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/SignalListener.java
@@ -18,10 +18,12 @@
*/
package org.apache.sshd.server;
+import java.util.EventListener;
+
/**
* Define a listener to receive signals
*/
-public interface SignalListener {
+public interface SignalListener extends EventListener {
/**
* @param signal The received {@link Signal}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java b/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java
index 7a5b1c5..0b4adeb 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/StandardEnvironment.java
@@ -21,11 +21,10 @@ package org.apache.sshd.server;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.sshd.common.channel.PtyMode;
+import org.apache.sshd.common.util.EventListenerUtils;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
@@ -34,7 +33,7 @@ import org.apache.sshd.common.util.logging.AbstractLoggingBean;
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public class StandardEnvironment extends AbstractLoggingBean implements Environment {
- private final Map<Signal, Set<SignalListener>> listeners;
+ private final Map<Signal, Collection<SignalListener>> listeners;
private final Map<String, String> env;
private final Map<PtyMode, Integer> ptyModes;
@@ -83,7 +82,7 @@ public class StandardEnvironment extends AbstractLoggingBean implements Environm
public void removeSignalListener(SignalListener listener) {
ValidateUtils.checkNotNull(listener, "No listener instance");
for (Signal s : Signal.SIGNALS) {
- Set<SignalListener> ls = getSignalListeners(s, false);
+ Collection<SignalListener> ls = getSignalListeners(s, false);
if (ls != null) {
ls.remove(listener);
}
@@ -91,7 +90,7 @@ public class StandardEnvironment extends AbstractLoggingBean implements Environm
}
public void signal(Signal signal) {
- Set<SignalListener> ls = getSignalListeners(signal, false);
+ Collection<SignalListener> ls = getSignalListeners(signal, false);
if (log.isDebugEnabled()) {
log.debug("signal({}) - listeners={}", signal, ls);
}
@@ -131,17 +130,17 @@ public class StandardEnvironment extends AbstractLoggingBean implements Environm
*
* @param signal The specified {@link Signal}
* @param create If {@code true} and no current listeners are mapped then
- * creates a new {@link Set}
- * @return The {@link Set} of listeners registered for the signal - may be
+ * creates a new {@link Collection}
+ * @return The {@link Collection} of listeners registered for the signal - may be
* {@code null} in case <tt>create</tt> is {@code false}
*/
- protected Set<SignalListener> getSignalListeners(Signal signal, boolean create) {
- Set<SignalListener> ls = listeners.get(signal);
+ protected Collection<SignalListener> getSignalListeners(Signal signal, boolean create) {
+ Collection<SignalListener> ls = listeners.get(signal);
if ((ls == null) && create) {
synchronized (listeners) {
ls = listeners.get(signal);
if (ls == null) {
- ls = new CopyOnWriteArraySet<>();
+ ls = EventListenerUtils.<SignalListener>synchronizedListenersSet();
listeners.put(signal, ls);
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java b/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java
index d515046..0961e1d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/scp/ScpCommandFactory.java
@@ -19,7 +19,6 @@
package org.apache.sshd.server.scp;
import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import org.apache.sshd.common.scp.ScpFileOpener;
@@ -107,7 +106,8 @@ public class ScpCommandFactory implements ScpFileOpenerHolder, CommandFactory, C
private ScpFileOpener fileOpener;
private int sendBufferSize = ScpHelper.MIN_SEND_BUFFER_SIZE;
private int receiveBufferSize = ScpHelper.MIN_RECEIVE_BUFFER_SIZE;
- private Collection<ScpTransferEventListener> listeners = new CopyOnWriteArraySet<>();
+ private Collection<ScpTransferEventListener> listeners =
+ EventListenerUtils.<ScpTransferEventListener>synchronizedListenersSet();
private ScpTransferEventListener listenerProxy;
public ScpCommandFactory() {
@@ -258,9 +258,7 @@ public class ScpCommandFactory implements ScpFileOpenerHolder, CommandFactory, C
try {
ScpCommandFactory other = getClass().cast(super.clone());
// clone the listeners set as well
- other.listeners = this.listeners.isEmpty()
- ? new CopyOnWriteArraySet<ScpTransferEventListener>()
- : new CopyOnWriteArraySet<>(this.listeners);
+ other.listeners = EventListenerUtils.<ScpTransferEventListener>synchronizedListenersSet(this.listeners);
other.listenerProxy = EventListenerUtils.proxyWrapper(ScpTransferEventListener.class, getClass().getClassLoader(), other.listeners);
return other;
} catch (CloneNotSupportedException e) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionService.java
index 9ed7146..b76bfa3 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionService.java
@@ -26,7 +26,9 @@ import org.apache.sshd.common.session.helpers.AbstractConnectionService;
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public class ServerConnectionService extends AbstractConnectionService<AbstractServerSession> implements ServerSessionHolder {
+public class ServerConnectionService
+ extends AbstractConnectionService<AbstractServerSession>
+ implements ServerSessionHolder {
protected ServerConnectionService(AbstractServerSession s) throws SshException {
super(s);
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java
index 1607e32..7568c81 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerConnectionServiceFactory.java
@@ -22,14 +22,26 @@ import java.io.IOException;
import org.apache.sshd.common.Service;
import org.apache.sshd.common.ServiceFactory;
+import org.apache.sshd.common.forward.PortForwardingEventListener;
+import org.apache.sshd.common.session.AbstractConnectionServiceFactory;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.ValidateUtils;
/**
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public class ServerConnectionServiceFactory implements ServiceFactory {
- public static final ServerConnectionServiceFactory INSTANCE = new ServerConnectionServiceFactory();
+public class ServerConnectionServiceFactory extends AbstractConnectionServiceFactory implements ServiceFactory {
+ public static final ServerConnectionServiceFactory INSTANCE = new ServerConnectionServiceFactory() {
+ @Override
+ public void addPortForwardingEventListener(PortForwardingEventListener listener) {
+ throw new UnsupportedOperationException("addPortForwardingListener(" + listener + ") N/A on default instance");
+ }
+
+ @Override
+ public void removePortForwardingEventListener(PortForwardingEventListener listener) {
+ throw new UnsupportedOperationException("removePortForwardingEventListener(" + listener + ") N/A on default instance");
+ }
+ };
public ServerConnectionServiceFactory() {
super();
@@ -43,6 +55,8 @@ public class ServerConnectionServiceFactory implements ServiceFactory {
@Override
public Service create(Session session) throws IOException {
ValidateUtils.checkTrue(session instanceof AbstractServerSession, "Not a server session: %s", session);
- return new ServerConnectionService((AbstractServerSession) session);
+ ServerConnectionService service = new ServerConnectionService((AbstractServerSession) session);
+ service.addPortForwardingEventListener(getPortForwardingEventListenerProxy());
+ return service;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java
index b6d9e28..aa2c01c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpEventListenerManager.java
@@ -20,7 +20,6 @@
package org.apache.sshd.server.subsystem.sftp;
import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.sshd.common.util.EventListenerUtils;
import org.apache.sshd.common.util.ValidateUtils;
@@ -29,7 +28,8 @@ import org.apache.sshd.common.util.ValidateUtils;
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public abstract class AbstractSftpEventListenerManager implements SftpEventListenerManager {
- private final Collection<SftpEventListener> sftpEventListeners = new CopyOnWriteArraySet<>();
+ private final Collection<SftpEventListener> sftpEventListeners =
+ EventListenerUtils.<SftpEventListener>synchronizedListenersSet();
private final SftpEventListener sftpEventListenerProxy;
protected AbstractSftpEventListenerManager() {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/549ac4e2/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
index 8fe54d2..8b53016 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
@@ -68,7 +68,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -239,7 +238,7 @@ public class SftpSubsystem
public static final String ACL_SUPPORTED_MASK_PROP = "sftp-acl-supported-mask";
public static final Set<Integer> DEFAULT_ACL_SUPPORTED_MASK =
Collections.unmodifiableSet(
- new HashSet<Integer>(Arrays.asList(
+ new HashSet<>(Arrays.asList(
SftpConstants.SSH_ACL_CAP_ALLOW,
SftpConstants.SSH_ACL_CAP_DENY,
SftpConstants.SSH_ACL_CAP_AUDIT,
@@ -303,7 +302,8 @@ public class SftpSubsystem
private ServerSession serverSession;
private final AtomicBoolean closed = new AtomicBoolean(false);
- private final Collection<SftpEventListener> sftpEventListeners = new CopyOnWriteArraySet<>();
+ private final Collection<SftpEventListener> sftpEventListeners =
+ EventListenerUtils.<SftpEventListener>synchronizedListenersSet();
private final SftpEventListener sftpEventListenerProxy;
/**
@@ -2297,7 +2297,7 @@ public class SftpSubsystem
Map<String, OptionalFeature> extensions = getSupportedClientExtensions();
int numExtensions = GenericUtils.size(extensions);
- List<String> extras = (numExtensions <= 0) ? Collections.<String>emptyList() : new ArrayList<String>(numExtensions);
+ List<String> extras = (numExtensions <= 0) ? Collections.<String>emptyList() : new ArrayList<>(numExtensions);
if (numExtensions > 0) {
for (Map.Entry<String, OptionalFeature> ee : extensions.entrySet()) {
String name = ee.getKey();
@@ -2354,7 +2354,7 @@ public class SftpSubsystem
}
String[] names = GenericUtils.split(override, ',');
- Set<Integer> maskValues = new HashSet<Integer>(names.length);
+ Set<Integer> maskValues = new HashSet<>(names.length);
for (String n : names) {
Integer v = ValidateUtils.checkNotNull(
AclSupportedParser.AclCapabilities.getAclCapabilityValue(n), "Unknown ACL capability: %s", n);
@@ -3004,7 +3004,7 @@ public class SftpSubsystem
}
protected void setFileAttributes(Path file, Map<String, ?> attributes, LinkOption ... options) throws IOException {
- Set<String> unsupported = new TreeSet<String>(String.CASE_INSENSITIVE_ORDER);
+ Set<String> unsupported = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
for (Map.Entry<String, ?> ae : attributes.entrySet()) {
String attribute = ae.getKey();
Object value = ae.getValue();