You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2015/08/25 06:38:15 UTC
[1/4] mina-sshd git commit: [SSHD-555] Add ChannelListener support
Repository: mina-sshd
Updated Branches:
refs/heads/master 051aec3fa -> 40195ab46
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java b/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java
index 7d6b1dc..0a3b2a0 100644
--- a/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.sshd.client.SessionFactory;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.channel.ChannelExec;
import org.apache.sshd.client.channel.ChannelShell;
@@ -45,10 +44,12 @@ import org.apache.sshd.client.future.AuthFuture;
import org.apache.sshd.client.session.ClientConnectionServiceFactory;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.client.session.ClientSessionImpl;
+import org.apache.sshd.client.session.SessionFactory;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.FactoryManagerUtils;
import org.apache.sshd.common.NamedFactory;
import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.TestChannelListener;
import org.apache.sshd.common.channel.WindowClosedException;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.kex.KexProposalOption;
@@ -95,9 +96,10 @@ public class ServerTest extends BaseTestSupport {
sshd.setKeyPairProvider(Utils.createTestHostKeyProvider());
sshd.setShellFactory(new TestEchoShellFactory());
sshd.setPasswordAuthenticator(BogusPasswordAuthenticator.INSTANCE);
- sshd.setSessionFactory(new org.apache.sshd.server.session.SessionFactory());
sshd.start();
port = sshd.getPort();
+
+ client = SshClient.setUpDefaultClient();
}
@After
@@ -120,7 +122,6 @@ public class ServerTest extends BaseTestSupport {
final int MAX_AUTH_REQUESTS = 10;
FactoryManagerUtils.updateProperty(sshd, ServerFactoryManager.MAX_AUTH_REQUESTS, MAX_AUTH_REQUESTS);
- client = SshClient.setUpDefaultClient();
client.setServiceFactories(Arrays.asList(
new ClientUserAuthServiceOld.Factory(),
ClientConnectionServiceFactory.INSTANCE
@@ -133,8 +134,7 @@ public class ServerTest extends BaseTestSupport {
while ((res & ClientSession.CLOSED) == 0) {
nbTrials++;
s.getService(ClientUserAuthServiceOld.class)
- .auth(new org.apache.sshd.deprecated.UserAuthPassword(s, "ssh-connection", "buggy"))
- ;
+ .auth(new org.apache.sshd.deprecated.UserAuthPassword(s, "ssh-connection", "buggy"));
res = s.waitFor(ClientSession.CLOSED | ClientSession.WAIT_AUTH, TimeUnit.SECONDS.toMillis(5L));
if (res == ClientSession.TIMEOUT) {
throw new TimeoutException("Client session timeout signalled");
@@ -151,7 +151,6 @@ public class ServerTest extends BaseTestSupport {
final int MAX_AUTH_REQUESTS = 10;
FactoryManagerUtils.updateProperty(sshd, ServerFactoryManager.MAX_AUTH_REQUESTS, MAX_AUTH_REQUESTS);
- client = SshClient.setUpDefaultClient();
client.setServiceFactories(Arrays.asList(
new ClientUserAuthServiceOld.Factory(),
ClientConnectionServiceFactory.INSTANCE
@@ -164,13 +163,12 @@ public class ServerTest extends BaseTestSupport {
nbTrials++;
assertTrue(nbTrials < 100);
authFuture = s.getService(ClientUserAuthServiceOld.class)
- .auth(new org.apache.sshd.deprecated.UserAuthPassword(s, "ssh-connection", "buggy"))
- ;
+ .auth(new org.apache.sshd.deprecated.UserAuthPassword(s, "ssh-connection", "buggy"));
assertTrue("Authentication wait failed", authFuture.await(5000));
assertTrue("Authentication not done", authFuture.isDone());
assertFalse("Authentication unexpectedly successful", authFuture.isSuccess());
- }
- while (authFuture.isFailure());
+ } while (authFuture.isFailure());
+
assertNotNull("Missing auth future exception", authFuture.getException());
assertTrue("Number trials (" + nbTrials + ") below min.=" + MAX_AUTH_REQUESTS, nbTrials > MAX_AUTH_REQUESTS);
} finally {
@@ -180,12 +178,11 @@ public class ServerTest extends BaseTestSupport {
@Test
public void testAuthenticationTimeout() throws Exception {
- final int AUTH_TIMEOUT = 5000;
+ final long AUTH_TIMEOUT = TimeUnit.SECONDS.toMillis(5L);
FactoryManagerUtils.updateProperty(sshd, FactoryManager.AUTH_TIMEOUT, AUTH_TIMEOUT);
- client = SshClient.setUpDefaultClient();
client.start();
- try (ClientSession s = client.connect("test", "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ try (ClientSession s = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
int res = s.waitFor(ClientSession.CLOSED, 2 * AUTH_TIMEOUT);
assertEquals("Session should be closed", ClientSession.CLOSED | ClientSession.WAIT_AUTH, res);
} finally {
@@ -197,10 +194,10 @@ public class ServerTest extends BaseTestSupport {
public void testIdleTimeout() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
TestEchoShellFactory.TestEchoShell.latch = new CountDownLatch(1);
- final int IDLE_TIMEOUT = 2500;
+ final long IDLE_TIMEOUT = 2500;
FactoryManagerUtils.updateProperty(sshd, FactoryManager.IDLE_TIMEOUT, IDLE_TIMEOUT);
- sshd.getSessionFactory().addListener(new SessionListener() {
+ sshd.addSessionListener(new SessionListener() {
@Override
public void sessionCreated(Session session) {
System.out.println("Session created");
@@ -218,27 +215,29 @@ public class ServerTest extends BaseTestSupport {
}
});
- client = SshClient.setUpDefaultClient();
+ TestChannelListener channelListener = new TestChannelListener();
+ sshd.addChannelListener(channelListener);
+
client.start();
- try (ClientSession s = client.connect("test", "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- s.addPasswordIdentity("test");
- s.auth().verify(5L, TimeUnit.SECONDS);
-
- try (ChannelShell shell = s.createShellChannel();
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream()) {
- shell.setOut(out);
- shell.setErr(err);
- shell.open().verify(9L, TimeUnit.SECONDS);
- int res = s.waitFor(ClientSession.CLOSED, 2 * IDLE_TIMEOUT);
- assertEquals("Session should be closed", ClientSession.CLOSED | ClientSession.AUTHED, res);
- }
+ try (ClientSession s = createTestClientSession();
+ ChannelShell shell = s.createShellChannel();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+ shell.setOut(out);
+ shell.setErr(err);
+ shell.open().verify(9L, TimeUnit.SECONDS);
+
+ assertTrue("No activated server side channels", GenericUtils.size(channelListener.getActiveChannels()) > 0);
+ assertTrue("No open server side channels", GenericUtils.size(channelListener.getOpenChannels()) > 0);
+
+ int res = s.waitFor(ClientSession.CLOSED, 2 * IDLE_TIMEOUT);
+ assertEquals("Session should be closed", ClientSession.CLOSED | ClientSession.AUTHED, res);
} finally {
client.stop();
}
- assertTrue(latch.await(1, TimeUnit.SECONDS));
- assertTrue(TestEchoShellFactory.TestEchoShell.latch.await(1, TimeUnit.SECONDS));
+ assertTrue("Session latch not signalled in time", latch.await(1, TimeUnit.SECONDS));
+ assertTrue("Shell latch not signalled in time", TestEchoShellFactory.TestEchoShell.latch.await(1, TimeUnit.SECONDS));
}
/**
@@ -254,9 +253,13 @@ public class ServerTest extends BaseTestSupport {
sshd.setCommandFactory(new StreamCommand.Factory());
- FactoryManagerUtils.updateProperty(sshd, FactoryManager.IDLE_TIMEOUT, 5000);
- FactoryManagerUtils.updateProperty(sshd, FactoryManager.DISCONNECT_TIMEOUT, 2000);
- sshd.getSessionFactory().addListener(new SessionListener() {
+ final long IDLE_TIMEOUT_VALUE = TimeUnit.SECONDS.toMillis(5L);
+ FactoryManagerUtils.updateProperty(sshd, FactoryManager.IDLE_TIMEOUT, IDLE_TIMEOUT_VALUE);
+
+ final long DISCONNECT_TIMEOUT_VALUE = TimeUnit.SECONDS.toMillis(2L);
+ FactoryManagerUtils.updateProperty(sshd, FactoryManager.DISCONNECT_TIMEOUT, DISCONNECT_TIMEOUT_VALUE);
+
+ sshd.addSessionListener(new SessionListener() {
@Override
public void sessionCreated(Session session) {
System.out.println("Session created");
@@ -274,36 +277,43 @@ public class ServerTest extends BaseTestSupport {
}
});
- client = SshClient.setUpDefaultClient();
+ TestChannelListener channelListener = new TestChannelListener();
+ sshd.addChannelListener(channelListener);
+
client.start();
- try (ClientSession s = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- s.addPasswordIdentity(getCurrentTestName());
- s.auth().verify(5L, TimeUnit.SECONDS);
+ try (ClientSession s = createTestClientSession();
+ ChannelExec shell = s.createExecChannel("normal");
+ // Create a pipe that will block reading when the buffer is full
+ PipedInputStream pis = new PipedInputStream();
+ PipedOutputStream pos = new PipedOutputStream(pis)) {
- try (ChannelExec shell = s.createExecChannel("normal");
- // Create a pipe that will block reading when the buffer is full
- PipedInputStream pis = new PipedInputStream();
- PipedOutputStream pos = new PipedOutputStream(pis)) {
+ shell.setOut(pos);
+ shell.open().verify(5L, TimeUnit.SECONDS);
- shell.setOut(pos);
- shell.open().verify(5L, TimeUnit.SECONDS);
+ assertTrue("No activated server side channels", GenericUtils.size(channelListener.getActiveChannels()) > 0);
+ assertTrue("No open server side channels", GenericUtils.size(channelListener.getOpenChannels()) > 0);
- try (AbstractSession serverSession = sshd.getActiveSessions().iterator().next();
- Channel channel = serverSession.getService(AbstractConnectionService.class).getChannels().iterator().next()) {
+ try (AbstractSession serverSession = sshd.getActiveSessions().iterator().next();
+ Channel channel = serverSession.getService(AbstractConnectionService.class).getChannels().iterator().next()) {
- while (channel.getRemoteWindow().getSize() > 0) {
- Thread.sleep(1);
- }
+ final long MAX_TIMEOUT_VALUE = IDLE_TIMEOUT_VALUE + DISCONNECT_TIMEOUT_VALUE + TimeUnit.SECONDS.toMillis(3L);
+ for (long totalNanoTime = 0L; channel.getRemoteWindow().getSize() > 0; ) {
+ long nanoStart = System.nanoTime();
+ Thread.sleep(1L);
+ long nanoEnd = System.nanoTime(), nanoDuration = nanoEnd - nanoStart;
- LoggerFactory.getLogger(getClass()).info("Waiting for session idle timeouts");
-
- long t0 = System.currentTimeMillis();
- latch.await(1, TimeUnit.MINUTES);
- long t1 = System.currentTimeMillis(), diff = t1 - t0;
- assertTrue("Wait time too low: " + diff, diff > 7000);
- assertTrue("Wait time too high: " + diff, diff < 10000);
+ totalNanoTime += nanoDuration;
+ assertTrue("Waiting for too long on remote window size to reach zero", totalNanoTime < TimeUnit.MILLISECONDS.toNanos(MAX_TIMEOUT_VALUE));
}
+
+ LoggerFactory.getLogger(getClass()).info("Waiting for session idle timeouts");
+
+ long t0 = System.currentTimeMillis();
+ latch.await(1, TimeUnit.MINUTES);
+ long t1 = System.currentTimeMillis(), diff = t1 - t0;
+ assertTrue("Wait time too low: " + diff, diff > IDLE_TIMEOUT_VALUE);
+ assertTrue("Wait time too high: " + diff, diff < MAX_TIMEOUT_VALUE);
}
} finally {
client.stop();
@@ -311,11 +321,11 @@ public class ServerTest extends BaseTestSupport {
}
@Test
- public void testLanguage() throws Exception {
- client = SshClient.setUpDefaultClient();
- client.setSessionFactory(new SessionFactory() {
+ public void testLanguageNegotiation() throws Exception {
+ client.setSessionFactory(new SessionFactory(client) {
@Override
- protected AbstractSession createSession(IoSession ioSession) throws Exception {
+ @SuppressWarnings("synthetic-access")
+ protected ClientSessionImpl createSession(IoSession ioSession) throws Exception {
return new ClientSessionImpl(client, ioSession) {
@Override
protected Map<KexProposalOption, String> createProposal(String hostKeyTypes) {
@@ -327,9 +337,10 @@ public class ServerTest extends BaseTestSupport {
};
}
});
+
client.start();
- try (ClientSession s = client.connect("test", "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- s.close(false);
+ try (ClientSession s = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ // do nothing
} finally {
client.stop();
}
@@ -338,7 +349,7 @@ public class ServerTest extends BaseTestSupport {
@Test
public void testKexCompletedEvent() throws Exception {
final AtomicInteger serverEventCount = new AtomicInteger(0);
- sshd.getSessionFactory().addListener(new SessionListener() {
+ sshd.addSessionListener(new SessionListener() {
@Override
public void sessionCreated(Session session) {
// ignored
@@ -357,10 +368,9 @@ public class ServerTest extends BaseTestSupport {
}
});
- client = SshClient.setUpDefaultClient();
client.start();
final AtomicInteger clientEventCount = new AtomicInteger(0);
- client.getSessionFactory().addListener(new SessionListener() {
+ client.addSessionListener(new SessionListener() {
@Override
public void sessionCreated(Session session) {
// ignored
@@ -379,9 +389,7 @@ public class ServerTest extends BaseTestSupport {
}
});
- try (ClientSession s = client.connect("test", "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- s.addPasswordIdentity("test");
- s.auth().verify(5L, TimeUnit.SECONDS);
+ try (ClientSession s = createTestClientSession()) {
assertEquals("Mismatched client events count", 1, clientEventCount.get());
assertEquals("Mismatched server events count", 1, serverEventCount.get());
s.close(false);
@@ -391,11 +399,10 @@ public class ServerTest extends BaseTestSupport {
}
@Test // see https://issues.apache.org/jira/browse/SSHD-456
- public void testServerStillListensIfSessionListenerThrowsException() throws InterruptedException {
+ public void testServerStillListensIfSessionListenerThrowsException() throws Exception {
final Map<String, SocketAddress> eventsMap = new TreeMap<String, SocketAddress>(String.CASE_INSENSITIVE_ORDER);
- sshd.getSessionFactory().addListener(new SessionListener() {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
+ final Logger log = LoggerFactory.getLogger(getClass());
+ sshd.addSessionListener(new SessionListener() {
@Override
public void sessionCreated(Session session) {
throwException("SessionCreated", session);
@@ -426,7 +433,6 @@ public class ServerTest extends BaseTestSupport {
}
});
- client = SshClient.setUpDefaultClient();
client.start();
int curCount = 0;
@@ -438,19 +444,18 @@ public class ServerTest extends BaseTestSupport {
}
try {
- try (ClientSession s = client.connect("test", "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- s.addPasswordIdentity("test");
- s.auth().verify(5L, TimeUnit.SECONDS);
+ try (ClientSession s = createTestClientSession()) {
+ log.info("Retry #" + retryCount + " successful");
}
synchronized (eventsMap) {
- assertTrue("Unexpected premature success: " + eventsMap, eventsMap.size() >= 3);
+ assertTrue("Unexpected premature success at retry # " + retryCount + ": " + eventsMap, eventsMap.size() >= 3);
}
} catch (IOException e) {
// expected - ignored
synchronized (eventsMap) {
int nextCount = eventsMap.size();
- assertTrue("No session event generated", nextCount > curCount);
+ assertTrue("No session event generated at retry #" + retryCount, nextCount > curCount);
}
}
}
@@ -506,6 +511,8 @@ public class ServerTest extends BaseTestSupport {
}
});
+ TestChannelListener channelListener = new TestChannelListener();
+ sshd.addChannelListener(channelListener);
@SuppressWarnings("synthetic-access")
Map<String, String> expected = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER) {
@@ -518,37 +525,54 @@ public class ServerTest extends BaseTestSupport {
}
};
- client = SshClient.setUpDefaultClient();
client.start();
- try (ClientSession s = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- s.addPasswordIdentity(getCurrentTestName());
- s.auth().verify(5L, TimeUnit.SECONDS);
+ try (ClientSession s = createTestClientSession();
+ ChannelExec shell = s.createExecChannel(getCurrentTestName())) {
+ for (Map.Entry<String, String> ee : expected.entrySet()) {
+ shell.setEnv(ee.getKey(), ee.getValue());
+ }
- try (ChannelExec shell = s.createExecChannel(getCurrentTestName())) {
- for (Map.Entry<String, String> ee : expected.entrySet()) {
- shell.setEnv(ee.getKey(), ee.getValue());
- }
- shell.open().verify(5L, TimeUnit.SECONDS);
- shell.waitFor(ClientChannel.CLOSED, TimeUnit.SECONDS.toMillis(17L));
+ shell.open().verify(5L, TimeUnit.SECONDS);
- Integer status = shell.getExitStatus();
- assertNotNull("No exit status", status);
- assertEquals("Bad exit status", 0, status.intValue());
- }
+ assertTrue("No activated server side channels", GenericUtils.size(channelListener.getActiveChannels()) > 0);
+ assertTrue("No open server side channels", GenericUtils.size(channelListener.getOpenChannels()) > 0);
- Environment cmdEnv = envHolder.get();
- assertNotNull("No environment set", cmdEnv);
+ shell.waitFor(ClientChannel.CLOSED, TimeUnit.SECONDS.toMillis(17L));
- Map<String, String> vars = cmdEnv.getEnv();
- assertTrue("Mismatched vars count", GenericUtils.size(vars) >= GenericUtils.size(expected));
- for (Map.Entry<String, String> ee : expected.entrySet()) {
- String key = ee.getKey(), expValue = ee.getValue(), actValue = vars.get(key);
- assertEquals("Mismatched value for " + key, expValue, actValue);
- }
+ Integer status = shell.getExitStatus();
+ assertNotNull("No exit status", status);
+ assertEquals("Bad exit status", 0, status.intValue());
} finally {
client.stop();
}
+ assertTrue("Still activated server side channels", GenericUtils.isEmpty(channelListener.getActiveChannels()));
+
+ Environment cmdEnv = envHolder.get();
+ assertNotNull("No environment set", cmdEnv);
+
+ Map<String, String> vars = cmdEnv.getEnv();
+ assertTrue("Mismatched vars count", GenericUtils.size(vars) >= GenericUtils.size(expected));
+ for (Map.Entry<String, String> ee : expected.entrySet()) {
+ String key = ee.getKey(), expValue = ee.getValue(), actValue = vars.get(key);
+ assertEquals("Mismatched value for " + key, expValue, actValue);
+ }
+ }
+
+ private ClientSession createTestClientSession() throws Exception {
+ ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession();
+ try {
+ session.addPasswordIdentity(getCurrentTestName());
+ session.auth().verify(5L, TimeUnit.SECONDS);
+
+ ClientSession returnValue = session;
+ session = null; // avoid 'finally' close
+ return returnValue;
+ } finally {
+ if (session != null) {
+ session.close();
+ }
+ }
}
public static class TestEchoShellFactory extends EchoShellFactory {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-git/src/test/java/org/apache/sshd/git/util/Utils.java
----------------------------------------------------------------------
diff --git a/sshd-git/src/test/java/org/apache/sshd/git/util/Utils.java b/sshd-git/src/test/java/org/apache/sshd/git/util/Utils.java
index d220326..37948b8 100644
--- a/sshd-git/src/test/java/org/apache/sshd/git/util/Utils.java
+++ b/sshd-git/src/test/java/org/apache/sshd/git/util/Utils.java
@@ -52,7 +52,7 @@ public class Utils {
URL url = Utils.class.getClassLoader().getResource(resource);
try {
return new File(url.toURI());
- } catch(URISyntaxException e) {
+ } catch (URISyntaxException e) {
return new File(url.getPath());
}
}
[3/4] mina-sshd git commit: [SSHD-555] Add ChannelListener support
Posted by lg...@apache.org.
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/session/SessionTimeoutListener.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/SessionTimeoutListener.java b/sshd-core/src/main/java/org/apache/sshd/common/session/SessionTimeoutListener.java
index bdde515..2d74ecd 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/SessionTimeoutListener.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/SessionTimeoutListener.java
@@ -32,10 +32,17 @@ import org.apache.sshd.common.util.logging.AbstractLoggingBean;
public class SessionTimeoutListener extends AbstractLoggingBean implements SessionListener, Runnable {
private final Set<AbstractSession> sessions = new CopyOnWriteArraySet<AbstractSession>();
+ public SessionTimeoutListener() {
+ super();
+ }
+
@Override
public void sessionCreated(Session session) {
- if (session instanceof AbstractSession && (session.getAuthTimeout() > 0 || session.getIdleTimeout() > 0)) {
+ if ((session instanceof AbstractSession) && ((session.getAuthTimeout() > 0L) || (session.getIdleTimeout() > 0L))) {
sessions.add((AbstractSession) session);
+ log.debug("sessionCreated({}) tracking", session);
+ } else {
+ log.trace("sessionCreated({}) not tracked", session);
}
}
@@ -46,7 +53,11 @@ public class SessionTimeoutListener extends AbstractLoggingBean implements Sessi
@Override
public void sessionClosed(Session s) {
- sessions.remove(s);
+ if (sessions.remove(s)) {
+ log.debug("sessionClosed({}) un-tracked", s);
+ } else {
+ log.trace("sessionClosed({}) not tracked", s);
+ }
}
@Override
@@ -55,7 +66,7 @@ public class SessionTimeoutListener extends AbstractLoggingBean implements Sessi
try {
session.checkForTimeouts();
} catch (Exception e) {
- log.warn("An error occurred while checking session=" + session + " timeouts", e);
+ log.warn(e.getClass().getSimpleName() + " while checking session=" + session + " timeouts: " + e.getMessage(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
index 6f2304f..f1003cf 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/EventListenerUtils.java
@@ -103,20 +103,27 @@ public final class EventListenerUtils {
* @see #proxyWrapper(Class, ClassLoader, Iterable)
*/
public static <T extends EventListener> T proxyWrapper(Class<T> listenerType, ClassLoader loader, final Iterable<? extends T> listeners) {
- if ((listenerType == null) || (!listenerType.isInterface())) {
- throw new IllegalArgumentException("Target proxy is not an interface");
- }
-
- if (listeners == null) {
- throw new IllegalArgumentException("No listeners container provided");
- }
+ ValidateUtils.checkNotNull(listenerType, "No listener type specified");
+ ValidateUtils.checkTrue(listenerType.isInterface(), "Target proxy is not an interface: %s", listenerType.getSimpleName());
+ ValidateUtils.checkNotNull(listeners, "No listeners container provided");
Object wrapper = Proxy.newProxyInstance(loader, new Class<?>[]{listenerType}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ Throwable err = null;
for (T l : listeners) {
- method.invoke(l, args);
+ try {
+ method.invoke(l, args);
+ } catch (Throwable t) {
+ Throwable e = GenericUtils.peelException(t);
+ err = GenericUtils.accumulateException(err, e);
+ }
+ }
+
+ if (err != null) {
+ throw err;
}
+
return null; // we assume always void return value...
}
});
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
index 2563fbb..87631d7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
@@ -19,6 +19,8 @@
package org.apache.sshd.common.util;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.UndeclaredThrowableException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -32,6 +34,9 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import javax.management.MBeanException;
+import javax.management.ReflectionException;
+
/**
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
@@ -333,6 +338,47 @@ public final class GenericUtils {
}
/**
+ * Attempts to get to the "effective" exception being thrown,
+ * by taking care of some known exceptions that wrap the original thrown
+ * one.
+ * @param t The original {@link Throwable} - ignored if {@code null}
+ * @return The effective exception - same as input if not a wrapper
+ */
+ public static Throwable peelException(Throwable t) {
+ if (t == null) {
+ return t;
+ } else if (t instanceof UndeclaredThrowableException) {
+ Throwable wrapped = ((UndeclaredThrowableException) t).getUndeclaredThrowable();
+ // according to the Javadoc it may be null, in which case 'getCause'
+ // might contain the information we need
+ if (wrapped != null) {
+ return peelException(wrapped);
+ }
+
+ wrapped = t.getCause();
+ if (wrapped != t) { // make sure it is a real cause
+ return peelException(wrapped);
+ }
+ } else if (t instanceof InvocationTargetException) {
+ Throwable target = ((InvocationTargetException) t).getTargetException();
+ if (target != null) {
+ return peelException(target);
+ }
+ } else if (t instanceof ReflectionException) {
+ Throwable target = ((ReflectionException) t).getTargetException();
+ if (target != null) {
+ return peelException(target);
+ }
+ } else if (t instanceof MBeanException) {
+ Throwable target = ((MBeanException) t).getTargetException();
+ if (target != null) {
+ return peelException(target);
+ }
+ }
+
+ return t; // no special handling required or available
+ }
+ /**
* @param t The original {@link Throwable} - ignored if {@code null}
* @return If {@link Throwable#getCause()} is non-{@code null} then
* the cause, otherwise the original exception - {@code null} if
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
index 469d8a6..a18f741 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
@@ -158,5 +158,4 @@ public interface ServerFactoryManager extends FactoryManager {
* or {@code null} if subsystems are not supported on this server
*/
List<NamedFactory<Command>> getSubsystemFactories();
-
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/server/SshServer.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/SshServer.java b/sshd-core/src/main/java/org/apache/sshd/server/SshServer.java
index a2bfb2c..6baf31b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/SshServer.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/SshServer.java
@@ -276,7 +276,6 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
if (sessionFactory == null) {
sessionFactory = createSessionFactory();
}
- sessionFactory.setServer(this);
acceptor = createAcceptor();
setupSessionTimeout(sessionFactory);
@@ -377,7 +376,7 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
}
protected SessionFactory createSessionFactory() {
- return new SessionFactory();
+ return new SessionFactory(this);
}
@Override
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/server/auth/CachingPublicKeyAuthenticator.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/auth/CachingPublicKeyAuthenticator.java b/sshd-core/src/main/java/org/apache/sshd/server/auth/CachingPublicKeyAuthenticator.java
index 6af7c6f..998813c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/auth/CachingPublicKeyAuthenticator.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/auth/CachingPublicKeyAuthenticator.java
@@ -47,7 +47,7 @@ public class CachingPublicKeyAuthenticator implements PublickeyAuthenticator, Se
if (map == null) {
map = new ConcurrentHashMap<>();
cache.put(session, map);
- session.addListener(this);
+ session.addSessionListener(this);
}
Boolean result = map.get(key);
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
index 3b7be62..8161e67 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
@@ -24,6 +24,8 @@ import org.apache.sshd.client.future.DefaultOpenFuture;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.channel.AbstractChannel;
+import org.apache.sshd.common.channel.ChannelListener;
+import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.buffer.Buffer;
/**
@@ -62,8 +64,22 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
}
protected OpenFuture doInit(Buffer buffer) {
+ ChannelListener listener = getChannelListenerProxy();
OpenFuture f = new DefaultOpenFuture(this);
- f.setOpened();
+ try {
+ listener.channelOpenSuccess(this);
+ f.setOpened();
+ } catch (RuntimeException t) {
+ Throwable e = GenericUtils.peelException(t);
+ try {
+ listener.channelOpenFailure(this, e);
+ } catch (Throwable ignored) {
+ log.warn("doInit({}) failed ({}) to inform listener of open failure={}: {}",
+ this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage());
+ }
+ f.setException(e);
+ }
+
return f;
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index 520dcdd..a4a020d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -447,7 +447,7 @@ public class ChannelSession extends AbstractServerChannel {
return false;
}
- ServerFactoryManager manager = ((ServerSession) session).getFactoryManager();
+ ServerFactoryManager manager = ((ServerSession) getSession()).getFactoryManager();
Factory<Command> factory = manager.getShellFactory();
if (factory == null) {
log.debug("handleShell - no shell factory");
@@ -472,7 +472,7 @@ public class ChannelSession extends AbstractServerChannel {
}
String commandLine = buffer.getString();
- ServerFactoryManager manager = ((ServerSession) session).getFactoryManager();
+ ServerFactoryManager manager = ((ServerSession) getSession()).getFactoryManager();
CommandFactory factory = manager.getCommandFactory();
if (factory == null) {
log.warn("No command factory for command: {}", commandLine);
@@ -498,7 +498,7 @@ public class ChannelSession extends AbstractServerChannel {
protected boolean handleSubsystem(Buffer buffer) throws IOException {
String subsystem = buffer.getString();
- ServerFactoryManager manager = ((ServerSession) session).getFactoryManager();
+ ServerFactoryManager manager = ((ServerSession) getSession()).getFactoryManager();
List<NamedFactory<Command>> factories = manager.getSubsystemFactories();
if (GenericUtils.isEmpty(factories)) {
log.warn("No factories for subsystem: {}", subsystem);
@@ -536,14 +536,15 @@ public class ChannelSession extends AbstractServerChannel {
addEnvVariable(Environment.ENV_USER, session.getUsername());
// If the shell wants to be aware of the session, let's do that
if (command instanceof SessionAware) {
- ((SessionAware) command).setSession((ServerSession) session);
+ ((SessionAware) command).setSession((ServerSession) getSession());
}
if (command instanceof ChannelSessionAware) {
((ChannelSessionAware) command).setChannelSession(this);
}
// If the shell wants to be aware of the file system, let's do that too
if (command instanceof FileSystemAware) {
- FileSystemFactory factory = ((ServerSession) session).getFactoryManager().getFileSystemFactory();
+ ServerFactoryManager manager = ((ServerSession) getSession()).getFactoryManager();
+ FileSystemFactory factory = manager.getFileSystemFactory();
((FileSystemAware) command).setFileSystem(factory.createFileSystem(session));
}
// If the shell wants to use non-blocking io
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 4fa0670..29ba7f5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -31,6 +31,7 @@ import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshdSocketAddress;
import org.apache.sshd.common.channel.Channel;
import org.apache.sshd.common.channel.ChannelFactory;
+import org.apache.sshd.common.channel.ChannelListener;
import org.apache.sshd.common.channel.ChannelOutputStream;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
@@ -40,6 +41,7 @@ import org.apache.sshd.common.io.IoHandler;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.Readable;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
@@ -112,7 +114,7 @@ public class TcpipServerChannel extends AbstractServerChannel {
int originatorPort = buffer.getInt();
if (log.isDebugEnabled()) {
log.debug("Receiving request for direct tcpip: hostToConnect={}, portToConnect={}, originatorIpAddress={}, originatorPort={}",
- hostToConnect, portToConnect, originatorIpAddress, originatorPort);
+ hostToConnect, portToConnect, originatorIpAddress, originatorPort);
}
final SshdSocketAddress address;
@@ -172,31 +174,71 @@ public class TcpipServerChannel extends AbstractServerChannel {
close(true);
}
};
+
connector = manager.getIoServiceFactory().createConnector(handler);
IoConnectFuture future = connector.connect(address.toInetSocketAddress());
future.addListener(new SshFutureListener<IoConnectFuture>() {
- @SuppressWarnings("synthetic-access")
@Override
public void operationComplete(IoConnectFuture future) {
- if (future.isConnected()) {
- ioSession = future.getSession();
- f.setOpened();
- } else if (future.getException() != null) {
- closeImmediately0();
- if (future.getException() instanceof ConnectException) {
- f.setException(new OpenChannelException(
- SshConstants.SSH_OPEN_CONNECT_FAILED,
- future.getException().getMessage(),
- future.getException()));
- } else {
- f.setException(future.getException());
- }
- }
+ handleChannelConnectResult(f, future);
}
});
return f;
}
+ protected void handleChannelConnectResult(OpenFuture f, IoConnectFuture future) {
+ ChannelListener listener = getChannelListenerProxy();
+ try {
+ if (future.isConnected()) {
+ handleChannelOpenSuccess(f, future.getSession());
+ return;
+ }
+
+ Throwable problem = GenericUtils.peelException(future.getException());
+ if (problem != null) {
+ handleChannelOpenFailure(f, problem);
+ }
+ } catch (RuntimeException t) {
+ Throwable e = GenericUtils.peelException(t);
+ try {
+ listener.channelOpenFailure(this, e);
+ } catch (Throwable ignored) {
+ log.warn("handleChannelConnectResult({})[exception] failed ({}) to inform listener of open failure={}: {}",
+ this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage());
+ }
+ f.setException(e);
+ }
+ }
+
+ protected void handleChannelOpenSuccess(OpenFuture f, IoSession session) {
+ ioSession = session;
+
+ ChannelListener listener = getChannelListenerProxy();
+ listener.channelOpenSuccess(this);
+ f.setOpened();
+ }
+
+ protected void handleChannelOpenFailure(OpenFuture f, Throwable problem) {
+ ChannelListener listener = getChannelListenerProxy();
+ try {
+ listener.channelOpenFailure(this, problem);
+ } catch (Throwable ignored) {
+ log.warn("handleChannelOpenFailure({}) failed ({}) to inform listener of open failure={}: {}",
+ this, ignored.getClass().getSimpleName(), problem.getClass().getSimpleName(), ignored.getMessage());
+ }
+
+ closeImmediately0();
+
+ if (problem instanceof ConnectException) {
+ f.setException(new OpenChannelException(
+ SshConstants.SSH_OPEN_CONNECT_FAILED,
+ problem.getMessage(),
+ problem));
+ } else {
+ f.setException(problem);
+ }
+
+ }
private void closeImmediately0() {
// We need to close the channel immediately to remove it from the
// server session's channel table and *not* send a packet to the
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/server/session/SessionFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/SessionFactory.java b/sshd-core/src/main/java/org/apache/sshd/server/session/SessionFactory.java
index b358e8b..835d91f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/SessionFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/SessionFactory.java
@@ -19,7 +19,6 @@
package org.apache.sshd.server.session;
import org.apache.sshd.common.io.IoSession;
-import org.apache.sshd.common.session.AbstractSession;
import org.apache.sshd.common.session.AbstractSessionFactory;
import org.apache.sshd.server.ServerFactoryManager;
@@ -30,29 +29,18 @@ import org.apache.sshd.server.ServerFactoryManager;
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
* @see org.apache.sshd.server.SshServer#setSessionFactory(SessionFactory)
*/
-public class SessionFactory extends AbstractSessionFactory {
+public class SessionFactory extends AbstractSessionFactory<ServerFactoryManager, ServerSessionImpl> {
- private ServerFactoryManager manager;
-
- public SessionFactory() {
- super();
- }
-
- public SessionFactory(ServerFactoryManager manager) {
- this.manager = manager;
+ public SessionFactory(ServerFactoryManager server) {
+ super(server);
}
- public ServerFactoryManager getServer() {
- return manager;
- }
-
- public void setServer(ServerFactoryManager server) {
- this.manager = server;
+ public final ServerFactoryManager getServer() {
+ return getFactoryManager();
}
@Override
- protected AbstractSession doCreateSession(IoSession ioSession) throws Exception {
+ protected ServerSessionImpl doCreateSession(IoSession ioSession) throws Exception {
return new ServerSessionImpl(getServer(), ioSession);
}
-
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java b/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
index 91879a0..25a2de8 100644
--- a/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
@@ -30,7 +30,6 @@ import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.FactoryManagerUtils;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.keyprovider.KeyPairProvider;
-import org.apache.sshd.common.session.AbstractSession;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.deprecated.ClientUserAuthServiceOld;
import org.apache.sshd.deprecated.UserAuthKeyboardInteractive;
@@ -70,9 +69,9 @@ public class AuthenticationTest extends BaseTestSupport {
sshd.setPublickeyAuthenticator(AcceptAllPublickeyAuthenticator.INSTANCE);
FactoryManagerUtils.updateProperty(sshd, ServerFactoryManager.WELCOME_BANNER, WELCOME);
FactoryManagerUtils.updateProperty(sshd, ServerFactoryManager.AUTH_METHODS, "publickey,password publickey,keyboard-interactive");
- sshd.setSessionFactory(new SessionFactory() {
+ sshd.setSessionFactory(new SessionFactory(sshd) {
@Override
- protected AbstractSession doCreateSession(IoSession ioSession) throws Exception {
+ protected ServerSessionImpl doCreateSession(IoSession ioSession) throws Exception {
return new TestSession(getServer(), ioSession);
}
});
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java b/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
index fd2590c..18203fb 100644
--- a/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
@@ -52,9 +52,9 @@ public class KeepAliveTest extends BaseTestSupport {
private SshServer sshd;
private int port;
- private static final int HEARTBEAT = 2000;
- private static final int TIMEOUT = 4000;
- private static final int WAIT = 8000;
+ private static final long HEARTBEAT = TimeUnit.SECONDS.toMillis(2L);
+ private static final long TIMEOUT = 2L * HEARTBEAT;
+ private static final long WAIT = 2L * TIMEOUT;
@Before
public void setUp() throws Exception {
@@ -76,27 +76,7 @@ public class KeepAliveTest extends BaseTestSupport {
}
@Test
- public void testClient() throws Exception {
- SshClient client = SshClient.setUpDefaultClient();
- client.start();
-
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
-
- try (ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL)) {
- int state = channel.waitFor(ClientChannel.CLOSED, WAIT);
- assertEquals("Wrong channel state", ClientChannel.CLOSED | ClientChannel.EOF, state);
-
- channel.close(false);
- }
- } finally {
- client.stop();
- }
- }
-
- @Test
- public void testClientNew() throws Exception {
+ public void testIdleClient() throws Exception {
SshClient client = SshClient.setUpDefaultClient();
client.start();
@@ -137,27 +117,6 @@ public class KeepAliveTest extends BaseTestSupport {
}
@Test
- public void testClientWithHeartBeatNew() throws Exception {
- SshClient client = SshClient.setUpDefaultClient();
- FactoryManagerUtils.updateProperty(client, ClientFactoryManager.HEARTBEAT_INTERVAL, HEARTBEAT);
- client.start();
-
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
-
- try (ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL)) {
- int state = channel.waitFor(ClientChannel.CLOSED, WAIT);
- assertEquals("Wrong channel state", ClientChannel.TIMEOUT, state);
-
- channel.close(false);
- }
- } finally {
- client.stop();
- }
- }
-
- @Test
public void testShellClosedOnClientTimeout() throws Exception {
TestEchoShellFactory.TestEchoShell.latch = new CountDownLatch(1);
@@ -187,36 +146,6 @@ public class KeepAliveTest extends BaseTestSupport {
}
}
- @Test
- public void testShellClosedOnClientTimeoutNew() throws Exception {
- TestEchoShellFactory.TestEchoShell.latch = new CountDownLatch(1);
-
- SshClient client = SshClient.setUpDefaultClient();
- client.start();
-
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
-
- try (ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-
- channel.setOut(out);
- channel.setErr(err);
- channel.open().verify(9L, TimeUnit.SECONDS);
-
- assertTrue("Latch time out", TestEchoShellFactory.TestEchoShell.latch.await(10L, TimeUnit.SECONDS));
- int state = channel.waitFor(ClientChannel.CLOSED, WAIT);
- assertEquals("Wrong channel state", ClientChannel.CLOSED | ClientChannel.EOF | ClientChannel.OPENED, state);
-
- channel.close(false);
- }
- } finally {
- client.stop();
- }
- }
-
public static class TestEchoShellFactory extends EchoShellFactory {
@Override
public Command create() {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
index 07fd3cb..26f9645 100644
--- a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
@@ -206,7 +206,7 @@ public class KeyReExchangeTest extends BaseTestSupport {
sb.append("\n");
final AtomicInteger exchanges = new AtomicInteger();
- session.addListener(new SessionListener() {
+ session.addSessionListener(new SessionListener() {
@Override
public void sessionCreated(Session session) {
// ignored
[2/4] mina-sshd git commit: [SSHD-555] Add ChannelListener support
Posted by lg...@apache.org.
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
index dbe8b2b..36cbbe3 100644
--- a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
@@ -36,11 +36,15 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.sshd.client.auth.UserAuth;
import org.apache.sshd.client.auth.UserAuthKeyboardInteractive;
@@ -54,15 +58,22 @@ import org.apache.sshd.client.channel.ClientChannel;
import org.apache.sshd.client.future.AuthFuture;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.client.subsystem.SubsystemClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
+import org.apache.sshd.common.Factory;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.FactoryManagerUtils;
import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.common.NamedResource;
import org.apache.sshd.common.RuntimeSshException;
import org.apache.sshd.common.Service;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.channel.AbstractChannel;
import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelListener;
+import org.apache.sshd.common.channel.ChannelListenerManager;
+import org.apache.sshd.common.channel.TestChannelListener;
import org.apache.sshd.common.cipher.BuiltinCiphers;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
@@ -75,9 +86,11 @@ import org.apache.sshd.common.keyprovider.KeyPairProvider;
import org.apache.sshd.common.session.AbstractSession;
import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.session.SessionListener;
import org.apache.sshd.common.subsystem.sftp.SftpConstants;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.Transformer;
+import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.io.NoCloseOutputStream;
@@ -95,6 +108,7 @@ import org.apache.sshd.server.session.ServerConnectionServiceFactory;
import org.apache.sshd.server.session.ServerSession;
import org.apache.sshd.server.session.ServerUserAuthService;
import org.apache.sshd.server.session.ServerUserAuthServiceFactory;
+import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
import org.apache.sshd.util.AsyncEchoShellFactory;
import org.apache.sshd.util.BaseTestSupport;
import org.apache.sshd.util.BogusPasswordAuthenticator;
@@ -106,6 +120,8 @@ import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* TODO Add javadoc
@@ -121,6 +137,28 @@ public class ClientTest extends BaseTestSupport {
private CountDownLatch authLatch;
private CountDownLatch channelLatch;
+ private final AtomicReference<ClientSession> clientSessionHolder = new AtomicReference<ClientSession>(null);
+ @SuppressWarnings("synthetic-access")
+ private final SessionListener clientSessionListener = new SessionListener() {
+ @Override
+ public void sessionCreated(Session session) {
+ assertObjectInstanceOf("Non client session creation notification", ClientSession.class, session);
+ assertNull("Multiple creation notifications", clientSessionHolder.getAndSet((ClientSession) session));
+ }
+
+ @Override
+ public void sessionEvent(Session session, Event event) {
+ assertObjectInstanceOf("Non client session event notification: " + event, ClientSession.class, session);
+ assertSame("Mismatched client session event instance: " + event, clientSessionHolder.get(), session);
+ }
+
+ @Override
+ public void sessionClosed(Session session) {
+ assertObjectInstanceOf("Non client session closure notification", ClientSession.class, session);
+ assertSame("Mismatched client session closure instance", clientSessionHolder.getAndSet(null), session);
+ }
+ };
+
public ClientTest() {
super();
}
@@ -185,6 +223,8 @@ public class ClientTest extends BaseTestSupport {
port = sshd.getPort();
client = SshClient.setUpDefaultClient();
+ clientSessionHolder.set(null); // just making sure
+ client.addSessionListener(clientSessionListener);
}
@After
@@ -195,6 +235,167 @@ public class ClientTest extends BaseTestSupport {
if (client != null) {
client.stop();
}
+ clientSessionHolder.set(null); // just making sure
+ }
+
+ @Test
+ public void testClientStillActiveIfListenerExceptions() throws Exception {
+ final Map<String, Integer> eventsMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ final Collection<String> failuresSet = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+ final Logger log = LoggerFactory.getLogger(getClass());
+ client.addChannelListener(new ChannelListener() {
+ @Override
+ public void channelOpenSuccess(Channel channel) {
+ handleChannelEvent("OpenSuccess", channel);
+ }
+
+ @Override
+ public void channelOpenFailure(Channel channel, Throwable reason) {
+ assertObjectInstanceOf("Mismatched failure reason type", ChannelFailureException.class, reason);
+
+ String name = ((NamedResource) reason).getName();
+ synchronized(failuresSet) {
+ assertTrue("Re-signalled failure location: " + name, failuresSet.add(name));
+ }
+ }
+
+ @Override
+ public void channelInitialized(Channel channel) {
+ handleChannelEvent("Initialized", channel);
+ }
+
+ @Override
+ public void channelClosed(Channel channel) {
+ handleChannelEvent("Closed", channel);
+ }
+
+ private void handleChannelEvent(String name, Channel channel) {
+ int id = channel.getId();
+ synchronized(eventsMap) {
+ if (eventsMap.put(name, id) != null) {
+ return; // already generated an exception for this event
+ }
+ }
+
+ log.info("handleChannelEvent({})[{}]", name, id);
+ throw new ChannelFailureException(name);
+ }
+ });
+
+ client.start();
+
+ try (ClientSession session = createTestClientSession();
+ PipedOutputStream pipedIn = new PipedOutputStream();
+ InputStream inPipe = new PipedInputStream(pipedIn);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+ // we expect failures either on channel init or open - the one on close is ignored...
+ for (int retryCount = 0; retryCount <= 3; retryCount++) {
+ try {
+ try(ChannelShell channel = session.createShellChannel()) {
+ channel.setIn(inPipe);
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open().verify(3L, TimeUnit.SECONDS);
+ break; // 1st success means all methods have been invoked
+ }
+ } catch (IOException e) {
+ synchronized(eventsMap) {
+ eventsMap.remove("Closed"); // since it is called anyway but does not cause an IOException
+ assertTrue("Unexpected failure at retry #" + retryCount, eventsMap.size() < 3);
+ }
+ }
+ }
+ } finally {
+ client.stop();
+ }
+
+ assertEquals("Mismatched total failures count on test end", 3, eventsMap.size());
+ assertEquals("Mismatched open failures count on test end", 1, failuresSet.size());
+ }
+
+ @Test
+ public void testSimpleClientListener() throws Exception {
+ final AtomicReference<Channel> channelHolder = new AtomicReference<>(null);
+ client.addChannelListener(new ChannelListener() {
+ @Override
+ public void channelOpenSuccess(Channel channel) {
+ assertSame("Mismatched opened channel instances", channel, channelHolder.get());
+ }
+
+ @Override
+ public void channelOpenFailure(Channel channel, Throwable reason) {
+ assertSame("Mismatched failed open channel instances", channel, channelHolder.get());
+ }
+
+ @Override
+ public void channelInitialized(Channel channel) {
+ assertNull("Multiple channel initialization notifications", channelHolder.getAndSet(channel));
+ }
+
+ @Override
+ public void channelClosed(Channel channel) {
+ assertSame("Mismatched closed channel instances", channel, channelHolder.getAndSet(null));
+ }
+ });
+ sshd.setSubsystemFactories(Arrays.<NamedFactory<Command>>asList(new SftpSubsystemFactory()));
+
+ client.start();
+
+ try (final ClientSession session = createTestClientSession()) {
+ testClientListener(channelHolder, ChannelShell.class, new Factory<ChannelShell>() {
+ @Override
+ public ChannelShell create() {
+ try {
+ return session.createShellChannel();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ testClientListener(channelHolder, ChannelExec.class, new Factory<ChannelExec>() {
+ @Override
+ public ChannelExec create() {
+ try {
+ return session.createExecChannel(getCurrentTestName());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ testClientListener(channelHolder, SftpClient.class, new Factory<SftpClient>() {
+ @Override
+ public SftpClient create() {
+ try {
+ return session.createSftpClient();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } finally {
+ client.stop();
+ }
+ }
+
+ private <C extends Closeable> void testClientListener(AtomicReference<Channel> channelHolder, Class<C> channelType, Factory<? extends C> factory) throws Exception {
+ assertNull(channelType.getSimpleName() + ": Unexpected currently active channel", channelHolder.get());
+
+ try(C instance = factory.create()) {
+ Channel expectedChannel;
+ if (instance instanceof Channel) {
+ expectedChannel = (Channel) instance;
+ } else if (instance instanceof SubsystemClient) {
+ expectedChannel = ((SubsystemClient) instance).getClientChannel();
+ } else {
+ throw new UnsupportedOperationException("Unknown test instance type" + instance.getClass().getSimpleName());
+ }
+
+ Channel actualChannel = channelHolder.get();
+ assertSame("Mismatched listener " + channelType.getSimpleName() + " instances", expectedChannel, actualChannel);
+ }
+
+ assertNull(channelType.getSimpleName() + ": Active channel closure not signalled", channelHolder.get());
}
@Test
@@ -205,23 +406,21 @@ public class ClientTest extends BaseTestSupport {
FactoryManagerUtils.updateProperty(client, FactoryManager.WINDOW_SIZE, 1024);
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
+ try (ClientSession session = createTestClientSession();
+ final ChannelShell channel = session.createShellChannel()) {
- try (final ChannelShell channel = session.createShellChannel()) {
- channel.setStreaming(ClientChannel.Streaming.Async);
- channel.open().verify(5L, TimeUnit.SECONDS);
+ channel.setStreaming(ClientChannel.Streaming.Async);
+ channel.open().verify(5L, TimeUnit.SECONDS);
- final byte[] message = "0123456789\n".getBytes(StandardCharsets.UTF_8);
- final int nbMessages = 1000;
+ final byte[] message = "0123456789\n".getBytes(StandardCharsets.UTF_8);
+ final int nbMessages = 1000;
- try (final ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
- final ByteArrayOutputStream baosErr = new ByteArrayOutputStream()) {
- final AtomicInteger writes = new AtomicInteger(nbMessages);
+ try (final ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
+ final ByteArrayOutputStream baosErr = new ByteArrayOutputStream()) {
+ final AtomicInteger writes = new AtomicInteger(nbMessages);
- channel.getAsyncIn().write(new ByteArrayBuffer(message))
- .addListener(new SshFutureListener<IoWriteFuture>() {
+ channel.getAsyncIn().write(new ByteArrayBuffer(message))
+ .addListener(new SshFutureListener<IoWriteFuture>() {
@Override
public void operationComplete(IoWriteFuture future) {
try {
@@ -242,8 +441,8 @@ public class ClientTest extends BaseTestSupport {
}
}
});
- channel.getAsyncOut().read(new ByteArrayBuffer())
- .addListener(new SshFutureListener<IoReadFuture>() {
+ channel.getAsyncOut().read(new ByteArrayBuffer())
+ .addListener(new SshFutureListener<IoReadFuture>() {
@Override
public void operationComplete(IoReadFuture future) {
try {
@@ -261,8 +460,8 @@ public class ClientTest extends BaseTestSupport {
}
}
});
- channel.getAsyncErr().read(new ByteArrayBuffer())
- .addListener(new SshFutureListener<IoReadFuture>() {
+ channel.getAsyncErr().read(new ByteArrayBuffer())
+ .addListener(new SshFutureListener<IoReadFuture>() {
@Override
public void operationComplete(IoReadFuture future) {
try {
@@ -281,210 +480,201 @@ public class ClientTest extends BaseTestSupport {
}
});
- channel.waitFor(ClientChannel.CLOSED, 0);
+ channel.waitFor(ClientChannel.CLOSED, 0);
- assertEquals(nbMessages * message.length, baosOut.size());
- }
+ assertEquals("Mismatched sent and received data size", nbMessages * message.length, baosOut.size());
}
client.close(true);
} finally {
client.stop();
}
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
public void testCommandDeadlock() throws Exception {
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
-
- try (ChannelExec channel = session.createExecChannel(getCurrentTestName());
- OutputStream stdout = new NoCloseOutputStream(System.out);
- OutputStream stderr = new NoCloseOutputStream(System.err)) {
-
- channel.setOut(stdout);
- channel.setErr(stderr);
- channel.open().verify(9L, TimeUnit.SECONDS);
- Thread.sleep(125L);
- try {
- byte[] data = "a".getBytes(StandardCharsets.UTF_8);
- for (int i = 0; i < 100; i++) {
- channel.getInvertedIn().write(data);
- channel.getInvertedIn().flush();
- }
- } catch (SshException e) {
- // That's ok, the channel is being closed by the other side
+ try (ClientSession session = createTestClientSession();
+ ChannelExec channel = session.createExecChannel(getCurrentTestName());
+ OutputStream stdout = new NoCloseOutputStream(System.out);
+ OutputStream stderr = new NoCloseOutputStream(System.err)) {
+
+ channel.setOut(stdout);
+ channel.setErr(stderr);
+ channel.open().verify(9L, TimeUnit.SECONDS);
+ Thread.sleep(125L);
+ try {
+ byte[] data = "a".getBytes(StandardCharsets.UTF_8);
+ OutputStream invertedStream = channel.getInvertedIn();
+ for (int i = 0; i < 100; i++) {
+ invertedStream.write(data);
+ invertedStream.flush();
}
- assertEquals(ClientChannel.CLOSED, channel.waitFor(ClientChannel.CLOSED, 0) & ClientChannel.CLOSED);
- session.close(false).await();
+ } catch (SshException e) {
+ // That's ok, the channel is being closed by the other side
}
+ assertEquals(ClientChannel.CLOSED, channel.waitFor(ClientChannel.CLOSED, 0) & ClientChannel.CLOSED);
+ session.close(false).await();
} finally {
client.stop();
}
+
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
public void testClient() throws Exception {
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
-
- try (ClientChannel channel = session.createShellChannel();
- ByteArrayOutputStream sent = new ByteArrayOutputStream();
- PipedOutputStream pipedIn = new PipedOutputStream();
- PipedInputStream pipedOut = new PipedInputStream(pipedIn)) {
+ try (ClientSession session = createTestClientSession();
+ ClientChannel channel = session.createShellChannel();
+ ByteArrayOutputStream sent = new ByteArrayOutputStream();
+ PipedOutputStream pipedIn = new PipedOutputStream();
+ PipedInputStream pipedOut = new PipedInputStream(pipedIn)) {
- channel.setIn(pipedOut);
+ channel.setIn(pipedOut);
- try (OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+ try (OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
- channel.setOut(out);
- channel.setErr(err);
- channel.open();
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open();
- teeOut.write("this is my command\n".getBytes(StandardCharsets.UTF_8));
- teeOut.flush();
+ teeOut.write("this is my command\n".getBytes(StandardCharsets.UTF_8));
+ teeOut.flush();
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 1000; i++) {
- sb.append("0123456789");
- }
- sb.append("\n");
- teeOut.write(sb.toString().getBytes(StandardCharsets.UTF_8));
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 1000; i++) {
+ sb.append("0123456789");
+ }
+ sb.append("\n");
+ teeOut.write(sb.toString().getBytes(StandardCharsets.UTF_8));
- teeOut.write("exit\n".getBytes(StandardCharsets.UTF_8));
- teeOut.flush();
+ teeOut.write("exit\n".getBytes(StandardCharsets.UTF_8));
+ teeOut.flush();
- channel.waitFor(ClientChannel.CLOSED, 0);
+ channel.waitFor(ClientChannel.CLOSED, 0);
- channel.close(false);
- client.stop();
+ channel.close(false);
+ client.stop();
- assertArrayEquals(sent.toByteArray(), out.toByteArray());
- }
+ assertArrayEquals("Mismatched sent and received data", sent.toByteArray(), out.toByteArray());
}
} finally {
client.stop();
}
+
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
public void testClientInverted() throws Exception {
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
-
- try (ClientChannel channel = session.createShellChannel();
- ByteArrayOutputStream sent = new ByteArrayOutputStream();
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-
- channel.setOut(out);
- channel.setErr(err);
- channel.open().verify(9L, TimeUnit.SECONDS);
+ try (ClientSession session = createTestClientSession();
+ ClientChannel channel = session.createShellChannel();
+ ByteArrayOutputStream sent = new ByteArrayOutputStream();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
- try (OutputStream pipedIn = new TeeOutputStream(sent, channel.getInvertedIn())) {
- pipedIn.write("this is my command\n".getBytes(StandardCharsets.UTF_8));
- pipedIn.flush();
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open().verify(9L, TimeUnit.SECONDS);
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 1000; i++) {
- sb.append("0123456789");
- }
- sb.append("\n");
- pipedIn.write(sb.toString().getBytes(StandardCharsets.UTF_8));
+ try (OutputStream pipedIn = new TeeOutputStream(sent, channel.getInvertedIn())) {
+ pipedIn.write("this is my command\n".getBytes(StandardCharsets.UTF_8));
+ pipedIn.flush();
- pipedIn.write("exit\n".getBytes(StandardCharsets.UTF_8));
- pipedIn.flush();
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 1000; i++) {
+ sb.append("0123456789");
}
+ sb.append("\n");
+ pipedIn.write(sb.toString().getBytes(StandardCharsets.UTF_8));
- channel.waitFor(ClientChannel.CLOSED, 0);
+ pipedIn.write("exit\n".getBytes(StandardCharsets.UTF_8));
+ pipedIn.flush();
+ }
- channel.close(false);
- client.stop();
+ channel.waitFor(ClientChannel.CLOSED, 0);
- assertArrayEquals(sent.toByteArray(), out.toByteArray());
- }
+ channel.close(false);
+ client.stop();
+
+ assertArrayEquals("Mismatched sent and received data", sent.toByteArray(), out.toByteArray());
} finally {
client.stop();
}
+
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
public void testClientWithCustomChannel() throws Exception {
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
-
- try (ChannelShell channel = new ChannelShell();
- ByteArrayOutputStream sent = new ByteArrayOutputStream();
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-
- session.getService(ConnectionService.class).registerChannel(channel);
- channel.setOut(out);
- channel.setErr(err);
- channel.open().verify(5L, TimeUnit.SECONDS);
- channel.close(false).await();
- }
+ try (ClientSession session = createTestClientSession();
+ ChannelShell channel = new ChannelShell();
+ ByteArrayOutputStream sent = new ByteArrayOutputStream();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+ session.getService(ConnectionService.class).registerChannel(channel);
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open().verify(5L, TimeUnit.SECONDS);
+ channel.close(false).await();
} finally {
client.stop();
}
+
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
public void testClientClosingStream() throws Exception {
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
-
- try (ClientChannel channel = session.createShellChannel();
- ByteArrayOutputStream sent = new ByteArrayOutputStream();
- PipedOutputStream pipedIn = new PipedOutputStream();
- InputStream inPipe = new PipedInputStream(pipedIn);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-
- channel.setIn(inPipe);
- channel.setOut(out);
- channel.setErr(err);
- channel.open();
-
- try (OutputStream teeOut = new TeeOutputStream(sent, pipedIn)) {
- teeOut.write("this is my command\n".getBytes(StandardCharsets.UTF_8));
- teeOut.flush();
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 1000; i++) {
- sb.append("0123456789");
- }
- sb.append("\n");
- teeOut.write(sb.toString().getBytes(StandardCharsets.UTF_8));
+ try (ClientSession session = createTestClientSession();
+ ClientChannel channel = session.createShellChannel();
+ ByteArrayOutputStream sent = new ByteArrayOutputStream();
+ PipedOutputStream pipedIn = new PipedOutputStream();
+ InputStream inPipe = new PipedInputStream(pipedIn);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+ channel.setIn(inPipe);
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open();
+
+ try (OutputStream teeOut = new TeeOutputStream(sent, pipedIn)) {
+ teeOut.write("this is my command\n".getBytes(StandardCharsets.UTF_8));
+ teeOut.flush();
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 1000; i++) {
+ sb.append("0123456789");
}
+ sb.append("\n");
+ teeOut.write(sb.toString().getBytes(StandardCharsets.UTF_8));
+ }
- channel.waitFor(ClientChannel.CLOSED, 0);
+ channel.waitFor(ClientChannel.CLOSED, 0);
- channel.close(false);
- client.stop();
+ channel.close(false);
+ client.stop();
- assertArrayEquals(sent.toByteArray(), out.toByteArray());
- }
+ assertArrayEquals("Mismatched sent and received data", sent.toByteArray(), out.toByteArray());
} finally {
client.stop();
}
+
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
@@ -496,79 +686,73 @@ public class ClientTest extends BaseTestSupport {
// FactoryManagerUtils.updateProperty(sshd, SshServer.MAX_PACKET_SIZE, 0x1000);
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
-
- try (ClientChannel channel = session.createShellChannel();
- ByteArrayOutputStream sent = new ByteArrayOutputStream();
- PipedOutputStream pipedIn = new PipedOutputStream();
- InputStream inPipe = new PipedInputStream(pipedIn);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-
- channel.setIn(inPipe);
- channel.setOut(out);
- channel.setErr(err);
- channel.open().verify(9L, TimeUnit.SECONDS);
-
-
- int bytes = 0;
- byte[] data = "01234567890123456789012345678901234567890123456789\n".getBytes(StandardCharsets.UTF_8);
- long t0 = System.currentTimeMillis();
- try (OutputStream teeOut = new TeeOutputStream(sent, pipedIn)) {
- for (int i = 0; i < 10000; i++) {
- teeOut.write(data);
- teeOut.flush();
- bytes += data.length;
- if ((bytes & 0xFFF00000) != ((bytes - data.length) & 0xFFF00000)) {
- System.out.println("Bytes written: " + bytes);
- }
- }
- teeOut.write("exit\n".getBytes(StandardCharsets.UTF_8));
+ try (ClientSession session = createTestClientSession();
+ ClientChannel channel = session.createShellChannel();
+ ByteArrayOutputStream sent = new ByteArrayOutputStream();
+ PipedOutputStream pipedIn = new PipedOutputStream();
+ InputStream inPipe = new PipedInputStream(pipedIn);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+ channel.setIn(inPipe);
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open().verify(9L, TimeUnit.SECONDS);
+
+ int bytes = 0;
+ byte[] data = "01234567890123456789012345678901234567890123456789\n".getBytes(StandardCharsets.UTF_8);
+ long t0 = System.currentTimeMillis();
+ try (OutputStream teeOut = new TeeOutputStream(sent, pipedIn)) {
+ for (int i = 0; i < 10000; i++) {
+ teeOut.write(data);
teeOut.flush();
+ bytes += data.length;
+ if ((bytes & 0xFFF00000) != ((bytes - data.length) & 0xFFF00000)) {
+ System.out.println("Bytes written: " + bytes);
+ }
}
- long t1 = System.currentTimeMillis();
-
- System.out.println("Sent " + (bytes / 1024) + " Kb in " + (t1 - t0) + " ms");
+ teeOut.write("exit\n".getBytes(StandardCharsets.UTF_8));
+ teeOut.flush();
+ }
- System.out.println("Waiting for channel to be closed");
+ long t1 = System.currentTimeMillis();
+ System.out.println("Sent " + (bytes / 1024) + " Kb in " + (t1 - t0) + " ms");
- channel.waitFor(ClientChannel.CLOSED, 0);
-
- channel.close(false);
- client.stop();
+ System.out.println("Waiting for channel to be closed");
+ channel.waitFor(ClientChannel.CLOSED, 0);
+ channel.close(false);
+ client.stop();
- assertArrayEquals(sent.toByteArray(), out.toByteArray());
- //assertArrayEquals(sent.toByteArray(), out.toByteArray());
- }
+ assertArrayEquals("Mismatched sent and received data", sent.toByteArray(), out.toByteArray());
} finally {
client.stop();
}
+
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test(expected = SshException.class)
public void testOpenChannelOnClosedSession() throws Exception {
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
+ try (ClientSession session = createTestClientSession();
+ ClientChannel channel = session.createShellChannel()) {
- try (ClientChannel channel = session.createShellChannel()) {
- session.close(false);
+ session.close(false);
+ assertNull("Session closure not signalled", clientSessionHolder.get());
- try (PipedOutputStream pipedIn = new PipedOutputStream();
- InputStream inPipe = new PipedInputStream(pipedIn);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+ try (PipedOutputStream pipedIn = new PipedOutputStream();
+ InputStream inPipe = new PipedInputStream(pipedIn);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
- channel.setIn(inPipe);
- channel.setOut(out);
- channel.setErr(err);
- channel.open();
- }
+ channel.setIn(inPipe);
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open();
}
+ } finally {
+ client.stop();
}
}
@@ -578,10 +762,12 @@ public class ClientTest extends BaseTestSupport {
client.start();
try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ assertNotNull("Client session creation not signalled", clientSessionHolder.get());
session.addPasswordIdentity(getCurrentTestName());
AuthFuture authFuture = session.auth();
CloseFuture closeFuture = session.close(false);
+
authLatch.countDown();
assertTrue("Authentication writing not completed in time", authFuture.await(11L, TimeUnit.SECONDS));
assertTrue("Session closing not complete in time", closeFuture.await(8L, TimeUnit.SECONDS));
@@ -590,35 +776,35 @@ public class ClientTest extends BaseTestSupport {
} finally {
client.stop();
}
+
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
public void testCloseCleanBeforeChannelOpened() throws Exception {
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
-
- try (ClientChannel channel = session.createShellChannel();
- InputStream inp = new ByteArrayInputStream(GenericUtils.EMPTY_BYTE_ARRAY);
- OutputStream out = new ByteArrayOutputStream();
- OutputStream err = new ByteArrayOutputStream()) {
+ try (ClientSession session = createTestClientSession();
+ ClientChannel channel = session.createShellChannel();
+ InputStream inp = new ByteArrayInputStream(GenericUtils.EMPTY_BYTE_ARRAY);
+ OutputStream out = new ByteArrayOutputStream();
+ OutputStream err = new ByteArrayOutputStream()) {
- channel.setIn(inp);
- channel.setOut(out);
- channel.setErr(err);
+ channel.setIn(inp);
+ channel.setOut(out);
+ channel.setErr(err);
- OpenFuture openFuture = channel.open();
- CloseFuture closeFuture = session.close(false);
- assertTrue("Channel not open in time", openFuture.await(11L, TimeUnit.SECONDS));
- assertTrue("Session closing not complete in time", closeFuture.await(8L, TimeUnit.SECONDS));
- assertTrue("Not open", openFuture.isOpened());
- assertTrue("Not closed", closeFuture.isClosed());
- }
+ OpenFuture openFuture = channel.open();
+ CloseFuture closeFuture = session.close(false);
+ assertTrue("Channel not open in time", openFuture.await(11L, TimeUnit.SECONDS));
+ assertTrue("Session closing not complete in time", closeFuture.await(8L, TimeUnit.SECONDS));
+ assertTrue("Not open", openFuture.isOpened());
+ assertTrue("Not closed", closeFuture.isClosed());
} finally {
client.stop();
}
+
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
@@ -626,27 +812,25 @@ public class ClientTest extends BaseTestSupport {
channelLatch = new CountDownLatch(1);
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
+ try (ClientSession session = createTestClientSession();
+ ClientChannel channel = session.createShellChannel();
+ InputStream inp = new ByteArrayInputStream(GenericUtils.EMPTY_BYTE_ARRAY);
+ OutputStream out = new ByteArrayOutputStream();
+ OutputStream err = new ByteArrayOutputStream()) {
- try (ClientChannel channel = session.createShellChannel();
- InputStream inp = new ByteArrayInputStream(GenericUtils.EMPTY_BYTE_ARRAY);
- OutputStream out = new ByteArrayOutputStream();
- OutputStream err = new ByteArrayOutputStream()) {
+ channel.setIn(inp);
+ channel.setOut(out);
+ channel.setErr(err);
- channel.setIn(inp);
- channel.setOut(out);
- channel.setErr(err);
+ OpenFuture openFuture = channel.open();
+ CloseFuture closeFuture = session.close(true);
+ assertNull("Session closure not signalled", clientSessionHolder.get());
- OpenFuture openFuture = channel.open();
- CloseFuture closeFuture = session.close(true);
- channelLatch.countDown();
- assertTrue("Channel not open in time", openFuture.await(11L, TimeUnit.SECONDS));
- assertTrue("Session closing not complete in time", closeFuture.await(8L, TimeUnit.SECONDS));
- assertNotNull("No open exception", openFuture.getException());
- assertTrue("Not closed", closeFuture.isClosed());
- }
+ channelLatch.countDown();
+ assertTrue("Channel not open in time", openFuture.await(11L, TimeUnit.SECONDS));
+ assertTrue("Session closing not complete in time", closeFuture.await(8L, TimeUnit.SECONDS));
+ assertNotNull("No open exception", openFuture.getException());
+ assertTrue("Not closed", closeFuture.isClosed());
} finally {
client.stop();
}
@@ -657,12 +841,14 @@ public class ClientTest extends BaseTestSupport {
client.start();
try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ assertNotNull("Client session creation not signalled", clientSessionHolder.get());
KeyPair pair = Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA);
session.addPublicKeyIdentity(pair);
session.auth().verify(5L, TimeUnit.SECONDS);
} finally {
client.stop();
}
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
@@ -671,15 +857,20 @@ public class ClientTest extends BaseTestSupport {
client.start();
try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ assertNotNull("Client session creation not signalled", clientSessionHolder.get());
session.addPublicKeyIdentity(Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA));
session.auth().verify(5L, TimeUnit.SECONDS);
} finally {
client.stop();
}
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
public void testPublicKeyAuthNewWithFailureOnFirstIdentity() throws Exception {
+ SimpleGeneratorHostKeyProvider provider = new SimpleGeneratorHostKeyProvider();
+ provider.setAlgorithm("RSA");
+
final KeyPair pair = Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA);
sshd.setPublickeyAuthenticator(new PublickeyAuthenticator() {
@Override
@@ -690,16 +881,15 @@ public class ClientTest extends BaseTestSupport {
client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(UserAuthPublicKeyFactory.INSTANCE));
client.start();
- SimpleGeneratorHostKeyProvider provider = new SimpleGeneratorHostKeyProvider();
- provider.setAlgorithm("RSA");
-
try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ assertNotNull("Client session creation not signalled", clientSessionHolder.get());
session.addPublicKeyIdentity(provider.loadKey(KeyPairProvider.SSH_RSA));
session.addPublicKeyIdentity(pair);
session.auth().verify(5L, TimeUnit.SECONDS);
} finally {
client.stop();
}
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
@@ -707,12 +897,12 @@ public class ClientTest extends BaseTestSupport {
client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(new UserAuthPasswordFactory()));
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
+ try (ClientSession session = createTestClientSession()) {
+ // nothing extra
} finally {
client.stop();
}
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
@@ -721,12 +911,14 @@ public class ClientTest extends BaseTestSupport {
client.start();
try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ assertNotNull("Client session creation not signalled", clientSessionHolder.get());
session.addPasswordIdentity(getClass().getSimpleName());
session.addPasswordIdentity(getCurrentTestName());
session.auth().verify(5L, TimeUnit.SECONDS);
} finally {
client.stop();
}
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
@@ -734,12 +926,12 @@ public class ClientTest extends BaseTestSupport {
client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(UserAuthKeyboardInteractiveFactory.INSTANCE));
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
+ try (ClientSession session = createTestClientSession()) {
+ // nothing extra
} finally {
client.stop();
}
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
@@ -748,12 +940,14 @@ public class ClientTest extends BaseTestSupport {
client.start();
try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ assertNotNull("Client session creation not signalled", clientSessionHolder.get());
session.addPasswordIdentity(getClass().getSimpleName());
session.addPasswordIdentity(getCurrentTestName());
session.auth().verify(5L, TimeUnit.SECONDS);
} finally {
client.stop();
}
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test // see SSHD-504
@@ -837,6 +1031,7 @@ public class ClientTest extends BaseTestSupport {
try {
for (int index = 0; index < xformers.size(); index++) {
try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7, TimeUnit.SECONDS).getSession()) {
+ assertNotNull("Client session creation not signalled at iteration #" + index, clientSessionHolder.get());
String password = "bad-" + getCurrentTestName() + "-" + index;
session.addPasswordIdentity(password);
@@ -845,13 +1040,15 @@ public class ClientTest extends BaseTestSupport {
assertFalse("Unexpected success for password=" + password, future.isSuccess());
session.removePasswordIdentity(password);
}
+
+ assertNull("Session closure not signalled at iteration #" + index, clientSessionHolder.get());
}
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
+ try (ClientSession session = createTestClientSession()) {
assertTrue("Mismatched prompts evaluation results", mismatchedPrompts.isEmpty());
}
+
+ assertNull("Final session closure not signalled", clientSessionHolder.get());
} finally {
client.stop();
}
@@ -879,6 +1076,8 @@ public class ClientTest extends BaseTestSupport {
client.start();
try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ assertNotNull("Client session creation not signalled", clientSessionHolder.get());
+
AuthFuture future = session.auth();
future.await();
assertTrue("Unexpected authentication success", future.isFailure());
@@ -886,6 +1085,8 @@ public class ClientTest extends BaseTestSupport {
} finally {
client.stop();
}
+
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
@@ -899,6 +1100,7 @@ public class ClientTest extends BaseTestSupport {
client.start();
try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ assertNotNull("Client session creation not signalled", clientSessionHolder.get());
session.setUserInteraction(new UserInteraction() {
@Override
public void welcome(String banner) {
@@ -912,6 +1114,7 @@ public class ClientTest extends BaseTestSupport {
return new String[]{getCurrentTestName()};
}
});
+
AuthFuture future = session.auth();
future.await();
assertTrue("Authentication not marked as success", future.isSuccess());
@@ -920,6 +1123,8 @@ public class ClientTest extends BaseTestSupport {
} finally {
client.stop();
}
+
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
@@ -932,6 +1137,7 @@ public class ClientTest extends BaseTestSupport {
client.start();
try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ assertNotNull("Client session creation not signalled", clientSessionHolder.get());
session.setUserInteraction(new UserInteraction() {
@Override
public void welcome(String banner) {
@@ -945,6 +1151,7 @@ public class ClientTest extends BaseTestSupport {
return new String[]{"bad#" + attemptId};
}
});
+
AuthFuture future = session.auth();
assertTrue("Authentication not completed in time", future.await(11L, TimeUnit.SECONDS));
assertTrue("Authentication not, marked as failure", future.isFailure());
@@ -952,6 +1159,8 @@ public class ClientTest extends BaseTestSupport {
} finally {
client.stop();
}
+
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
@@ -960,36 +1169,35 @@ public class ClientTest extends BaseTestSupport {
try {
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
-
- try (ClientChannel channel = session.createShellChannel();
- PipedOutputStream pipedIn = new PipedOutputStream();
- InputStream inPipe = new PipedInputStream(pipedIn);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-
- channel.setIn(inPipe);
- channel.setOut(out);
- channel.setErr(err);
- channel.open().verify(9L, TimeUnit.SECONDS);
-
- // ((AbstractSession) session).disconnect(SshConstants.SSH2_DISCONNECT_BY_APPLICATION, "Cancel");
- AbstractSession cs = (AbstractSession) session;
- Buffer buffer = cs.createBuffer(SshConstants.SSH_MSG_DISCONNECT);
- buffer.putInt(SshConstants.SSH2_DISCONNECT_BY_APPLICATION);
- buffer.putString("Cancel");
- buffer.putString("");
- IoWriteFuture f = cs.writePacket(buffer);
- assertTrue("Packet writing not completed in time", f.await(11L, TimeUnit.SECONDS));
- suspend(cs.getIoSession());
-
- TestEchoShellFactory.TestEchoShell.latch.await();
- }
+ try (ClientSession session = createTestClientSession();
+ ClientChannel channel = session.createShellChannel();
+ PipedOutputStream pipedIn = new PipedOutputStream();
+ InputStream inPipe = new PipedInputStream(pipedIn);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+ channel.setIn(inPipe);
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open().verify(9L, TimeUnit.SECONDS);
+
+ // ((AbstractSession) session).disconnect(SshConstants.SSH2_DISCONNECT_BY_APPLICATION, "Cancel");
+ AbstractSession cs = (AbstractSession) session;
+ Buffer buffer = cs.createBuffer(SshConstants.SSH_MSG_DISCONNECT);
+ buffer.putInt(SshConstants.SSH2_DISCONNECT_BY_APPLICATION);
+ buffer.putString("Cancel");
+ buffer.putString("");
+
+ IoWriteFuture f = cs.writePacket(buffer);
+ assertTrue("Packet writing not completed in time", f.await(11L, TimeUnit.SECONDS));
+ suspend(cs.getIoSession());
+
+ TestEchoShellFactory.TestEchoShell.latch.await();
} finally {
client.stop();
}
+
+ assertNull("Session closure not signalled", clientSessionHolder.get());
} finally {
TestEchoShellFactory.TestEchoShell.latch = null;
}
@@ -1015,11 +1223,13 @@ public class ClientTest extends BaseTestSupport {
client.start();
try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ assertNotNull("Client session creation not signalled", clientSessionHolder.get());
session.waitFor(ClientSession.WAIT_AUTH, TimeUnit.SECONDS.toMillis(10L));
- assertTrue(ok.get());
+ assertTrue("Server key verifier invoked ?", ok.get());
} finally {
client.stop();
}
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
@@ -1028,9 +1238,7 @@ public class ClientTest extends BaseTestSupport {
client.getCipherFactories().add(BuiltinCiphers.none);
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
+ try (ClientSession session = createTestClientSession()) {
assertTrue("Failed to switch to NONE cipher on time", session.switchToNoneCipher().await(5L, TimeUnit.SECONDS));
try (ClientChannel channel = session.createSubsystemChannel(SftpConstants.SFTP_SUBSYSTEM_NAME)) {
@@ -1039,6 +1247,7 @@ public class ClientTest extends BaseTestSupport {
} finally {
client.stop();
}
+ assertNull("Session closure not signalled", clientSessionHolder.get());
}
@Test
@@ -1046,10 +1255,7 @@ public class ClientTest extends BaseTestSupport {
client.start();
Collection<ClientChannel> channels = new LinkedList<>();
- try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(5L, TimeUnit.SECONDS);
-
+ try (ClientSession session = createTestClientSession()) {
channels.add(session.createChannel(ClientChannel.CHANNEL_SUBSYSTEM, SftpConstants.SFTP_SUBSYSTEM_NAME));
channels.add(session.createChannel(ClientChannel.CHANNEL_EXEC, getCurrentTestName()));
channels.add(session.createChannel(ClientChannel.CHANNEL_SHELL, getClass().getSimpleName()));
@@ -1069,6 +1275,87 @@ public class ClientTest extends BaseTestSupport {
}
client.stop();
}
+
+ assertNull("Session closure not signalled", clientSessionHolder.get());
+ }
+
+ /**
+ * Makes sure that the {@link ChannelListener}s added to the client, session
+ * and channel are <U>cumulative</U> - i.e., all of them are invoked.
+ * <P>
+ * Only the client-level and session-level listeners are stored in the map
+ * that is handed to {@code assertListenerSizes}, so the size checks cover
+ * those two listeners; the channel-level listener is attached to the channel
+ * but is not part of the checked map. The expected (active, open) sizes per
+ * phase are: (0, 0) once the session is up, (1, 1) while the channel is open,
+ * and (0, 1) after the channel has been closed and again after the client
+ * has been stopped.
+ * </P>
+ * @throws Exception If failed
+ */
+ @Test
+ public void testChannelListenersPropagation() throws Exception {
+ Map<String,TestChannelListener> clientListeners = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); // keyed by listener name
+ addChannelListener(clientListeners, client, new TestChannelListener(client.getClass().getSimpleName()));
+
+ client.start();
+ try (ClientSession session = createTestClientSession()) {
+ addChannelListener(clientListeners, session, new TestChannelListener(session.getClass().getSimpleName()));
+ assertListenerSizes("ClientSessionOpen", clientListeners, 0, 0); // this test has not created any channel yet
+
+ try (ClientChannel channel = session.createSubsystemChannel(SftpConstants.SFTP_SUBSYSTEM_NAME)) {
+ channel.open().verify(5L, TimeUnit.SECONDS);
+
+ TestChannelListener channelListener = new TestChannelListener(channel.getClass().getSimpleName());
+ // need to emulate them since we are adding the listener AFTER the channel is open
+ channelListener.channelInitialized(channel);
+ channelListener.channelOpenSuccess(channel);
+ channel.addChannelListener(channelListener); // NOTE: not put into clientListeners, so not covered by the size checks
+ assertListenerSizes("ClientChannelOpen", clientListeners, 1, 1);
+ }
+
+ assertListenerSizes("ClientChannelClose", clientListeners, 0, 1); // TestChannelListener#channelClosed only shrinks the active set
+ } finally {
+ client.stop();
+ }
+
+ assertListenerSizes("ClientStop", clientListeners, 0, 1);
+ }
+
+ /**
+ * Convenience overload that checks every listener stored as a value of the map.
+ *
+ * @param phase Label prepended to the assertion failure messages
+ * @param listeners The listeners to check (only the map values are used)
+ * @param activeSize Expected active channels count - negative means do not check
+ * @param openSize Expected open channels count - negative means do not check
+ */
+ private static void assertListenerSizes(String phase, Map<String,? extends TestChannelListener> listeners, int activeSize, int openSize) {
+ Collection<? extends TestChannelListener> registered = listeners.values();
+ assertListenerSizes(phase, registered, activeSize, openSize);
+ }
+
+ /**
+ * Verifies the channel bookkeeping of each listener: optionally the number of
+ * active and open channels, and always that no failed channel was recorded.
+ * An empty listeners collection is silently accepted.
+ *
+ * @param phase Label prepended to the assertion failure messages
+ * @param listeners The listeners to check
+ * @param activeSize Expected active channels count - negative means do not check
+ * @param openSize Expected open channels count - negative means do not check
+ */
+ private static void assertListenerSizes(String phase, Collection<? extends TestChannelListener> listeners, int activeSize, int openSize) {
+ if (!GenericUtils.isEmpty(listeners)) {
+ final boolean checkActive = activeSize >= 0;
+ final boolean checkOpen = openSize >= 0;
+
+ for (TestChannelListener listener : listeners) {
+ if (checkActive) {
+ int actualActive = GenericUtils.size(listener.getActiveChannels());
+ assertEquals(phase + ": mismatched active channels size for " + listener.getName() + " listener", activeSize, actualActive);
+ }
+
+ if (checkOpen) {
+ int actualOpen = GenericUtils.size(listener.getOpenChannels());
+ assertEquals(phase + ": mismatched open channels size for " + listener.getName() + " listener", openSize, actualOpen);
+ }
+
+ // a failed channel is never expected, regardless of the requested checks
+ int actualFailed = GenericUtils.size(listener.getFailedChannels());
+ assertEquals(phase + ": unexpected failed channels size for " + listener.getName() + " listener", 0, actualFailed);
+ }
+ }
+ }
+
+ /**
+ * Records the listener under its name and registers it with the manager.
+ * Fails the test if a listener with the same name was already recorded.
+ *
+ * @param listeners Map of the listeners recorded so far - updated by this call
+ * @param manager The manager to register the listener with
+ * @param listener The listener to record and register
+ */
+ private static <L extends ChannelListener & NamedResource> void addChannelListener(Map<String,L> listeners, ChannelListenerManager manager, L listener) {
+ String listenerName = listener.getName();
+ L previous = listeners.put(listenerName, listener);
+ assertNull("Duplicate listener named " + listenerName, previous);
+ manager.addChannelListener(listener);
+ }
+
+ /**
+ * Connects to the test server and authenticates using the current test name
+ * as both user name and password. If anything fails after the session has
+ * been obtained, the session is closed before the failure propagates.
+ *
+ * @return The authenticated session - the caller is responsible for closing it
+ * @throws IOException If failed to connect or authenticate
+ */
+ private ClientSession createTestClientSession() throws IOException {
+ ClientSession session = client.connect(getCurrentTestName(), "localhost", port).verify(7L, TimeUnit.SECONDS).getSession();
+ boolean handedOver = false;
+ try {
+ assertNotNull("Client session creation not signalled", clientSessionHolder.get());
+ session.addPasswordIdentity(getCurrentTestName());
+ session.auth().verify(5L, TimeUnit.SECONDS);
+
+ handedOver = true; // ownership passes to the caller - skip the 'finally' close
+ return session;
+ } finally {
+ if ((!handedOver) && (session != null)) {
+ session.close();
+ }
+ }
}
private void suspend(IoSession ioSession) {
@@ -1099,6 +1386,25 @@ public class ClientTest extends BaseTestSupport {
}
}
+ /**
+ * Unchecked exception that carries a (non-empty) event name. The name is
+ * exposed as the {@link NamedResource} name, as the exception detail message
+ * and as the {@link #toString()} value.
+ */
+ public static class ChannelFailureException extends RuntimeException implements NamedResource {
+ private static final long serialVersionUID = 1L; // we're not serializing it
+ private final String name;
+
+ /**
+ * @param name The event name - rejected by {@code ValidateUtils.checkNotNullAndNotEmpty}
+ * if {@code null} or empty
+ */
+ public ChannelFailureException(String name) {
+ // pass the name on as the detail message so that code logging getMessage() does not print "null"
+ super(ValidateUtils.checkNotNullAndNotEmpty(name, "No event name provided"));
+ this.name = name;
+ }
+
+ /**
+ * @return The event name provided at construction
+ */
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * @return The event name only (no class name prefix)
+ */
+ @Override
+ public String toString() {
+ return getName();
+ }
+ }
+
public static void main(String[] args) throws Exception {
SshClient.main(args);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java b/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java
new file mode 100644
index 0000000..0f7c18b
--- /dev/null
+++ b/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.sshd.common.NamedResource;
+import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelListener;
+import org.apache.sshd.common.util.logging.AbstractLoggingBean;
+import org.junit.Assert;
+
+/**
+ * A {@link ChannelListener} for tests that records the channels it is notified about
+ * and fails (via JUnit {@link Assert}) when the notifications arrive in an unexpected way.
+ * Three sets of channels are maintained:
+ * <UL>
+ *   <LI>active - a channel is added by {@link #channelInitialized(Channel)} and removed
+ *   by {@link #channelClosed(Channel)}</LI>
+ *   <LI>open - a channel is added by {@link #channelOpenSuccess(Channel)}; nothing in
+ *   this class removes it again, not even closing the channel</LI>
+ *   <LI>failed - a channel is added by {@link #channelOpenFailure(Channel, Throwable)};
+ *   nothing in this class removes it again</LI>
+ * </UL>
+ * The assertions require that a channel is initialized at most once, that open success/failure
+ * is reported only for a currently active channel and at most once each, and that only a
+ * currently active channel is reported as closed. The getters expose the internal collections
+ * themselves rather than copies.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class TestChannelListener extends AbstractLoggingBean implements ChannelListener, NamedResource {
+ private final String name;
+ private final Collection<Channel> activeChannels = new CopyOnWriteArraySet<>(); // initialized and not yet closed
+ private final Collection<Channel> openChannels = new CopyOnWriteArraySet<>(); // reported as opened - never shrinks
+ private final Collection<Channel> failedChannels = new CopyOnWriteArraySet<>(); // reported as failed to open - never shrinks
+
+ public TestChannelListener() {
+ this(""); // empty discriminator, which is also the reported name
+ }
+
+ public TestChannelListener(String discriminator) {
+ super(discriminator); // NOTE(review): presumably qualifies the logger of AbstractLoggingBean - confirm
+ name = discriminator;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ public Collection<Channel> getActiveChannels() {
+ return activeChannels; // the live internal set, not a copy
+ }
+
+ @Override
+ public void channelInitialized(Channel channel) {
+ Assert.assertTrue("Same channel instance re-initialized: " + channel, activeChannels.add(channel)); // add() yields false for a duplicate
+ log.info("channelInitialized({})", channel);
+ }
+
+ public Collection<Channel> getOpenChannels() {
+ return openChannels; // the live internal set, not a copy
+ }
+
+ @Override
+ public void channelOpenSuccess(Channel channel) {
+ Assert.assertTrue("Open channel not activated: " + channel, activeChannels.contains(channel)); // must have been initialized first
+ Assert.assertTrue("Same channel instance re-opened: " + channel, openChannels.add(channel));
+ log.info("channelOpenSuccess({})", channel);
+ }
+
+ public Collection<Channel> getFailedChannels() {
+ return failedChannels; // the live internal set, not a copy
+ }
+
+ @Override
+ public void channelOpenFailure(Channel channel, Throwable reason) {
+ Assert.assertTrue("Failed channel not activated: " + channel, activeChannels.contains(channel)); // must have been initialized first
+ Assert.assertTrue("Same channel instance re-failed: " + channel, failedChannels.add(channel));
+ log.warn("channelOpenFailure({}) {} : {}", channel, reason.getClass().getSimpleName(), reason.getMessage());
+ }
+
+ @Override
+ public void channelClosed(Channel channel) {
+ Assert.assertTrue("Unknown closed channel instance: " + channel, activeChannels.remove(channel)); // open/failed sets are left untouched
+ log.info("channelClosed({})", channel);
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "[" + getName() + "]";
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/common/io/nio2/Nio2ServiceTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/io/nio2/Nio2ServiceTest.java b/sshd-core/src/test/java/org/apache/sshd/common/io/nio2/Nio2ServiceTest.java
index f55a262..002196c 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/io/nio2/Nio2ServiceTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/io/nio2/Nio2ServiceTest.java
@@ -55,7 +55,6 @@ public class Nio2ServiceTest extends BaseTestSupport {
sshd.setKeyPairProvider(Utils.createTestHostKeyProvider());
sshd.setShellFactory(new TestEchoShellFactory());
sshd.setPasswordAuthenticator(BogusPasswordAuthenticator.INSTANCE);
- sshd.setSessionFactory(new org.apache.sshd.server.session.SessionFactory());
sshd.start();
int port = sshd.getPort();
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/common/signature/AbstractSignatureFactoryTestSupport.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/signature/AbstractSignatureFactoryTestSupport.java b/sshd-core/src/test/java/org/apache/sshd/common/signature/AbstractSignatureFactoryTestSupport.java
index b6a9407..b958ceb 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/signature/AbstractSignatureFactoryTestSupport.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/signature/AbstractSignatureFactoryTestSupport.java
@@ -68,7 +68,6 @@ public abstract class AbstractSignatureFactoryTestSupport extends BaseTestSuppor
public void setUp() throws Exception {
sshd = SshServer.setUpDefaultServer();
sshd.setPasswordAuthenticator(BogusPasswordAuthenticator.INSTANCE);
- sshd.setSessionFactory(new org.apache.sshd.server.session.SessionFactory());
}
@After
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java b/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java
index bff5738..a969502 100644
--- a/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java
+++ b/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java
@@ -188,10 +188,10 @@ public class ClientUserAuthServiceOld extends CloseableUtils.AbstractCloseable i
@Override
protected void preClose() {
- super.preClose();
if (!authFuture.isDone()) {
authFuture.setException(new SshException("Session is closed"));
}
+ super.preClose();
}
public AuthFuture auth(UserAuth userAuth) throws IOException {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/test/java/org/apache/sshd/server/PublickeyAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/server/PublickeyAuthenticatorTest.java b/sshd-core/src/test/java/org/apache/sshd/server/PublickeyAuthenticatorTest.java
index 97a4b26..59ab3fb 100644
--- a/sshd-core/src/test/java/org/apache/sshd/server/PublickeyAuthenticatorTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/server/PublickeyAuthenticatorTest.java
@@ -82,7 +82,7 @@ public class PublickeyAuthenticatorTest extends BaseTestSupport {
Object result;
try {
result = method.invoke(authenticator, invArgs);
- } catch(InvocationTargetException e) {
+ } catch (InvocationTargetException e) {
Throwable t = e.getTargetException(); // peel of the real exception
System.err.println("Failed (" + t.getClass().getSimpleName() + ")"
+ " to invoke with user=" + useUsername
[4/4] mina-sshd git commit: [SSHD-555] Add ChannelListener support
Posted by lg...@apache.org.
[SSHD-555] Add ChannelListener support
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/40195ab4
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/40195ab4
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/40195ab4
Branch: refs/heads/master
Commit: 40195ab466bb7ae97fd88c21b9a3394e78566ba9
Parents: 051aec3
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Tue Aug 25 07:38:01 2015 +0300
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Tue Aug 25 07:38:01 2015 +0300
----------------------------------------------------------------------
.../agent/local/ChannelAgentForwarding.java | 22 +-
.../sshd/agent/unix/AgentForwardedChannel.java | 2 -
.../sshd/agent/unix/ChannelAgentForwarding.java | 35 +-
.../org/apache/sshd/client/SessionFactory.java | 46 -
.../java/org/apache/sshd/client/SshClient.java | 13 +-
.../java/org/apache/sshd/client/SshKeyScan.java | 4 +-
.../client/channel/AbstractClientChannel.java | 15 +-
.../sshd/client/channel/ChannelSession.java | 1 -
.../sshd/client/session/ClientSession.java | 1 -
.../sshd/client/session/ClientSessionImpl.java | 4 +-
.../client/session/ClientUserAuthService.java | 4 +-
.../sshd/client/session/SessionFactory.java | 46 +
.../sshd/client/subsystem/SubsystemClient.java | 6 +
.../subsystem/sftp/DefaultSftpClient.java | 6 +
.../client/subsystem/sftp/SftpFileSystem.java | 6 +
.../sshd/common/AbstractFactoryManager.java | 99 ++-
.../org/apache/sshd/common/FactoryManager.java | 4 +-
.../sshd/common/channel/AbstractChannel.java | 94 +-
.../org/apache/sshd/common/channel/Channel.java | 6 +-
.../common/channel/ChannelAsyncInputStream.java | 4 +
.../sshd/common/channel/ChannelListener.java | 74 ++
.../common/channel/ChannelListenerManager.java | 47 +
.../common/channel/ChannelPipedInputStream.java | 25 +-
.../sshd/common/future/DefaultSshFuture.java | 17 +-
.../sshd/common/session/AbstractSession.java | 168 ++--
.../common/session/AbstractSessionFactory.java | 55 +-
.../org/apache/sshd/common/session/Session.java | 17 +-
.../common/session/SessionListenerManager.java | 53 ++
.../common/session/SessionTimeoutListener.java | 17 +-
.../sshd/common/util/EventListenerUtils.java | 23 +-
.../apache/sshd/common/util/GenericUtils.java | 46 +
.../sshd/server/ServerFactoryManager.java | 1 -
.../java/org/apache/sshd/server/SshServer.java | 3 +-
.../auth/CachingPublicKeyAuthenticator.java | 2 +-
.../server/channel/AbstractServerChannel.java | 18 +-
.../sshd/server/channel/ChannelSession.java | 11 +-
.../sshd/server/forward/TcpipServerChannel.java | 74 +-
.../sshd/server/session/SessionFactory.java | 24 +-
.../org/apache/sshd/AuthenticationTest.java | 5 +-
.../java/org/apache/sshd/KeepAliveTest.java | 79 +-
.../java/org/apache/sshd/KeyReExchangeTest.java | 2 +-
.../java/org/apache/sshd/client/ClientTest.java | 874 +++++++++++++------
.../common/channel/TestChannelListener.java | 95 ++
.../sshd/common/io/nio2/Nio2ServiceTest.java | 1 -
.../AbstractSignatureFactoryTestSupport.java | 1 -
.../deprecated/ClientUserAuthServiceOld.java | 2 +-
.../sshd/server/PublickeyAuthenticatorTest.java | 2 +-
.../java/org/apache/sshd/server/ServerTest.java | 224 ++---
.../java/org/apache/sshd/git/util/Utils.java | 2 +-
49 files changed, 1638 insertions(+), 742 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
index a93d12b..19baa6b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
@@ -22,13 +22,18 @@ import java.io.IOException;
import java.io.OutputStream;
import org.apache.sshd.agent.SshAgent;
+import org.apache.sshd.agent.SshAgentFactory;
import org.apache.sshd.agent.common.AbstractAgentClient;
import org.apache.sshd.client.future.DefaultOpenFuture;
import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.channel.ChannelListener;
import org.apache.sshd.common.channel.ChannelOutputStream;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.server.channel.AbstractServerChannel;
@@ -48,13 +53,24 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
@Override
protected OpenFuture doInit(Buffer buffer) {
final OpenFuture f = new DefaultOpenFuture(this);
+ ChannelListener listener = getChannelListenerProxy();
try {
out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
- agent = session.getFactoryManager().getAgentFactory().createClient(session.getFactoryManager());
+ FactoryManager manager = session.getFactoryManager();
+ SshAgentFactory factory = ValidateUtils.checkNotNull(manager.getAgentFactory(), "No agent factory");
+ agent = factory.createClient(manager);
client = new AgentClient();
- f.setOpened();
- } catch (Exception e) {
+ listener.channelOpenSuccess(this);
+ f.setOpened();
+ } catch (Throwable t) {
+ Throwable e = GenericUtils.peelException(t);
+ try {
+ listener.channelOpenFailure(this, e);
+ } catch (Throwable ignored) {
+ log.warn("doInit({}) failed ({}) to inform listener of open failure={}: {}",
+ this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage());
+ }
f.setException(e);
}
return f;
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
index eb33231..8ed5c07 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
@@ -77,7 +77,5 @@ public class AgentForwardedChannel extends AbstractClientChannel implements Runn
if (result < Status.APR_SUCCESS) {
AgentServerProxy.throwException(result);
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
index 1cccc8f..113f33d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
@@ -29,10 +29,12 @@ import org.apache.sshd.client.future.DefaultOpenFuture;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.FactoryManagerUtils;
import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.channel.ChannelListener;
import org.apache.sshd.common.channel.ChannelOutputStream;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.threads.ThreadUtils;
import org.apache.sshd.server.channel.AbstractServerChannel;
@@ -45,6 +47,18 @@ import org.apache.tomcat.jni.Status;
* The client side channel that will receive requests forwards by the SSH server.
*/
public class ChannelAgentForwarding extends AbstractServerChannel {
+ /**
+ * Property that can be set on the factory manager in order to control
+ * the buffer size used to forward data from the established channel
+ *
+ * @see #MIN_FORWARDER_BUF_SIZE
+ * @see #MAX_FORWARDER_BUF_SIZE
+ * @see #DEFAULT_FORWARDER_BUF_SIZE
+ */
+ public static final String FORWARDER_BUFFER_SIZE = "channel-agent-fwd-buf-size";
+ public static final int MIN_FORWARDER_BUF_SIZE = Byte.MAX_VALUE;
+ public static final int DEFAULT_FORWARDER_BUF_SIZE = 1024;
+ public static final int MAX_FORWARDER_BUF_SIZE = Short.MAX_VALUE;
private String authSocket;
private long pool;
@@ -61,6 +75,7 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
@Override
protected OpenFuture doInit(Buffer buffer) {
final OpenFuture f = new DefaultOpenFuture(this);
+ ChannelListener listener = getChannelListenerProxy();
try {
out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
authSocket = FactoryManagerUtils.getString(session, SshAgent.SSH_AUTHSOCKET_ENV_NAME);
@@ -74,12 +89,17 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
ExecutorService service = getExecutorService();
forwardService = (service == null) ? ThreadUtils.newSingleThreadExecutor("ChannelAgentForwarding[" + authSocket + "]") : service;
shutdownForwarder = (service == forwardService) ? isShutdownOnExit() : true;
+
+ final int copyBufSize = FactoryManagerUtils.getIntProperty(getSession(), FORWARDER_BUFFER_SIZE, DEFAULT_FORWARDER_BUF_SIZE);
+ ValidateUtils.checkTrue(copyBufSize >= MIN_FORWARDER_BUF_SIZE, "Copy buf size below min.: %d", copyBufSize);
+ ValidateUtils.checkTrue(copyBufSize <= MAX_FORWARDER_BUF_SIZE, "Copy buf size above max.: %d", copyBufSize);
+
forwarder = forwardService.submit(new Runnable() {
@SuppressWarnings("synthetic-access")
@Override
public void run() {
try {
- byte[] buf = new byte[1024];
+ byte[] buf = new byte[copyBufSize];
while (true) {
int len = Socket.recv(handle, buf, 0, buf.length);
if (len > 0) {
@@ -92,11 +112,20 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
}
}
});
- f.setOpened();
- } catch (Exception e) {
+ listener.channelOpenSuccess(this);
+ f.setOpened();
+ } catch (Exception t) {
+ Throwable e = GenericUtils.peelException(t);
+ try {
+ listener.channelOpenFailure(this, e);
+ } catch (Throwable ignored) {
+ log.warn("doInit({}) failed ({}) to inform listener of open failure={}: {}",
+ this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage());
+ }
f.setException(e);
}
+
return f;
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/SessionFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/SessionFactory.java b/sshd-core/src/main/java/org/apache/sshd/client/SessionFactory.java
deleted file mode 100644
index 76527a2..0000000
--- a/sshd-core/src/main/java/org/apache/sshd/client/SessionFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sshd.client;
-
-import org.apache.sshd.client.session.ClientSessionImpl;
-import org.apache.sshd.common.io.IoSession;
-import org.apache.sshd.common.session.AbstractSession;
-import org.apache.sshd.common.session.AbstractSessionFactory;
-
-/**
- * A factory of client sessions.
- * This class can be used as a way to customize the creation of client sessions.
- *
- * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
- * @see org.apache.sshd.client.SshClient#setSessionFactory(SessionFactory)
- */
-public class SessionFactory extends AbstractSessionFactory {
-
- protected ClientFactoryManager client;
-
- public void setClient(ClientFactoryManager client) {
- this.client = client;
- }
-
- @Override
- protected AbstractSession doCreateSession(IoSession ioSession) throws Exception {
- return new ClientSessionImpl(client, ioSession);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java b/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java
index c98ce14..6e21329 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/SshClient.java
@@ -55,6 +55,7 @@ import org.apache.sshd.client.future.DefaultConnectFuture;
import org.apache.sshd.client.session.ClientConnectionServiceFactory;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.client.session.ClientUserAuthServiceFactory;
+import org.apache.sshd.client.session.SessionFactory;
import org.apache.sshd.common.AbstractFactoryManager;
import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.Factory;
@@ -230,7 +231,6 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
setupSessionTimeout(sessionFactory);
- sessionFactory.setClient(this);
connector = createConnector();
}
@@ -276,17 +276,12 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
}
public ConnectFuture connect(String username, String host, int port) throws IOException {
- assert host != null;
- assert port >= 0;
- if (connector == null) {
- throw new IllegalStateException("SshClient not started. Please call start() method before connecting to a server");
- }
- SocketAddress address = new InetSocketAddress(host, port);
+ ValidateUtils.checkTrue(port >= 0, "Invalid port: %d", port);
+ SocketAddress address = new InetSocketAddress(ValidateUtils.checkNotNullAndNotEmpty(host, "No host"), port);
return connect(username, address);
}
public ConnectFuture connect(final String username, SocketAddress address) {
- assert address != null;
if (connector == null) {
throw new IllegalStateException("SshClient not started. Please call start() method before connecting to a server");
}
@@ -313,7 +308,7 @@ public class SshClient extends AbstractFactoryManager implements ClientFactoryMa
}
protected SessionFactory createSessionFactory() {
- return new SessionFactory();
+ return new SessionFactory(this);
}
@Override
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/SshKeyScan.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/SshKeyScan.java b/sshd-core/src/main/java/org/apache/sshd/client/SshKeyScan.java
index 2a21e75..8e1b49a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/SshKeyScan.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/SshKeyScan.java
@@ -301,7 +301,7 @@ public class SshKeyScan extends AbstractSimplifiedLog
}
try {
- session.addListener(this);
+ session.addSessionListener(this);
if (isEnabled(Level.FINER)) {
log(Level.FINER, "Authenticating with key type=" + kt + " to " + remoteLocation);
}
@@ -324,7 +324,7 @@ public class SshKeyScan extends AbstractSimplifiedLog
}
}
} finally {
- session.removeListener(this);
+ session.removeSessionListener(this);
}
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 1ad6023..8eccb4e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -32,9 +32,11 @@ import org.apache.sshd.common.channel.AbstractChannel;
import org.apache.sshd.common.channel.Channel;
import org.apache.sshd.common.channel.ChannelAsyncInputStream;
import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
+import org.apache.sshd.common.channel.ChannelListener;
import org.apache.sshd.common.channel.ChannelRequestHandler;
import org.apache.sshd.common.io.IoInputStream;
import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.io.IoUtils;
@@ -250,11 +252,22 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
public void handleOpenSuccess(int recipient, int rwSize, int packetSize, Buffer buffer) {
this.recipient = recipient;
this.remoteWindow.init(rwSize, packetSize, session.getFactoryManager().getProperties());
+ ChannelListener listener = getChannelListenerProxy();
try {
doOpen();
+
+ listener.channelOpenSuccess(this);
this.opened.set(true);
this.openFuture.setOpened();
- } catch (Exception e) {
+ } catch (Throwable t) {
+ Throwable e = GenericUtils.peelException(t);
+ try {
+ listener.channelOpenFailure(this, e);
+ } catch (Throwable ignored) {
+ log.warn("handleOpenSuccess({}) failed ({}) to inform listener of open failure={}: {}",
+ this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage());
+ }
+
this.openFuture.setException(e);
this.closeFuture.setClosed();
this.doCloseImmediately();
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index 77b2c70..9709f57 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -171,5 +171,4 @@ public class ChannelSession extends AbstractClientChannel {
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java
index 8059eae..b90eade 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java
@@ -340,5 +340,4 @@ public interface ClientSession extends Session {
*/
@SuppressWarnings("rawtypes")
SshFuture switchToNoneCipher() throws IOException;
-
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
index 51f01bb..2b112ea 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
@@ -585,14 +585,14 @@ public class ClientSessionImpl extends AbstractSession implements ClientSession
}
@Override
- protected void sendEvent(SessionListener.Event event) throws IOException {
+ protected void sendSessionEvent(SessionListener.Event event) throws IOException {
if (event == SessionListener.Event.KeyEstablished) {
sendInitialServiceRequest();
}
synchronized (lock) {
lock.notifyAll();
}
- super.sendEvent(event);
+ super.sendSessionEvent(event);
}
protected void sendInitialServiceRequest() throws IOException {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
index 7ddc0d4..0ef430c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
@@ -220,10 +220,10 @@ public class ClientUserAuthService extends CloseableUtils.AbstractCloseable impl
@Override
protected void preClose() {
- super.preClose();
if (!authFuture.isDone()) {
authFuture.setException(new SshException("Session is closed"));
}
- }
+ super.preClose();
+ }
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/session/SessionFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/SessionFactory.java b/sshd-core/src/main/java/org/apache/sshd/client/session/SessionFactory.java
new file mode 100644
index 0000000..9e43bed
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/SessionFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.session;
+
+import org.apache.sshd.client.ClientFactoryManager;
+import org.apache.sshd.common.io.IoSession;
+import org.apache.sshd.common.session.AbstractSessionFactory;
+
+/**
+ * A factory of client sessions.
+ * This class can be used as a way to customize the creation of client sessions.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ * @see org.apache.sshd.client.SshClient#setSessionFactory(SessionFactory)
+ */
+public class SessionFactory extends AbstractSessionFactory<ClientFactoryManager, ClientSessionImpl> {
+
+ public SessionFactory(ClientFactoryManager client) {
+ super(client);
+ }
+
+ public final ClientFactoryManager getClient() {
+ return getFactoryManager();
+ }
+
+ @Override
+ protected ClientSessionImpl doCreateSession(IoSession ioSession) throws Exception {
+ return new ClientSessionImpl(getClient(), ioSession);
+ }
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
index 122c0ab..809b3af 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
@@ -21,6 +21,7 @@ package org.apache.sshd.client.subsystem;
import java.nio.channels.Channel;
+import org.apache.sshd.client.channel.ClientChannel;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.NamedResource;
@@ -32,4 +33,9 @@ public interface SubsystemClient extends NamedResource, Channel {
* @return The underlying {@link ClientSession} used
*/
ClientSession getClientSession();
+
+ /**
+ * @return The underlying {@link ClientChannel} used
+ */
+ ClientChannel getClientChannel();
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/DefaultSftpClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/DefaultSftpClient.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/DefaultSftpClient.java
index 5f2c0d4..a4cf4a6 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/DefaultSftpClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/DefaultSftpClient.java
@@ -34,6 +34,7 @@ import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.sshd.client.channel.ChannelSubsystem;
+import org.apache.sshd.client.channel.ClientChannel;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.FactoryManagerUtils;
import org.apache.sshd.common.SshException;
@@ -107,6 +108,11 @@ public class DefaultSftpClient extends AbstractSftpClient {
}
@Override
+ public ClientChannel getClientChannel() {
+ return channel;
+ }
+
+ @Override
public Map<String, byte[]> getServerExtensions() {
return exposedExtensions;
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystem.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystem.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystem.java
index de91de3..3532e64 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystem.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileSystem.java
@@ -38,6 +38,7 @@ import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.sshd.client.channel.ClientChannel;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.FactoryManagerUtils;
import org.apache.sshd.common.file.util.BaseFileSystem;
@@ -208,6 +209,11 @@ public class SftpFileSystem extends BaseFileSystem<SftpPath> {
}
@Override
+ public ClientChannel getClientChannel() {
+ return delegate.getClientChannel();
+ }
+
+ @Override
public Map<String, byte[]> getServerExtensions() {
return delegate.getServerExtensions();
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
index 461149a..962ade1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/AbstractFactoryManager.java
@@ -18,15 +18,18 @@
*/
package org.apache.sshd.common;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.sshd.agent.SshAgentFactory;
import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelListener;
import org.apache.sshd.common.channel.RequestHandler;
import org.apache.sshd.common.cipher.Cipher;
import org.apache.sshd.common.compression.Compression;
@@ -42,9 +45,11 @@ import org.apache.sshd.common.mac.Mac;
import org.apache.sshd.common.random.Random;
import org.apache.sshd.common.session.AbstractSessionFactory;
import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.session.SessionListener;
import org.apache.sshd.common.session.SessionTimeoutListener;
import org.apache.sshd.common.signature.Signature;
import org.apache.sshd.common.util.CloseableUtils;
+import org.apache.sshd.common.util.EventListenerUtils;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.threads.ThreadUtils;
import org.apache.sshd.server.forward.ForwardingFilter;
@@ -56,7 +61,7 @@ import org.apache.sshd.server.forward.ForwardingFilter;
*/
public abstract class AbstractFactoryManager extends CloseableUtils.AbstractInnerCloseable implements FactoryManager {
- protected Map<String, Object> properties = new HashMap<String, Object>();
+ protected Map<String, Object> properties = new HashMap<>();
protected IoServiceFactoryFactory ioServiceFactoryFactory;
protected IoServiceFactory ioServiceFactory;
protected List<NamedFactory<KeyExchange>> keyExchangeFactories;
@@ -77,9 +82,15 @@ public abstract class AbstractFactoryManager extends CloseableUtils.AbstractInne
protected List<RequestHandler<ConnectionService>> globalRequestHandlers;
protected SessionTimeoutListener sessionTimeoutListener;
protected ScheduledFuture<?> timeoutListenerFuture;
+ protected final Collection<SessionListener> sessionListeners = new CopyOnWriteArraySet<>();
+ protected final SessionListener sessionListenerProxy;
+ protected final Collection<ChannelListener> channelListeners = new CopyOnWriteArraySet<>();
+ protected final ChannelListener channelListenerProxy;
protected AbstractFactoryManager() {
- super();
+ ClassLoader loader = getClass().getClassLoader();
+ sessionListenerProxy = EventListenerUtils.proxyWrapper(SessionListener.class, loader, sessionListeners);
+ channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, loader, channelListeners);
}
@Override
@@ -271,16 +282,76 @@ public abstract class AbstractFactoryManager extends CloseableUtils.AbstractInne
this.globalRequestHandlers = globalRequestHandlers;
}
- protected void setupSessionTimeout(final AbstractSessionFactory sessionFactory) {
+ @Override
+ public void addSessionListener(SessionListener listener) {
+ ValidateUtils.checkNotNull(listener, "addSessionListener(%s) null instance", this);
+ // avoid race conditions on notifications while manager is being closed
+ if (!isOpen()) {
+ log.warn("addSessionListener({})[{}] ignore registration while manager is closing", this, listener);
+ return;
+ }
+
+ if (this.sessionListeners.add(listener)) {
+ log.trace("addSessionListener({})[{}] registered", this, listener);
+ } else {
+ log.trace("addSessionListener({})[{}] ignored duplicate", this, listener);
+ }
+ }
+
+ @Override
+ public void removeSessionListener(SessionListener listener) {
+ if (this.sessionListeners.remove(listener)) {
+ log.trace("removeSessionListener({})[{}] removed", this, listener);
+ } else {
+ log.trace("removeSessionListener({})[{}] not registered", this, listener);
+ }
+ }
+
+ @Override
+ public SessionListener getSessionListenerProxy() {
+ return sessionListenerProxy;
+ }
+
+ @Override
+ public void addChannelListener(ChannelListener listener) {
+ ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null instance", this);
+ // avoid race conditions on notifications while manager is being closed
+ if (!isOpen()) {
+ log.warn("addChannelListener({})[{}] ignore registration while manager is closing", this, listener);
+ return;
+ }
+
+ if (this.channelListeners.add(listener)) {
+ log.trace("addChannelListener({})[{}] registered", this, listener);
+ } else {
+ log.trace("addChannelListener({})[{}] ignored duplicate", this, listener);
+ }
+ }
+
+ @Override
+ public void removeChannelListener(ChannelListener listener) {
+ if (this.channelListeners.remove(listener)) {
+ log.trace("removeChannelListener({})[{}] removed", this, listener);
+ } else {
+ log.trace("removeChannelListener({})[{}] not registered", this, listener);
+ }
+ }
+
+ @Override
+ public ChannelListener getChannelListenerProxy() {
+ return channelListenerProxy;
+ }
+
+ protected void setupSessionTimeout(final AbstractSessionFactory<?, ?> sessionFactory) {
// set up the the session timeout listener and schedule it
sessionTimeoutListener = createSessionTimeoutListener();
- sessionFactory.addListener(sessionTimeoutListener);
+ addSessionListener(sessionTimeoutListener);
timeoutListenerFuture = getScheduledExecutorService()
.scheduleAtFixedRate(sessionTimeoutListener, 1, 1, TimeUnit.SECONDS);
}
- protected void removeSessionTimeout(final AbstractSessionFactory sessionFactory) {
+ protected void removeSessionTimeout(final AbstractSessionFactory<?, ?> sessionFactory) {
stopSessionTimeoutListener(sessionFactory);
}
@@ -288,19 +359,25 @@ public abstract class AbstractFactoryManager extends CloseableUtils.AbstractInne
return new SessionTimeoutListener();
}
- protected void stopSessionTimeoutListener(final AbstractSessionFactory sessionFactory) {
+ protected void stopSessionTimeoutListener(final AbstractSessionFactory<?, ?> sessionFactory) {
// cancel the timeout monitoring task
if (timeoutListenerFuture != null) {
- timeoutListenerFuture.cancel(true);
- timeoutListenerFuture = null;
+ try {
+ timeoutListenerFuture.cancel(true);
+ } finally {
+ timeoutListenerFuture = null;
+ }
}
// remove the sessionTimeoutListener completely; should the SSH server/client be restarted, a new one
// will be created.
- if (sessionFactory != null && sessionTimeoutListener != null) {
- sessionFactory.removeListener(sessionTimeoutListener);
+ if (sessionTimeoutListener != null) {
+ try {
+ removeSessionListener(sessionTimeoutListener);
+ } finally {
+ sessionTimeoutListener = null;
+ }
}
- sessionTimeoutListener = null;
}
protected void checkConfig() {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
index 734fb6a..ee8f168 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.sshd.agent.SshAgentFactory;
import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelListenerManager;
import org.apache.sshd.common.channel.RequestHandler;
import org.apache.sshd.common.cipher.Cipher;
import org.apache.sshd.common.compression.Compression;
@@ -35,6 +36,7 @@ import org.apache.sshd.common.keyprovider.KeyPairProvider;
import org.apache.sshd.common.mac.Mac;
import org.apache.sshd.common.random.Random;
import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.session.SessionListenerManager;
import org.apache.sshd.common.signature.Signature;
import org.apache.sshd.server.forward.ForwardingFilter;
@@ -44,7 +46,7 @@ import org.apache.sshd.server.forward.ForwardingFilter;
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public interface FactoryManager {
+public interface FactoryManager extends SessionListenerManager, ChannelListenerManager {
/**
* Key used to retrieve the value of the window size in the
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index e468da4..04ed9ad 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,8 +39,10 @@ import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.CloseableUtils;
+import org.apache.sshd.common.util.EventListenerUtils;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.Int2IntFunction;
+import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.BufferUtils;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
@@ -80,6 +83,11 @@ public abstract class AbstractChannel
protected AtomicReference<GracefulState> gracefulState = new AtomicReference<GracefulState>(GracefulState.Opened);
protected final DefaultCloseFuture gracefulFuture = new DefaultCloseFuture(lock);
protected final List<RequestHandler<Channel>> handlers = new ArrayList<RequestHandler<Channel>>();
+ /**
+ * Channel events listener
+ */
+ protected final Collection<ChannelListener> channelListeners = new CopyOnWriteArraySet<>();
+ protected final ChannelListener channelListenerProxy;
protected AbstractChannel() {
this("");
@@ -87,6 +95,7 @@ public abstract class AbstractChannel
protected AbstractChannel(String discriminator) {
super(discriminator);
+ channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, getClass().getClassLoader(), channelListeners);
}
public void addRequestHandler(RequestHandler<Channel> handler) {
@@ -195,10 +204,20 @@ public abstract class AbstractChannel
}
@Override
- public void init(ConnectionService service, Session session, int id) {
+ public void init(ConnectionService service, Session session, int id) throws IOException {
this.service = service;
this.session = session;
this.id = id;
+
+ ChannelListener listener = session.getChannelListenerProxy();
+ try {
+ listener.channelInitialized(this);
+ } catch (RuntimeException t) {
+ Throwable e = GenericUtils.peelException(t);
+ throw new IOException("Failed (" + e.getClass().getSimpleName() + ") to notify channel " + toString() + " initialization: " + e.getMessage(), e);
+ }
+ // delegate the rest of the notifications to the channel
+ addChannelListener(listener);
configureWindow();
}
@@ -209,6 +228,36 @@ public abstract class AbstractChannel
}
@Override
+ public void addChannelListener(ChannelListener listener) {
+ ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null instance", this);
+ // avoid race conditions on notifications while channel is being closed
+ if (!isOpen()) {
+ log.warn("addChannelListener({})[{}] ignore registration while channel is closing", this, listener);
+ return;
+ }
+
+ if (this.channelListeners.add(listener)) {
+ log.trace("addChannelListener({})[{}] registered", this, listener);
+ } else {
+ log.trace("addChannelListener({})[{}] ignored duplicate", this, listener);
+ }
+ }
+
+ @Override
+ public void removeChannelListener(ChannelListener listener) {
+ if (this.channelListeners.remove(listener)) {
+ log.trace("removeChannelListener({})[{}] removed", this, listener);
+ } else {
+ log.trace("removeChannelListener({})[{}] not registered", this, listener);
+ }
+ }
+
+ @Override
+ public ChannelListener getChannelListenerProxy() {
+ return channelListenerProxy;
+ }
+
+ @Override
public void handleClose() throws IOException {
log.debug("Received SSH_MSG_CHANNEL_CLOSE on channel {}", this);
if (gracefulState.compareAndSet(GracefulState.Opened, GracefulState.CloseReceived)) {
@@ -252,8 +301,9 @@ public abstract class AbstractChannel
gracefulFuture.setClosed();
} else if (!gracefulFuture.isClosed()) {
log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", AbstractChannel.this);
- Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_CLOSE);
- buffer.putInt(recipient);
+ Session s = getSession();
+ Buffer buffer = s.createBuffer(SshConstants.SSH_MSG_CHANNEL_CLOSE);
+ buffer.putInt(getRecipient());
try {
long timeout = FactoryManagerUtils.getLongProperty(getSession(), FactoryManager.CHANNEL_CLOSE_TIMEOUT, DEFAULT_CHANNEL_CLOSE_TIMEOUT);
session.writePacket(buffer, timeout, TimeUnit.MILLISECONDS).addListener(new SshFutureListener<IoWriteFuture>() {
@@ -292,6 +342,20 @@ public abstract class AbstractChannel
}
@Override
+ protected void preClose() {
+ ChannelListener listener = getChannelListenerProxy();
+ try {
+ listener.channelClosed(this);
+ } catch (RuntimeException t) {
+ Throwable e = GenericUtils.peelException(t);
+ log.warn(e.getClass().getSimpleName() + " while signal channel " + toString() + " closed: " + e.getMessage(), e);
+ } finally {
+ // clear the listeners since we are closing the channel (quicker GC)
+ this.channelListeners.clear();
+ }
+ }
+
+ @Override
protected void doCloseImmediately() {
if (service != null) {
service.unregisterChannel(AbstractChannel.this);
@@ -301,9 +365,10 @@ public abstract class AbstractChannel
protected void writePacket(Buffer buffer) throws IOException {
if (!isClosing()) {
- session.writePacket(buffer);
+ Session s = getSession();
+ s.writePacket(buffer);
} else {
- log.debug("Discarding output packet because channel is being closed");
+ log.debug("writePacket({}) Discarding output packet because channel is being closed", this);
}
}
@@ -326,8 +391,9 @@ public abstract class AbstractChannel
// Only accept extended data for stderr
if (ex != 1) {
log.debug("Send SSH_MSG_CHANNEL_FAILURE on channel {}", this);
- buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_FAILURE);
- buffer.putInt(recipient);
+ Session s = getSession();
+ buffer = s.createBuffer(SshConstants.SSH_MSG_CHANNEL_FAILURE);
+ buffer.putInt(getRecipient());
writePacket(buffer);
return;
}
@@ -376,27 +442,29 @@ public abstract class AbstractChannel
protected void sendEof() throws IOException {
log.debug("Send SSH_MSG_CHANNEL_EOF on channel {}", this);
- Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_EOF);
- buffer.putInt(recipient);
+ Session s = getSession();
+ Buffer buffer = s.createBuffer(SshConstants.SSH_MSG_CHANNEL_EOF);
+ buffer.putInt(getRecipient());
writePacket(buffer);
}
protected void configureWindow() {
- localWindow.init(session);
+ localWindow.init(getSession());
}
protected void sendWindowAdjust(int len) throws IOException {
if (log.isDebugEnabled()) {
log.debug("Send SSH_MSG_CHANNEL_WINDOW_ADJUST on channel {}", Integer.valueOf(id));
}
- Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_WINDOW_ADJUST);
- buffer.putInt(recipient);
+ Session s = getSession();
+ Buffer buffer = s.createBuffer(SshConstants.SSH_MSG_CHANNEL_WINDOW_ADJUST);
+ buffer.putInt(getRecipient());
buffer.putInt(len);
writePacket(buffer);
}
@Override
public String toString() {
- return getClass().getSimpleName() + "[id=" + id + ", recipient=" + recipient + "]";
+ return getClass().getSimpleName() + "[id=" + getId() + ", recipient=" + getRecipient() + "]";
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
index 1d55d8a..e4b0635 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
@@ -28,11 +28,12 @@ import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.buffer.Buffer;
/**
- * TODO Add javadoc
+ * Represents a channel opened over an SSH session - holds information that is
+ * common both to server and client channels.
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public interface Channel extends Closeable {
+public interface Channel extends ChannelListenerManager, Closeable {
/**
* @return Local channel identifier
@@ -109,5 +110,4 @@ public interface Channel extends Closeable {
* @throws IOException If failed to handle the success
*/
void handleOpenFailure(Buffer buffer) throws IOException;
-
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
index b1798be..eaef733 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
@@ -33,6 +33,9 @@ import org.apache.sshd.common.util.Readable;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable implements IoInputStream {
private final Channel channel;
@@ -76,6 +79,7 @@ public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable im
}
}
}
+ super.preClose();
}
@Override
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java
new file mode 100644
index 0000000..4a4078c
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.channel;
+
+import java.util.EventListener;
+
+/**
+ * Provides a simple listener for client / server channels being established
+ * or torn down. <B>Note:</B> for server-side listeners, some of the established
+ * channels may be <U>client</U> - especially where connection proxy or forwarding
+ * is concerned
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface ChannelListener extends EventListener {
+ /**
+ * Called to inform about initial setup of a channel via the
+ * {@link Channel#init(org.apache.sshd.common.session.ConnectionService, org.apache.sshd.common.session.Session, int)}
+ * method. <B>Note:</B> this method is guaranteed to be called
+ * before either of the {@link #channelOpenSuccess(Channel)} or
+ * {@link #channelOpenFailure(Channel, Throwable)} will be called
+ *
+ * @param channel The initialized {@link Channel}
+ */
+ void channelInitialized(Channel channel);
+
+ /**
+ * Called to inform about a channel being successfully opened for a
+ * session. <B>Note:</B> when the call is made, the channel is known
+ * to be open but nothing beyond that.
+ *
+ * @param channel The newly opened {@link Channel}
+ */
+ void channelOpenSuccess(Channel channel);
+
+ /**
+ * Called to inform about the failure to open a channel
+ *
+ * @param channel The failed {@link Channel}
+ * @param reason The {@link Throwable} reason - <B>Note:</B> if the
+ * {@link #channelOpenSuccess(Channel)} notification throws an exception
+ * it will cause this method to be invoked
+ */
+ void channelOpenFailure(Channel channel, Throwable reason);
+
+ /**
+ * Called to inform about a channel being closed. <B>Note:</B> when the call
+ * is made there are no guarantees about the channel's actual state
+ * except that it either has been already closed or may be in the process
+ * of being closed. <B>Note:</B> this method is guaranteed to be called
+ * regardless of whether {@link #channelOpenSuccess(Channel)} or
+ * {@link #channelOpenFailure(Channel, Throwable)} have been called
+ *
+ * @param channel The referenced {@link Channel}
+ */
+ void channelClosed(Channel channel);
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListenerManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListenerManager.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListenerManager.java
new file mode 100644
index 0000000..511379b
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListenerManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.channel;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface ChannelListenerManager {
+ /**
+ * Add a channel listener
+ *
+ * @param listener The {@link ChannelListener} to add - not {@code null}
+ */
+ void addChannelListener(ChannelListener listener);
+
+ /**
+ * Remove a channel listener
+ *
+ * @param listener The {@link ChannelListener} to remove
+ */
+ void removeChannelListener(ChannelListener listener);
+
+ /**
+ * @return A (never {@code null}) proxy {@link ChannelListener} that represents
+ * all the currently registered listeners. Any method invocation on the proxy
+ * is replicated to the currently registered listeners
+ */
+ ChannelListener getChannelListenerProxy();
+
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
index a904bc2..3df0526 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.SocketException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -44,20 +45,20 @@ public class ChannelPipedInputStream extends InputStream implements ChannelPiped
private final Window localWindow;
private final Buffer buffer = new ByteArrayBuffer();
private final byte[] b = new byte[1];
- private boolean closed;
- private boolean eofSent;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final AtomicBoolean eofSent = new AtomicBoolean(false);
private final Lock lock = new ReentrantLock();
private final Condition dataAvailable = lock.newCondition();
- private long timeout;
-
/**
* {@link ChannelPipedOutputStream} is already closed and so we will not receive additional data.
* This is different from the {@link #closed}, which indicates that the reader of this {@link InputStream}
* will not be reading data any more.
*/
- private boolean writerClosed;
+ private final AtomicBoolean writerClosed = new AtomicBoolean(false);
+
+ private long timeout;
public ChannelPipedInputStream(Window localWindow) {
this.localWindow = ValidateUtils.checkNotNull(localWindow, "No local window provided");
@@ -77,7 +78,7 @@ public class ChannelPipedInputStream extends InputStream implements ChannelPiped
lock.lock();
try {
int avail = buffer.available();
- if (avail == 0 && writerClosed) {
+ if ((avail == 0) && writerClosed.get()) {
return -1;
}
return avail;
@@ -103,14 +104,14 @@ public class ChannelPipedInputStream extends InputStream implements ChannelPiped
lock.lock();
try {
for (int index = 0;; index++) {
- if ((closed && writerClosed && eofSent) || (closed && !writerClosed)) {
+ if ((closed.get() && writerClosed.get() && eofSent.get()) || (closed.get() && (!writerClosed.get()))) {
throw new IOException("Pipe closed after " + index + " cycles");
}
if (buffer.available() > 0) {
break;
}
- if (writerClosed) {
- eofSent = true;
+ if (writerClosed.get()) {
+ eofSent.set(true);
return -1; // no more data to read
}
@@ -146,7 +147,7 @@ public class ChannelPipedInputStream extends InputStream implements ChannelPiped
public void eof() {
lock.lock();
try {
- writerClosed = true;
+ writerClosed.set(true);
dataAvailable.signalAll();
} finally {
lock.unlock();
@@ -159,7 +160,7 @@ public class ChannelPipedInputStream extends InputStream implements ChannelPiped
try {
dataAvailable.signalAll();
} finally {
- closed = true;
+ closed.set(true);
lock.unlock();
}
}
@@ -168,7 +169,7 @@ public class ChannelPipedInputStream extends InputStream implements ChannelPiped
public void receive(byte[] bytes, int off, int len) throws IOException {
lock.lock();
try {
- if (writerClosed || closed) {
+ if (writerClosed.get() || closed.get()) {
throw new IOException("Pipe closed");
}
buffer.putRawBytes(bytes, off, len);
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
index 2b911e1..92c915c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
@@ -150,10 +150,19 @@ public class DefaultSshFuture<T extends SshFuture> extends AbstractLoggingBean i
Class<?> actualType = value.getClass();
if (expectedType.isAssignableFrom(actualType)) {
return expectedType.cast(value);
- } else if (IOException.class.isAssignableFrom(actualType)) {
- throw (IOException) value;
- } else if (Throwable.class.isAssignableFrom(actualType)) {
- Throwable t = (Throwable) value;
+ }
+
+ if (Throwable.class.isAssignableFrom(actualType)) {
+ Throwable t = GenericUtils.peelException((Throwable) value);
+ if (t != value) {
+ value = t;
+ actualType = value.getClass();
+ }
+
+ if (IOException.class.isAssignableFrom(actualType)) {
+ throw (IOException) value;
+ }
+
throw new SshException("Failed (" + t.getClass().getSimpleName() + ") to execute: " + t.getMessage(), GenericUtils.resolveExceptionCause(t));
} else { // what else can it be ????
throw new StreamCorruptedException("Unknown result type: " + actualType.getName());
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index 9ccc2da..8b1b59d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -21,13 +21,13 @@ package org.apache.sshd.common.session;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
+import java.util.Collection;
import java.util.EnumMap;
import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -43,6 +43,7 @@ import org.apache.sshd.common.NamedResource;
import org.apache.sshd.common.Service;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.channel.ChannelListener;
import org.apache.sshd.common.cipher.Cipher;
import org.apache.sshd.common.compression.Compression;
import org.apache.sshd.common.digest.Digest;
@@ -65,21 +66,12 @@ import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.BufferUtils;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
-import static org.apache.sshd.common.SshConstants.SSH_MSG_DEBUG;
-import static org.apache.sshd.common.SshConstants.SSH_MSG_DISCONNECT;
-import static org.apache.sshd.common.SshConstants.SSH_MSG_IGNORE;
-import static org.apache.sshd.common.SshConstants.SSH_MSG_KEXINIT;
-import static org.apache.sshd.common.SshConstants.SSH_MSG_NEWKEYS;
-import static org.apache.sshd.common.SshConstants.SSH_MSG_SERVICE_ACCEPT;
-import static org.apache.sshd.common.SshConstants.SSH_MSG_SERVICE_REQUEST;
-import static org.apache.sshd.common.SshConstants.SSH_MSG_UNIMPLEMENTED;
-
/**
* <P>
* The AbstractSession handles all the basic SSH protocol such as key exchange, authentication,
* encoding and decoding. Both server side and client side sessions should inherit from this
* abstract class. Some basic packet processing methods are defined but the actual call to these
- * methods should be done from the {@link #handleMessage(org.apache.sshd.common.util.buffer.Buffer)}
+ * methods should be done from the {@link #handleMessage(Buffer)}
* method, which is dependent on the state and side of this session.
* </P>
*
@@ -128,8 +120,15 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
/**
* Session listeners container
*/
- protected final List<SessionListener> listeners = new CopyOnWriteArrayList<SessionListener>();
+ protected final Collection<SessionListener> sessionListeners = new CopyOnWriteArraySet<>();
protected final SessionListener sessionListenerProxy;
+
+ /**
+ * Channel events listener
+ */
+ protected final Collection<ChannelListener> channelListeners = new CopyOnWriteArraySet<>();
+ protected final ChannelListener channelListenerProxy;
+
//
// Key exchange support
//
@@ -202,7 +201,11 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
this.isServer = isServer;
this.factoryManager = ValidateUtils.checkNotNull(factoryManager, "No factory manager provided", GenericUtils.EMPTY_OBJECT_ARRAY);
this.ioSession = ioSession;
- sessionListenerProxy = EventListenerUtils.proxyWrapper(SessionListener.class, getClass().getClassLoader(), listeners);
+
+ ClassLoader loader = getClass().getClassLoader();
+ sessionListenerProxy = EventListenerUtils.proxyWrapper(SessionListener.class, loader, sessionListeners);
+ channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, loader, channelListeners);
+
random = factoryManager.getRandomFactory().create();
authTimeoutMs = getLongProperty(FactoryManager.AUTH_TIMEOUT, authTimeoutMs);
authTimeoutTimestamp = System.currentTimeMillis() + authTimeoutMs;
@@ -301,7 +304,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
@Override
public void setAuthenticated() throws IOException {
this.authed = true;
- sendEvent(SessionListener.Event.Authenticated);
+ sendSessionEvent(SessionListener.Event.Authenticated);
}
/**
@@ -353,22 +356,22 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
protected void doHandleMessage(Buffer buffer) throws Exception {
int cmd = buffer.getUByte();
switch (cmd) {
- case SSH_MSG_DISCONNECT: {
+ case SshConstants.SSH_MSG_DISCONNECT: {
handleDisconnect(buffer);
break;
}
- case SSH_MSG_IGNORE: {
+ case SshConstants.SSH_MSG_IGNORE: {
log.debug("Received SSH_MSG_IGNORE");
break;
}
- case SSH_MSG_UNIMPLEMENTED: {
+ case SshConstants.SSH_MSG_UNIMPLEMENTED: {
int code = buffer.getInt();
if (log.isDebugEnabled()) {
log.debug("Received SSH_MSG_UNIMPLEMENTED #{}", Integer.valueOf(code));
}
break;
}
- case SSH_MSG_DEBUG: {
+ case SshConstants.SSH_MSG_DEBUG: {
boolean display = buffer.getBoolean();
String msg = buffer.getString();
if (log.isDebugEnabled()) {
@@ -376,16 +379,16 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
}
break;
}
- case SSH_MSG_SERVICE_REQUEST:
+ case SshConstants.SSH_MSG_SERVICE_REQUEST:
handleServiceRequest(buffer);
break;
- case SSH_MSG_SERVICE_ACCEPT:
+ case SshConstants.SSH_MSG_SERVICE_ACCEPT:
handleServiceAccept();
break;
- case SSH_MSG_KEXINIT:
+ case SshConstants.SSH_MSG_KEXINIT:
handleKexInit(buffer);
break;
- case SSH_MSG_NEWKEYS:
+ case SshConstants.SSH_MSG_NEWKEYS:
handleNewKeys(cmd);
break;
default:
@@ -420,7 +423,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
private void handleServiceRequest(Buffer buffer) throws IOException {
String service = buffer.getString();
log.debug("Received SSH_MSG_SERVICE_REQUEST '{}'", service);
- validateKexState(SSH_MSG_SERVICE_REQUEST, KexState.DONE);
+ validateKexState(SshConstants.SSH_MSG_SERVICE_REQUEST, KexState.DONE);
try {
startService(service);
} catch (Exception e) {
@@ -434,39 +437,39 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
writePacket(response);
}
- private void handleServiceAccept() throws java.io.IOException {
+ private void handleServiceAccept() throws IOException {
log.debug("Received SSH_MSG_SERVICE_ACCEPT");
- validateKexState(SSH_MSG_SERVICE_ACCEPT, org.apache.sshd.common.kex.KexState.DONE);
+ validateKexState(SshConstants.SSH_MSG_SERVICE_ACCEPT, KexState.DONE);
serviceAccept();
}
- private void handleKexInit(org.apache.sshd.common.util.buffer.Buffer buffer) throws Exception {
+ private void handleKexInit(Buffer buffer) throws Exception {
log.debug("Received SSH_MSG_KEXINIT");
receiveKexInit(buffer);
- if (kexState.compareAndSet(org.apache.sshd.common.kex.KexState.DONE, org.apache.sshd.common.kex.KexState.RUN)) {
+ if (kexState.compareAndSet(KexState.DONE, KexState.RUN)) {
sendKexInit();
- } else if (!kexState.compareAndSet(org.apache.sshd.common.kex.KexState.INIT, org.apache.sshd.common.kex.KexState.RUN)) {
+ } else if (!kexState.compareAndSet(KexState.INIT, KexState.RUN)) {
throw new IllegalStateException("Received SSH_MSG_KEXINIT while key exchange is running");
}
- java.util.Map<org.apache.sshd.common.kex.KexProposalOption, String> result = negotiate();
- String kexAlgorithm = result.get(org.apache.sshd.common.kex.KexProposalOption.ALGORITHMS);
- kex = org.apache.sshd.common.util.ValidateUtils.checkNotNull(org.apache.sshd.common.NamedFactory.Utils.create(factoryManager.getKeyExchangeFactories(), kexAlgorithm),
+ Map<KexProposalOption, String> result = negotiate();
+ String kexAlgorithm = result.get(KexProposalOption.ALGORITHMS);
+ kex = ValidateUtils.checkNotNull(NamedFactory.Utils.create(factoryManager.getKeyExchangeFactories(), kexAlgorithm),
"Unknown negotiated KEX algorithm: %s",
kexAlgorithm);
- kex.init(this, serverVersion.getBytes(java.nio.charset.StandardCharsets.UTF_8), clientVersion.getBytes(java.nio.charset.StandardCharsets.UTF_8), i_s, i_c);
+ kex.init(this, serverVersion.getBytes(StandardCharsets.UTF_8), clientVersion.getBytes(StandardCharsets.UTF_8), i_s, i_c);
- sendEvent(org.apache.sshd.common.session.SessionListener.Event.KexCompleted);
+ sendSessionEvent(SessionListener.Event.KexCompleted);
}
private void handleNewKeys(int cmd) throws Exception {
log.debug("Received SSH_MSG_NEWKEYS");
- validateKexState(cmd, org.apache.sshd.common.kex.KexState.KEYS);
+ validateKexState(cmd, KexState.KEYS);
receiveNewKeys();
if (reexchangeFuture != null) {
reexchangeFuture.setValue(Boolean.TRUE);
}
- sendEvent(org.apache.sshd.common.session.SessionListener.Event.KeyEstablished);
+ sendSessionEvent(SessionListener.Event.KeyEstablished);
synchronized (pendingPackets) {
if (!pendingPackets.isEmpty()) {
log.debug("Dequeing pending packets");
@@ -477,7 +480,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
}
}
}
- kexState.set(org.apache.sshd.common.kex.KexState.DONE);
+ kexState.set(KexState.DONE);
}
synchronized (lock) {
lock.notifyAll();
@@ -495,7 +498,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
* Handle any exceptions that occurred on this session.
* The session will be closed and a disconnect packet will be
* sent before if the given exception is an
- * {@link org.apache.sshd.common.SshException}.
+ * {@link SshException}.
*
* @param t the exception to process
*/
@@ -534,10 +537,19 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
}
@Override
- protected void doCloseImmediately() {
- super.doCloseImmediately();
+ protected void preClose() {
// Fire 'close' event
- sessionListenerProxy.sessionClosed(this);
+ SessionListener listener = getSessionListenerProxy();
+ try {
+ listener.sessionClosed(this);
+ } catch (RuntimeException t) {
+ Throwable e = GenericUtils.peelException(t);
+ log.warn(e.getClass().getSimpleName() + " while signal session " + toString() + " closed: " + e.getMessage(), e);
+ } finally {
+ // clear the listeners since we are closing the session (quicker GC)
+ this.sessionListeners.clear();
+ this.channelListeners.clear();
+ }
}
protected Service[] getServices() {
@@ -561,7 +573,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
*
* @param buffer the buffer to encode and send
* @return a future that can be used to check when the packet has actually been sent
- * @throws java.io.IOException if an error occured when encoding sending the packet
+ * @throws IOException if an error occurred when encoding sending the packet
*/
@Override
public IoWriteFuture writePacket(Buffer buffer) throws IOException {
@@ -630,7 +642,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
*
* @param buffer the buffer containing the global request
* @return <code>true</code> if the request was successful, <code>false</code> otherwise.
- * @throws java.io.IOException if an error occured when encoding sending the packet
+ * @throws IOException if an error occurred when encoding sending the packet
*/
@Override
public Buffer request(Buffer buffer) throws IOException {
@@ -873,7 +885,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
/**
* Read the other side identification.
* This method is specific to the client or server side, but both should call
- * {@link #doReadIdentification(org.apache.sshd.common.util.buffer.Buffer, boolean)} and
+ * {@link #doReadIdentification(Buffer, boolean)} and
* store the result in the needed property.
*
* @param buffer the buffer containing the remote identification
@@ -1397,18 +1409,68 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
}
@Override
- public void addListener(SessionListener listener) {
- ValidateUtils.checkNotNull(listener, "addListener(%s) null instance", this);
- this.listeners.add(listener);
+ public void addSessionListener(SessionListener listener) {
+ ValidateUtils.checkNotNull(listener, "addSessionListener(%s) null instance", this);
+ // avoid race conditions on notifications while session is being closed
+ if (!isOpen()) {
+ log.warn("addSessionListener({})[{}] ignore registration while session is closing", this, listener);
+ return;
+ }
+
+ if (this.sessionListeners.add(listener)) {
+ log.trace("addSessionListener({})[{}] registered", this, listener);
+ } else {
+ log.trace("addSessionListener({})[{}] ignored duplicate", this, listener);
+ }
+ }
+
+ @Override
+ public void removeSessionListener(SessionListener listener) {
+ if (this.sessionListeners.remove(listener)) {
+ log.trace("removeSessionListener({})[{}] removed", this, listener);
+ } else {
+ log.trace("removeSessionListener({})[{}] not registered", this, listener);
+ }
+ }
+
+ @Override
+ public SessionListener getSessionListenerProxy() {
+ return sessionListenerProxy;
+ }
+
+ @Override
+ public void addChannelListener(ChannelListener listener) {
+ ValidateUtils.checkNotNull(listener, "addChannelListener(%s) null instance", this);
+ // avoid race conditions on notifications while session is being closed
+ if (!isOpen()) {
+ log.warn("addChannelListener({})[{}] ignore registration while session is closing", this, listener);
+ return;
+ }
+
+ if (this.channelListeners.add(listener)) {
+ log.trace("addChannelListener({})[{}] registered", this, listener);
+ } else {
+ log.trace("addChannelListener({})[{}] ignored duplicate", this, listener);
+ }
+ }
+
+ @Override
+ public void removeChannelListener(ChannelListener listener) {
+ if (this.channelListeners.remove(listener)) {
+ log.trace("removeChannelListener({})[{}] removed", this, listener);
+ } else {
+ log.trace("removeChannelListener({})[{}] not registered", this, listener);
+ }
}
@Override
- public void removeListener(SessionListener listener) {
- this.listeners.remove(listener);
+ public ChannelListener getChannelListenerProxy() {
+ return channelListenerProxy;
}
- protected void sendEvent(SessionListener.Event event) throws IOException {
- sessionListenerProxy.sessionEvent(this, event);
+ protected void sendSessionEvent(SessionListener.Event event) throws IOException {
+ SessionListener listener = getSessionListenerProxy();
+ listener.sessionEvent(this, event);
}
@Override
@@ -1509,11 +1571,11 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
protected void checkForTimeouts() throws IOException {
if (!isClosing()) {
long now = System.currentTimeMillis();
- if (!authed && authTimeoutMs > 0 && now > authTimeoutTimestamp) {
+ if ((!authed) && (authTimeoutMs > 0L) && (now > authTimeoutTimestamp)) {
timeoutStatus.set(TimeoutStatus.AuthTimeout);
disconnect(SshConstants.SSH2_DISCONNECT_PROTOCOL_ERROR, "Session has timed out waiting for authentication after " + authTimeoutMs + " ms.");
}
- if (idleTimeoutMs > 0 && idleTimeoutTimestamp > 0 && now > idleTimeoutTimestamp) {
+ if ((idleTimeoutMs > 0) && (idleTimeoutTimestamp > 0L) && (now > idleTimeoutTimestamp)) {
timeoutStatus.set(TimeoutStatus.AuthTimeout);
disconnect(SshConstants.SSH2_DISCONNECT_PROTOCOL_ERROR, "User session has timed out idling after " + idleTimeoutMs + " ms.");
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
index eb8636b..37fa761 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSessionFactory.java
@@ -18,9 +18,7 @@
*/
package org.apache.sshd.common.session;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
+import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.util.ValidateUtils;
@@ -29,45 +27,32 @@ import org.apache.sshd.common.util.ValidateUtils;
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public abstract class AbstractSessionFactory extends AbstractSessionIoHandler {
+public abstract class AbstractSessionFactory<M extends FactoryManager, S extends AbstractSession> extends AbstractSessionIoHandler {
+ private final M manager;
- protected final List<SessionListener> listeners = new CopyOnWriteArrayList<>();
+ protected AbstractSessionFactory(M manager) {
+ this.manager = ValidateUtils.checkNotNull(manager, "No factory manager instance");
+ }
- protected AbstractSessionFactory() {
- super();
+ public M getFactoryManager() {
+ return manager;
}
@Override
- protected AbstractSession createSession(IoSession ioSession) throws Exception {
- AbstractSession session = doCreateSession(ioSession);
-
- for (SessionListener l : listeners) {
- l.sessionCreated(session);
- session.addListener(l);
- }
-
- return session;
+ protected S createSession(IoSession ioSession) throws Exception {
+ return setupSession(doCreateSession(ioSession));
}
- protected abstract AbstractSession doCreateSession(IoSession ioSession) throws Exception;
+ protected abstract S doCreateSession(IoSession ioSession) throws Exception;
- /**
- * Add a session |listener|.
- *
- * @param listener the session listener to add
- */
- public void addListener(SessionListener listener) {
- ValidateUtils.checkNotNull(listener, "addListener(%s) no listener", this);
- this.listeners.add(listener);
- }
-
- /**
- * Remove a session |listener|.
- *
- * @param listener the session listener to remove
- */
- public void removeListener(SessionListener listener) {
- this.listeners.remove(listener);
+ protected S setupSession(S session) throws Exception {
+ FactoryManager listenersManager = getFactoryManager();
+ SessionListener sessionListener = listenersManager.getSessionListenerProxy();
+ // Inform the listener of the newly created session
+ sessionListener.sessionCreated(session);
+ // Delegate the task of further notifications to the session
+ session.addSessionListener(sessionListener);
+ session.addChannelListener(listenersManager.getChannelListenerProxy());
+ return session;
}
-
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
index 66616af..ec35195 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.Service;
+import org.apache.sshd.common.channel.ChannelListenerManager;
import org.apache.sshd.common.future.SshFuture;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.io.IoWriteFuture;
@@ -36,7 +37,7 @@ import org.apache.sshd.common.util.buffer.Buffer;
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public interface Session extends Closeable {
+public interface Session extends SessionListenerManager, ChannelListenerManager, Closeable {
/**
* Timeout status.
@@ -181,20 +182,6 @@ public interface Session extends Closeable {
void exceptionCaught(Throwable t);
/**
- * Add a session |listener|.
- *
- * @param listener the session listener to add
- */
- void addListener(SessionListener listener);
-
- /**
- * Remove a session |listener|.
- *
- * @param listener the session listener to remove
- */
- void removeListener(SessionListener listener);
-
- /**
* Initiate a new key exchange.
*
* @return An {@link SshFuture} for awaiting the completion of the exchange
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/40195ab4/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListenerManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListenerManager.java b/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListenerManager.java
new file mode 100644
index 0000000..eda580a
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/SessionListenerManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.session;
+
+/**
+ * Marker interface for classes that allow adding/removing session listeners.
+ * <B>Note:</B> if adding/removing listeners while connections are being
+ * established and/or torn down there are no guarantees as to the order of
+ * the calls to the recently added/removed listener's methods in the interim.
+ * The correct order is guaranteed only as of the <U>next</U> session after
+ * the listener has been added/removed.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface SessionListenerManager {
+ /**
+ * Add a session listener.
+ *
+ * @param listener The {@link SessionListener} to add - not {@code null}
+ */
+ void addSessionListener(SessionListener listener);
+
+ /**
+ * Remove a session listener.
+ *
+ * @param listener The {@link SessionListener} to remove
+ */
+ void removeSessionListener(SessionListener listener);
+
+ /**
+ * @return A (never {@code null}) proxy {@link SessionListener} that represents
+ * all the currently registered listeners. Any method invocation on the proxy
+ * is replicated to the currently registered listeners
+ */
+ SessionListener getSessionListenerProxy();
+}