You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2015/06/30 09:10:29 UTC

mina-sshd git commit: [SSHD-508] Use event-driven wait times for PortForwardingTest

Repository: mina-sshd
Updated Branches:
  refs/heads/master 78d703fd4 -> e06cfdcb1


[SSHD-508] Use event-driven wait times for PortForwardingTest


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/e06cfdcb
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/e06cfdcb
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/e06cfdcb

Branch: refs/heads/master
Commit: e06cfdcb11b57d6646e6d3926c2cad344887b261
Parents: 78d703f
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Tue Jun 30 10:10:20 2015 +0300
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Tue Jun 30 10:10:20 2015 +0300

----------------------------------------------------------------------
 .../sshd/common/forward/TcpipForwarder.java     |  26 ++--
 .../common/forward/TcpipForwarderFactory.java   |   2 +-
 .../sshd/server/forward/TcpipServerChannel.java |   7 +-
 .../global/CancelTcpipForwardHandler.java       |   4 +-
 .../sshd/server/global/TcpipForwardHandler.java |   4 +-
 .../org/apache/sshd/PortForwardingTest.java     | 134 ++++++++++++++++---
 6 files changed, 140 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e06cfdcb/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
index 20552cf..16e9a1f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarder.java
@@ -28,11 +28,16 @@ public interface TcpipForwarder extends Closeable {
 
     /**
      * Start forwarding the given local address on the client to the given address on the server.
+     * @param remote The remote address
+     * @param local The local address
+     * @throws IOException If failed to handle request
      */
     SshdSocketAddress startLocalPortForwarding(SshdSocketAddress local, SshdSocketAddress remote) throws IOException;
 
     /**
      * Stop forwarding the given local address.
+     * @param local The local address
+     * @throws IOException If failed to handle request
      */
     void stopLocalPortForwarding(SshdSocketAddress local) throws IOException;
 
@@ -51,34 +56,37 @@ public interface TcpipForwarder extends Closeable {
      *    <li>"127.0.0.1" and "::1" indicate listening on the loopback interfaces for
      *              IPv4 and IPv6 respectively</li>
      * </ul>
-     *
+     * @param remote The remote address
+     * @param local The local address
+     * @throws IOException If failed to handle request
      */
     SshdSocketAddress startRemotePortForwarding(SshdSocketAddress remote, SshdSocketAddress local) throws IOException;
 
     /**
      * Stop forwarding of the given remote address.
+     * @param remote The remote {@link SshdSocketAddress}
+     * @throws IOException If failed to handle request
      */
     void stopRemotePortForwarding(SshdSocketAddress remote) throws IOException;
 
     /**
-     * Retrieve the local address that the remote port is forwarded to
-     * @param remotePort
-     * @return
+     * @param remotePort The remote port
+     * @return The local {@link SshdSocketAddress} that the remote port is forwarded to
      */
     SshdSocketAddress getForwardedPort(int remotePort);
 
     /**
      * Called when the other side requested a remote port forward.
-     * @param local
-     * @return the list of bound local addresses
-     * @throws IOException
+     * @param local The request address
+     * @return The bound local addresses
+     * @throws IOException If failed to handle request
      */
     SshdSocketAddress localPortForwardingRequested(SshdSocketAddress local) throws IOException;
 
     /**
      * Called when the other side cancelled a remote port forward.
-     * @param local
-     * @throws IOException
+     * @param local The local {@link SshdSocketAddress}
+     * @throws IOException If failed to handle request
      */
     void localPortForwardingCancelled(SshdSocketAddress local) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e06cfdcb/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
index 9d03280..8adffbc 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipForwarderFactory.java
@@ -32,6 +32,6 @@ public interface TcpipForwarderFactory {
      * @param service the service the connections are forwarded through
      * @return the TcpipForwarder that will listen for connections and set up forwarding
      */
-    public TcpipForwarder create(ConnectionService service);
+    TcpipForwarder create(ConnectionService service);
 
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e06cfdcb/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 22ea6b4..4741136 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.NamedFactory;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshdSocketAddress;
@@ -143,7 +144,8 @@ public class TcpipServerChannel extends AbstractServerChannel {
         }
 
         Session session = getSession();
-        ForwardingFilter filter = session.getFactoryManager().getTcpipForwardingFilter();
+        FactoryManager manager = session.getFactoryManager();
+        ForwardingFilter filter = manager.getTcpipForwardingFilter();
         if ((address == null) || (filter == null) || (!filter.canConnect(type, address, session))) {
             if (log.isDebugEnabled()) {
                 log.debug("doInit(" + session + ")[" + type + "][haveFilter=" + (filter != null) + "] filtered out " + address);
@@ -183,8 +185,7 @@ public class TcpipServerChannel extends AbstractServerChannel {
                 close(true);
             }
         };
-        connector = getSession().getFactoryManager().getIoServiceFactory()
-                .createConnector(handler);
+        connector = manager.getIoServiceFactory().createConnector(handler);
         IoConnectFuture future = connector.connect(address.toInetSocketAddress());
         future.addListener(new SshFutureListener<IoConnectFuture>() {
             @SuppressWarnings("synthetic-access")

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e06cfdcb/sshd-core/src/main/java/org/apache/sshd/server/global/CancelTcpipForwardHandler.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/global/CancelTcpipForwardHandler.java b/sshd-core/src/main/java/org/apache/sshd/server/global/CancelTcpipForwardHandler.java
index 342c8f2..a139517 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/global/CancelTcpipForwardHandler.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/global/CancelTcpipForwardHandler.java
@@ -33,13 +33,15 @@ import org.apache.sshd.common.util.logging.AbstractLoggingBean;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public class CancelTcpipForwardHandler extends AbstractLoggingBean implements RequestHandler<ConnectionService> {
+    public static final String REQUEST = "cancel-tcpip-forward";
+
     public CancelTcpipForwardHandler() {
         super();
     }
 
     @Override
     public Result process(ConnectionService connectionService, String request, boolean wantReply, Buffer buffer) throws Exception {
-        if ("cancel-tcpip-forward".equals(request)) {
+        if (REQUEST.equals(request)) {
             String address = buffer.getString();
             int port = buffer.getInt();
             SshdSocketAddress socketAddress = new SshdSocketAddress(address, port);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e06cfdcb/sshd-core/src/main/java/org/apache/sshd/server/global/TcpipForwardHandler.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/global/TcpipForwardHandler.java b/sshd-core/src/main/java/org/apache/sshd/server/global/TcpipForwardHandler.java
index 53b4ce5..6208ff2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/global/TcpipForwardHandler.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/global/TcpipForwardHandler.java
@@ -33,13 +33,15 @@ import org.apache.sshd.common.util.logging.AbstractLoggingBean;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public class TcpipForwardHandler extends AbstractLoggingBean implements RequestHandler<ConnectionService> {
+    public static final String REQUEST = "tcpip-forward";
+
     public TcpipForwardHandler() {
         super();
     }
 
     @Override
     public Result process(ConnectionService connectionService, String request, boolean wantReply, Buffer buffer) throws Exception {
-        if ("tcpip-forward".equals(request)) {
+        if (REQUEST.equals(request)) {
             String address = buffer.getString();
             int port = buffer.getInt();
             SshdSocketAddress socketAddress = new SshdSocketAddress(address, port);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e06cfdcb/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java b/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
index 0bbb489..7acd3f9 100644
--- a/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
@@ -23,11 +23,19 @@ import static org.apache.sshd.util.Utils.getFreePort;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
+import java.util.Collection;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.mina.core.buffer.IoBuffer;
@@ -41,8 +49,15 @@ import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.FactoryManagerUtils;
 import org.apache.sshd.common.SshdSocketAddress;
+import org.apache.sshd.common.forward.TcpipForwarder;
+import org.apache.sshd.common.forward.TcpipForwarderFactory;
+import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.server.SshServer;
 import org.apache.sshd.server.forward.ForwardingFilter;
+import org.apache.sshd.server.global.CancelTcpipForwardHandler;
+import org.apache.sshd.server.global.TcpipForwardHandler;
 import org.apache.sshd.util.BaseTestSupport;
 import org.apache.sshd.util.BogusPasswordAuthenticator;
 import org.apache.sshd.util.EchoShellFactory;
@@ -68,6 +83,8 @@ public class PortForwardingTest extends BaseTestSupport {
 
     private final org.slf4j.Logger log = LoggerFactory.getLogger(getClass());
 
+    private final BlockingQueue<String> requestsQ = new LinkedBlockingDeque<String>();
+
     private SshServer sshd;
     private int sshPort;
     private int echoPort;
@@ -84,6 +101,48 @@ public class PortForwardingTest extends BaseTestSupport {
         sshd.setPasswordAuthenticator(BogusPasswordAuthenticator.INSTANCE);
         sshd.setTcpipForwardingFilter(ForwardingFilter.AcceptAllForwardingFilter.INSTANCE);
         sshd.start();
+        
+        if (!requestsQ.isEmpty()) {
+            requestsQ.clear();
+        }
+
+        final TcpipForwarderFactory factory = ValidateUtils.checkNotNull(sshd.getTcpipForwarderFactory(), "No TcpipForwarderFactory", GenericUtils.EMPTY_OBJECT_ARRAY);
+        sshd.setTcpipForwarderFactory(new TcpipForwarderFactory() {
+                private final Class<?>[] interfaces = { TcpipForwarder.class };
+                private final Map<String,String> method2req = new TreeMap<String,String>(String.CASE_INSENSITIVE_ORDER) {
+                        private static final long serialVersionUID = 1L;    // we're not serializing it...
+
+                        {
+                            put("localPortForwardingRequested", TcpipForwardHandler.REQUEST);
+                            put("localPortForwardingCancelled", CancelTcpipForwardHandler.REQUEST);
+                        }
+                    };
+
+                @Override
+                public TcpipForwarder create(ConnectionService service) {
+                    Thread thread = Thread.currentThread();
+                    ClassLoader cl = thread.getContextClassLoader();
+
+                    final TcpipForwarder forwarder = factory.create(service);
+                    return (TcpipForwarder) Proxy.newProxyInstance(cl, interfaces, new InvocationHandler() {
+                        @SuppressWarnings("synthetic-access")
+                        @Override
+                        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+                            Object result = method.invoke(forwarder, args);
+                            String name = method.getName();
+                            String request = method2req.get(name);
+                            if (GenericUtils.length(request) > 0) {
+                                if (requestsQ.offer(request)) {
+                                    log.info("Signal " + request);
+                                } else {
+                                    log.error("Failed to offer request=" + request);
+                                }
+                            }
+                            return result;
+                        }
+                    });
+                }
+            });
         sshPort = sshd.getPort();
 
         NioSocketAcceptor acceptor = new NioSocketAcceptor();
@@ -103,6 +162,26 @@ public class PortForwardingTest extends BaseTestSupport {
         this.acceptor = acceptor;
     }
 
+    private void waitForForwardingRequest(String expected, long timeout) throws InterruptedException {
+        for (long remaining = timeout; remaining > 0L; ) {
+            long waitStart = System.currentTimeMillis();
+            String actual = requestsQ.poll(remaining, TimeUnit.MILLISECONDS);
+            long waitEnd = System.currentTimeMillis();
+            if (GenericUtils.isEmpty(actual)) {
+                throw new IllegalStateException("Failed to retrieve request=" + expected);
+            }
+            
+            if (expected.equals(actual)) {
+                return;
+            }
+            
+            long waitDuration = waitEnd - waitStart;
+            remaining -= waitDuration;
+        }
+
+        throw new IllegalStateException("Timeout while waiting to retrieve request=" + expected);
+    }
+
     @After
     public void tearDown() throws Exception {
         if (sshd != null) {
@@ -122,7 +201,7 @@ public class PortForwardingTest extends BaseTestSupport {
         try {
             int forwardedPort = getFreePort();
             session.setPortForwardingR(forwardedPort, "localhost", echoPort);
-            Thread.sleep(100);
+            waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
     
             try(Socket s = new Socket("localhost", forwardedPort);
                 OutputStream output = s.getOutputStream();
@@ -153,11 +232,13 @@ public class PortForwardingTest extends BaseTestSupport {
         try {
             int forwardedPort = getFreePort();
             session.setPortForwardingR(forwardedPort, "localhost", echoPort);
-            Thread.sleep(155L);
+            waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
+
             session.delPortForwardingR("localhost", forwardedPort);
-            Thread.sleep(155L);
+            waitForForwardingRequest(CancelTcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
+
             session.setPortForwardingR(forwardedPort, "localhost", echoPort);
-            Thread.sleep(155L);
+            waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
     
             try(Socket s = new Socket("localhost", forwardedPort);
                 OutputStream output = s.getOutputStream();
@@ -379,14 +460,15 @@ public class PortForwardingTest extends BaseTestSupport {
         }
     }
 
-    @Test(timeout = 20000)
+    @Test(timeout = 30000)
     public void testRemoteForwardingWithDisconnect() throws Exception {
         Session session = createSession();
         try {
             // 1. Create a Port Forward
             int forwardedPort = getFreePort();
             session.setPortForwardingR(forwardedPort, "localhost", echoPort);
-    
+            waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
+
             // 2. Establish a connection through it
             try(Socket s = new Socket("localhost", forwardedPort)) {
                 s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
@@ -396,7 +478,7 @@ public class PortForwardingTest extends BaseTestSupport {
         
                 // 4. Make sure the NIOprocessor is not stuck
                 {
-                    Thread.sleep(1000L);
+                    Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
                     // from here, we need to check all the threads running and find a
                     // "NioProcessor-"
                     // that is stuck on a PortForward.dispose
@@ -404,18 +486,19 @@ public class PortForwardingTest extends BaseTestSupport {
                     while (root.getParent() != null) {
                         root = root.getParent();
                     }
-                    boolean stuck;
-                    do {
-                        stuck = false;
-                        for (Thread t : findThreads(root, "NioProcessor-")) {
-                            stuck = true;
+                    
+                    for (int index = 0; ; index++) {
+                        Collection<Thread>  pending = findThreads(root, "NioProcessor-"); 
+                        if (GenericUtils.size(pending) <= 0) {
+                            log.info("Finished after " + index + " iterations");
+                            break;
                         }
                         try {
-                            Thread.sleep(1000);
+                            Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
                         } catch (InterruptedException e) {
                             // ignored
                         }
-                    } while (stuck);
+                    }
                 }
         
                 session.delPortForwardingR(forwardedPort);
@@ -446,16 +529,18 @@ public class PortForwardingTest extends BaseTestSupport {
     }
 
     private Set<Thread> findThreads(ThreadGroup group, String name) {
-        HashSet<Thread> ret = new HashSet<Thread>();
         int numThreads = group.activeCount();
         Thread[] threads = new Thread[numThreads * 2];
         numThreads = group.enumerate(threads, false);
+        Set<Thread> ret = new HashSet<Thread>();
+
         // Enumerate each thread in `group'
         for (int i = 0; i < numThreads; ++i) {
+            Thread t = threads[i];
             // Get thread
             // log.debug("Thread name: " + threads[i].getName());
-            if (checkThreadForPortForward(threads[i], name)) {
-                ret.add(threads[i]);
+            if (checkThreadForPortForward(t, name)) {
+                ret.add(t);
             }
         }
         // didn't find the thread to check the
@@ -463,7 +548,12 @@ public class PortForwardingTest extends BaseTestSupport {
         ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
         numGroups = group.enumerate(groups, false);
         for (int i = 0; i < numGroups; ++i) {
-            ret.addAll(findThreads(groups[i], name));
+            ThreadGroup g = groups[i];
+            Collection<Thread> c = findThreads(g, name);
+            if (GenericUtils.isEmpty(c)) {
+                continue;   // debug breakpoint
+            }
+            ret.addAll(c);
         }
         return ret;
     }
@@ -503,8 +593,8 @@ public class PortForwardingTest extends BaseTestSupport {
     protected Session createSession() throws JSchException {
         JSchLogger.init();
         JSch sch = new JSch();
-        Session session = sch.getSession("sshd", "localhost", sshPort);
-        session.setUserInfo(new SimpleUserInfo("sshd"));
+        Session session = sch.getSession(getCurrentTestName(), "localhost", sshPort);
+        session.setUserInfo(new SimpleUserInfo(getCurrentTestName()));
         session.connect();
         return session;
     }
@@ -516,8 +606,8 @@ public class PortForwardingTest extends BaseTestSupport {
         client.setTcpipForwardingFilter(ForwardingFilter.AcceptAllForwardingFilter.INSTANCE);
         client.start();
 
-        ClientSession session = client.connect("sshd", "localhost", sshPort).verify(7L, TimeUnit.SECONDS).getSession();
-        session.addPasswordIdentity("sshd");
+        ClientSession session = client.connect(getCurrentTestName(), "localhost", sshPort).verify(7L, TimeUnit.SECONDS).getSession();
+        session.addPasswordIdentity(getCurrentTestName());
         session.auth().verify();
         return session;
     }