You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2015/06/30 09:10:29 UTC
mina-sshd git commit: [SSHD-508] Use event-driven wait times for
PortForwardingTest
Repository: mina-sshd
Updated Branches:
refs/heads/master 78d703fd4 -> e06cfdcb1
[SSHD-508] Use event-driven wait times for PortForwardingTest
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/e06cfdcb
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/e06cfdcb
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/e06cfdcb
Branch: refs/heads/master
Commit: e06cfdcb11b57d6646e6d3926c2cad344887b261
Parents: 78d703f
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Tue Jun 30 10:10:20 2015 +0300
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Tue Jun 30 10:10:20 2015 +0300
----------------------------------------------------------------------
.../sshd/common/forward/TcpipForwarder.java | 26 ++--
.../common/forward/TcpipForwarderFactory.java | 2 +-
.../sshd/server/forward/TcpipServerChannel.java | 7 +-
.../global/CancelTcpipForwardHandler.java | 4 +-
.../sshd/server/global/TcpipForwardHandler.java | 4 +-
.../org/apache/sshd/PortForwardingTest.java | 134 ++++++++++++++++---
6 files changed, 140 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e06cfdcb/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
index 20552cf..16e9a1f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
@@ -28,11 +28,16 @@ public interface TcpipForwarder extends Closeable {
/**
* Start forwarding the given local address on the client to the given address on the server.
+ * @param remote The remote address
+ * @param local The local address
+ * @throws IOException If failed to handle request
*/
SshdSocketAddress startLocalPortForwarding(SshdSocketAddress local, SshdSocketAddress remote) throws IOException;
/**
* Stop forwarding the given local address.
+ * @param local The local address
+ * @throws IOException If failed to handle request
*/
void stopLocalPortForwarding(SshdSocketAddress local) throws IOException;
@@ -51,34 +56,37 @@ public interface TcpipForwarder extends Closeable {
* <li>"127.0.0.1" and "::1" indicate listening on the loopback interfaces for
* IPv4 and IPv6 respectively</li>
* </ul>
- *
+ * @param remote The remote address
+ * @param local The local address
+ * @throws IOException If failed to handle request
*/
SshdSocketAddress startRemotePortForwarding(SshdSocketAddress remote, SshdSocketAddress local) throws IOException;
/**
* Stop forwarding of the given remote address.
+ * @param remote The remote {@link SshdSocketAddress}
+ * @throws IOException If failed to handle request
*/
void stopRemotePortForwarding(SshdSocketAddress remote) throws IOException;
/**
- * Retrieve the local address that the remote port is forwarded to
- * @param remotePort
- * @return
+ * @param remotePort The remote port
+ * @return The local {@link SshdSocketAddress} that the remote port is forwarded to
*/
SshdSocketAddress getForwardedPort(int remotePort);
/**
* Called when the other side requested a remote port forward.
- * @param local
- * @return the list of bound local addresses
- * @throws IOException
+ * @param local The request address
+ * @return The bound local addresses
+ * @throws IOException If failed to handle request
*/
SshdSocketAddress localPortForwardingRequested(SshdSocketAddress local) throws IOException;
/**
* Called when the other side cancelled a remote port forward.
- * @param local
- * @throws IOException
+ * @param local The local {@link SshdSocketAddress}
+ * @throws IOException If failed to handle request
*/
void localPortForwardingCancelled(SshdSocketAddress local) throws IOException;
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e06cfdcb/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
index 9d03280..8adffbc 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
@@ -32,6 +32,6 @@ public interface TcpipForwarderFactory {
* @param service the service the connections are forwarded through
* @return the TcpipForwarder that will listen for connections and set up forwarding
*/
- public TcpipForwarder create(ConnectionService service);
+ TcpipForwarder create(ConnectionService service);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e06cfdcb/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 22ea6b4..4741136 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService;
import org.apache.sshd.client.future.DefaultOpenFuture;
import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.NamedFactory;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshdSocketAddress;
@@ -143,7 +144,8 @@ public class TcpipServerChannel extends AbstractServerChannel {
}
Session session = getSession();
- ForwardingFilter filter = session.getFactoryManager().getTcpipForwardingFilter();
+ FactoryManager manager = session.getFactoryManager();
+ ForwardingFilter filter = manager.getTcpipForwardingFilter();
if ((address == null) || (filter == null) || (!filter.canConnect(type, address, session))) {
if (log.isDebugEnabled()) {
log.debug("doInit(" + session + ")[" + type + "][haveFilter=" + (filter != null) + "] filtered out " + address);
@@ -183,8 +185,7 @@ public class TcpipServerChannel extends AbstractServerChannel {
close(true);
}
};
- connector = getSession().getFactoryManager().getIoServiceFactory()
- .createConnector(handler);
+ connector = manager.getIoServiceFactory().createConnector(handler);
IoConnectFuture future = connector.connect(address.toInetSocketAddress());
future.addListener(new SshFutureListener<IoConnectFuture>() {
@SuppressWarnings("synthetic-access")
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e06cfdcb/sshd-core/src/main/java/org/apache/sshd/server/global/CancelTcpipForwardHandler.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/global/CancelTcpipForwardHandler.java b/sshd-core/src/main/java/org/apache/sshd/server/global/CancelTcpipForwardHandler.java
index 342c8f2..a139517 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/global/CancelTcpipForwardHandler.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/global/CancelTcpipForwardHandler.java
@@ -33,13 +33,15 @@ import org.apache.sshd.common.util.logging.AbstractLoggingBean;
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public class CancelTcpipForwardHandler extends AbstractLoggingBean implements RequestHandler<ConnectionService> {
+ public static final String REQUEST = "cancel-tcpip-forward";
+
public CancelTcpipForwardHandler() {
super();
}
@Override
public Result process(ConnectionService connectionService, String request, boolean wantReply, Buffer buffer) throws Exception {
- if ("cancel-tcpip-forward".equals(request)) {
+ if (REQUEST.equals(request)) {
String address = buffer.getString();
int port = buffer.getInt();
SshdSocketAddress socketAddress = new SshdSocketAddress(address, port);
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e06cfdcb/sshd-core/src/main/java/org/apache/sshd/server/global/TcpipForwardHandler.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/global/TcpipForwardHandler.java b/sshd-core/src/main/java/org/apache/sshd/server/global/TcpipForwardHandler.java
index 53b4ce5..6208ff2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/global/TcpipForwardHandler.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/global/TcpipForwardHandler.java
@@ -33,13 +33,15 @@ import org.apache.sshd.common.util.logging.AbstractLoggingBean;
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public class TcpipForwardHandler extends AbstractLoggingBean implements RequestHandler<ConnectionService> {
+ public static final String REQUEST = "tcpip-forward";
+
public TcpipForwardHandler() {
super();
}
@Override
public Result process(ConnectionService connectionService, String request, boolean wantReply, Buffer buffer) throws Exception {
- if ("tcpip-forward".equals(request)) {
+ if (REQUEST.equals(request)) {
String address = buffer.getString();
int port = buffer.getInt();
SshdSocketAddress socketAddress = new SshdSocketAddress(address, port);
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e06cfdcb/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java b/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
index 0bbb489..7acd3f9 100644
--- a/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
@@ -23,11 +23,19 @@ import static org.apache.sshd.util.Utils.getFreePort;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
+import java.util.Collection;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.mina.core.buffer.IoBuffer;
@@ -41,8 +49,15 @@ import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.FactoryManagerUtils;
import org.apache.sshd.common.SshdSocketAddress;
+import org.apache.sshd.common.forward.TcpipForwarder;
+import org.apache.sshd.common.forward.TcpipForwarderFactory;
+import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.server.SshServer;
import org.apache.sshd.server.forward.ForwardingFilter;
+import org.apache.sshd.server.global.CancelTcpipForwardHandler;
+import org.apache.sshd.server.global.TcpipForwardHandler;
import org.apache.sshd.util.BaseTestSupport;
import org.apache.sshd.util.BogusPasswordAuthenticator;
import org.apache.sshd.util.EchoShellFactory;
@@ -68,6 +83,8 @@ public class PortForwardingTest extends BaseTestSupport {
private final org.slf4j.Logger log = LoggerFactory.getLogger(getClass());
+ private final BlockingQueue<String> requestsQ = new LinkedBlockingDeque<String>();
+
private SshServer sshd;
private int sshPort;
private int echoPort;
@@ -84,6 +101,48 @@ public class PortForwardingTest extends BaseTestSupport {
sshd.setPasswordAuthenticator(BogusPasswordAuthenticator.INSTANCE);
sshd.setTcpipForwardingFilter(ForwardingFilter.AcceptAllForwardingFilter.INSTANCE);
sshd.start();
+
+ if (!requestsQ.isEmpty()) {
+ requestsQ.clear();
+ }
+
+ final TcpipForwarderFactory factory = ValidateUtils.checkNotNull(sshd.getTcpipForwarderFactory(), "No TcpipForwarderFactory", GenericUtils.EMPTY_OBJECT_ARRAY);
+ sshd.setTcpipForwarderFactory(new TcpipForwarderFactory() {
+ private final Class<?>[] interfaces = { TcpipForwarder.class };
+ private final Map<String,String> method2req = new TreeMap<String,String>(String.CASE_INSENSITIVE_ORDER) {
+ private static final long serialVersionUID = 1L; // we're not serializing it...
+
+ {
+ put("localPortForwardingRequested", TcpipForwardHandler.REQUEST);
+ put("localPortForwardingCancelled", CancelTcpipForwardHandler.REQUEST);
+ }
+ };
+
+ @Override
+ public TcpipForwarder create(ConnectionService service) {
+ Thread thread = Thread.currentThread();
+ ClassLoader cl = thread.getContextClassLoader();
+
+ final TcpipForwarder forwarder = factory.create(service);
+ return (TcpipForwarder) Proxy.newProxyInstance(cl, interfaces, new InvocationHandler() {
+ @SuppressWarnings("synthetic-access")
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ Object result = method.invoke(forwarder, args);
+ String name = method.getName();
+ String request = method2req.get(name);
+ if (GenericUtils.length(request) > 0) {
+ if (requestsQ.offer(request)) {
+ log.info("Signal " + request);
+ } else {
+ log.error("Failed to offer request=" + request);
+ }
+ }
+ return result;
+ }
+ });
+ }
+ });
sshPort = sshd.getPort();
NioSocketAcceptor acceptor = new NioSocketAcceptor();
@@ -103,6 +162,26 @@ public class PortForwardingTest extends BaseTestSupport {
this.acceptor = acceptor;
}
+ private void waitForForwardingRequest(String expected, long timeout) throws InterruptedException {
+ for (long remaining = timeout; remaining > 0L; ) {
+ long waitStart = System.currentTimeMillis();
+ String actual = requestsQ.poll(remaining, TimeUnit.MILLISECONDS);
+ long waitEnd = System.currentTimeMillis();
+ if (GenericUtils.isEmpty(actual)) {
+ throw new IllegalStateException("Failed to retrieve request=" + expected);
+ }
+
+ if (expected.equals(actual)) {
+ return;
+ }
+
+ long waitDuration = waitEnd - waitStart;
+ remaining -= waitDuration;
+ }
+
+ throw new IllegalStateException("Timeout while waiting to retrieve request=" + expected);
+ }
+
@After
public void tearDown() throws Exception {
if (sshd != null) {
@@ -122,7 +201,7 @@ public class PortForwardingTest extends BaseTestSupport {
try {
int forwardedPort = getFreePort();
session.setPortForwardingR(forwardedPort, "localhost", echoPort);
- Thread.sleep(100);
+ waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
try(Socket s = new Socket("localhost", forwardedPort);
OutputStream output = s.getOutputStream();
@@ -153,11 +232,13 @@ public class PortForwardingTest extends BaseTestSupport {
try {
int forwardedPort = getFreePort();
session.setPortForwardingR(forwardedPort, "localhost", echoPort);
- Thread.sleep(155L);
+ waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
+
session.delPortForwardingR("localhost", forwardedPort);
- Thread.sleep(155L);
+ waitForForwardingRequest(CancelTcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
+
session.setPortForwardingR(forwardedPort, "localhost", echoPort);
- Thread.sleep(155L);
+ waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
try(Socket s = new Socket("localhost", forwardedPort);
OutputStream output = s.getOutputStream();
@@ -379,14 +460,15 @@ public class PortForwardingTest extends BaseTestSupport {
}
}
- @Test(timeout = 20000)
+ @Test(timeout = 30000)
public void testRemoteForwardingWithDisconnect() throws Exception {
Session session = createSession();
try {
// 1. Create a Port Forward
int forwardedPort = getFreePort();
session.setPortForwardingR(forwardedPort, "localhost", echoPort);
-
+ waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
+
// 2. Establish a connection through it
try(Socket s = new Socket("localhost", forwardedPort)) {
s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
@@ -396,7 +478,7 @@ public class PortForwardingTest extends BaseTestSupport {
// 4. Make sure the NIOprocessor is not stuck
{
- Thread.sleep(1000L);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
// from here, we need to check all the threads running and find a
// "NioProcessor-"
// that is stuck on a PortForward.dispose
@@ -404,18 +486,19 @@ public class PortForwardingTest extends BaseTestSupport {
while (root.getParent() != null) {
root = root.getParent();
}
- boolean stuck;
- do {
- stuck = false;
- for (Thread t : findThreads(root, "NioProcessor-")) {
- stuck = true;
+
+ for (int index = 0; ; index++) {
+ Collection<Thread> pending = findThreads(root, "NioProcessor-");
+ if (GenericUtils.size(pending) <= 0) {
+ log.info("Finished after " + index + " iterations");
+ break;
}
try {
- Thread.sleep(1000);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
} catch (InterruptedException e) {
// ignored
}
- } while (stuck);
+ }
}
session.delPortForwardingR(forwardedPort);
@@ -446,16 +529,18 @@ public class PortForwardingTest extends BaseTestSupport {
}
private Set<Thread> findThreads(ThreadGroup group, String name) {
- HashSet<Thread> ret = new HashSet<Thread>();
int numThreads = group.activeCount();
Thread[] threads = new Thread[numThreads * 2];
numThreads = group.enumerate(threads, false);
+ Set<Thread> ret = new HashSet<Thread>();
+
// Enumerate each thread in `group'
for (int i = 0; i < numThreads; ++i) {
+ Thread t = threads[i];
// Get thread
// log.debug("Thread name: " + threads[i].getName());
- if (checkThreadForPortForward(threads[i], name)) {
- ret.add(threads[i]);
+ if (checkThreadForPortForward(t, name)) {
+ ret.add(t);
}
}
// didn't find the thread to check the
@@ -463,7 +548,12 @@ public class PortForwardingTest extends BaseTestSupport {
ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
numGroups = group.enumerate(groups, false);
for (int i = 0; i < numGroups; ++i) {
- ret.addAll(findThreads(groups[i], name));
+ ThreadGroup g = groups[i];
+ Collection<Thread> c = findThreads(g, name);
+ if (GenericUtils.isEmpty(c)) {
+ continue; // debug breakpoint
+ }
+ ret.addAll(c);
}
return ret;
}
@@ -503,8 +593,8 @@ public class PortForwardingTest extends BaseTestSupport {
protected Session createSession() throws JSchException {
JSchLogger.init();
JSch sch = new JSch();
- Session session = sch.getSession("sshd", "localhost", sshPort);
- session.setUserInfo(new SimpleUserInfo("sshd"));
+ Session session = sch.getSession(getCurrentTestName(), "localhost", sshPort);
+ session.setUserInfo(new SimpleUserInfo(getCurrentTestName()));
session.connect();
return session;
}
@@ -516,8 +606,8 @@ public class PortForwardingTest extends BaseTestSupport {
client.setTcpipForwardingFilter(ForwardingFilter.AcceptAllForwardingFilter.INSTANCE);
client.start();
- ClientSession session = client.connect("sshd", "localhost", sshPort).verify(7L, TimeUnit.SECONDS).getSession();
- session.addPasswordIdentity("sshd");
+ ClientSession session = client.connect(getCurrentTestName(), "localhost", sshPort).verify(7L, TimeUnit.SECONDS).getSession();
+ session.addPasswordIdentity(getCurrentTestName());
session.auth().verify();
return session;
}