You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2016/01/07 09:35:36 UTC

[1/3] mina-sshd git commit: Added timeout for global response method to avoid infinite waits

Repository: mina-sshd
Updated Branches:
  refs/heads/master 1e09776e7 -> e980eb63d


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
new file mode 100644
index 0000000..e7b9162
--- /dev/null
+++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.forward;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.httpclient.HostConfiguration;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpVersion;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.service.IoAcceptor;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.apache.sshd.util.test.JSchLogger;
+import org.apache.sshd.util.test.SimpleUserInfo;
+import org.apache.sshd.util.test.Utils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+
+/**
+ * Port forwarding tests
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class PortForwardingLoadTest extends BaseTestSupport {
+    private final Logger log;
+    private SshServer sshd;
+    private int sshPort;
+    private IoAcceptor acceptor;
+
+    public PortForwardingLoadTest() {
+        log = LoggerFactory.getLogger(getClass());  // logger is named after the runtime class of this test instance
+    }
+
+    @BeforeClass
+    public static void jschInit() {
+        JSchLogger.init();  // runs once per class; presumably hooks JSch logging into the test logs - confirm in JSchLogger
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        sshd = setupTestServer();
+        sshd.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);  // presumably permits every forwarding request - confirm in AcceptAllForwardingFilter
+        sshd.start();
+        sshPort = sshd.getPort();  // remembered so createSession() can connect to this server
+
+        NioSocketAcceptor acceptor = new NioSocketAcceptor();  // local echo service; shadows the field until it is assigned below
+        acceptor.setHandler(new IoHandlerAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) throws Exception {
+                IoBuffer recv = (IoBuffer) message;
+                IoBuffer sent = IoBuffer.allocate(recv.remaining());
+                sent.put(recv);  // copy whatever was received...
+                sent.flip();
+                session.write(sent);  // ...and write it straight back to the sender
+            }
+        });
+        acceptor.setReuseAddress(true);
+        acceptor.bind(new InetSocketAddress(0));  // port 0 = let the OS choose a free port
+        log.info("setUp() echo address = {}", acceptor.getLocalAddress());
+        this.acceptor = acceptor;  // published to the field so tearDown() can dispose it
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (sshd != null) {  // still null if setUp() failed before the server was created
+            sshd.stop(true);
+        }
+        if (acceptor != null) {  // still null if setUp() failed before the echo acceptor was stored
+            acceptor.dispose(true);
+        }
+    }
+
+    @Test
+    public void testLocalForwardingPayload() throws Exception {
+        final int NUM_ITERATIONS = 100;
+        final String PAYLOAD_TMP = "This is significantly longer Test Data. This is significantly " +
+                "longer Test Data. This is significantly longer Test Data. This is significantly " +
+                "longer Test Data. This is significantly longer Test Data. This is significantly " +
+                "longer Test Data. This is significantly longer Test Data. This is significantly " +
+                "longer Test Data. This is significantly longer Test Data. This is significantly " +
+                "longer Test Data. ";
+        StringBuilder sb = new StringBuilder(PAYLOAD_TMP.length() * 1000);
+        for (int i = 0; i < 1000; i++) {  // the real payload is the template repeated 1000 times
+            sb.append(PAYLOAD_TMP);
+        }
+        final String PAYLOAD = sb.toString();
+
+        Session session = createSession();
+        try (final ServerSocket ss = new ServerSocket()) {
+            ss.setReuseAddress(true);
+            ss.bind(new InetSocketAddress((InetAddress) null, 0));
+            int forwardedPort = ss.getLocalPort();  // target of the tunnel: the echo server socket below
+            int sinkPort = session.setPortForwardingL(0, TEST_LOCALHOST, forwardedPort);  // local tunnel entrance the client connects to
+            final AtomicInteger conCount = new AtomicInteger(0);
+
+            Thread tAcceptor = new Thread(getCurrentTestName() + "Acceptor") {
+                @SuppressWarnings("synthetic-access")
+                @Override
+                public void run() {
+                    try {
+                        byte[] buf = new byte[8192];
+                        log.info("Started...");
+                        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+                            try (Socket s = ss.accept()) {
+                                conCount.incrementAndGet();
+
+                                try (InputStream sockIn = s.getInputStream();
+                                     ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+
+                                    int l;
+                                    while ((baos.size() < PAYLOAD.length()) && ((l = sockIn.read(buf)) > 0)) {  // collect one full payload
+                                        baos.write(buf, 0, l);
+                                    }
+
+                                    assertEquals("Mismatched received data at iteration #" + i, PAYLOAD, baos.toString());
+
+                                    try (InputStream inputCopy = new ByteArrayInputStream(baos.toByteArray());
+                                         OutputStream sockOut = s.getOutputStream()) {
+
+                                        while ((l = inputCopy.read(buf)) > 0) {  // echo the captured payload back (was reading sockIn, so nothing was ever echoed)
+                                            sockOut.write(buf, 0, l);
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                        log.info("Done");
+                    } catch (Exception e) {
+                        log.error("Failed to complete run loop", e);
+                    }
+                }
+            };
+            tAcceptor.start();
+            Thread.sleep(50);  // give the acceptor thread a moment to get going
+
+            byte[] buf = new byte[8192];
+            byte[] bytes = PAYLOAD.getBytes(StandardCharsets.UTF_8);
+            for (int i = 0; i < NUM_ITERATIONS; i++) {
+                log.info("Iteration {}", Integer.valueOf(i));
+                try (Socket s = new Socket(TEST_LOCALHOST, sinkPort);
+                     OutputStream sockOut = s.getOutputStream()) {
+
+                    s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+                    sockOut.write(bytes);
+                    sockOut.flush();
+
+                    try (InputStream sockIn = s.getInputStream();
+                         ByteArrayOutputStream baos = new ByteArrayOutputStream(bytes.length)) {
+                        int l;
+                        while ((baos.size() < PAYLOAD.length()) && ((l = sockIn.read(buf)) > 0)) {  // read the echoed payload through the tunnel
+                            baos.write(buf, 0, l);
+                        }
+                        assertEquals("Mismatched payload at iteration #" + i, PAYLOAD, baos.toString());
+                    }
+                } catch (Exception e) {
+                    log.error("Error in iteration #" + i, e);  // NOTE(review): I/O failures (e.g. read timeouts) are only logged, not failed
+                }
+            }
+            session.delPortForwardingL(sinkPort);
+
+            ss.close();  // also unblocks a pending accept() in the acceptor thread
+            tAcceptor.join(TimeUnit.SECONDS.toMillis(5L));
+        } finally {
+            session.disconnect();
+        }
+    }
+
+    @Test
+    public void testRemoteForwardingPayload() throws Exception {
+        final int NUM_ITERATIONS = 100;
+        final String PAYLOAD = "This is significantly longer Test Data. This is significantly " +
+                "longer Test Data. This is significantly longer Test Data. This is significantly " +
+                "longer Test Data. This is significantly longer Test Data. This is significantly " +
+                "longer Test Data. This is significantly longer Test Data. This is significantly " +
+                "longer Test Data. This is significantly longer Test Data. This is significantly " +
+                "longer Test Data. ";
+        Session session = createSession();
+        try (final ServerSocket ss = new ServerSocket()) {
+            ss.setReuseAddress(true);
+            ss.bind(new InetSocketAddress((InetAddress) null, 0));
+            int forwardedPort = ss.getLocalPort();  // local target of the tunnel: the writer server socket below
+            int sinkPort = Utils.getFreePort();  // remote port the SSH server is asked to listen on
+            session.setPortForwardingR(sinkPort, TEST_LOCALHOST, forwardedPort);
+            final boolean started[] = new boolean[1];
+            started[0] = false;
+            final AtomicInteger conCount = new AtomicInteger(0);
+
+            Thread tWriter = new Thread(getCurrentTestName() + "Writer") {
+                @SuppressWarnings("synthetic-access")
+                @Override
+                public void run() {
+                    started[0] = true;
+                    try {
+                        byte[] bytes = PAYLOAD.getBytes(StandardCharsets.UTF_8);
+                        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+                            try (Socket s = ss.accept()) {
+                                conCount.incrementAndGet();
+
+                                try (OutputStream sockOut = s.getOutputStream()) {  // push one payload per accepted connection
+                                    sockOut.write(bytes);
+                                    sockOut.flush();
+                                }
+                            }
+                        }
+                    } catch (Exception e) {
+                        log.error("Failed to complete run loop", e);
+                    }
+                }
+            };
+            tWriter.start();
+            Thread.sleep(50);  // give the writer thread a moment to flag that it started
+            assertTrue("Server not started", started[0]);
+
+            final RuntimeException lenOK[] = new RuntimeException[NUM_ITERATIONS];  // null entry = length check passed (or never ran)
+            final RuntimeException dataOK[] = new RuntimeException[NUM_ITERATIONS];  // null entry = content check passed (or never ran)
+            byte b2[] = new byte[PAYLOAD.length()];
+            byte b1[] = new byte[b2.length / 2];
+
+            for (int i = 0; i < NUM_ITERATIONS; i++) {
+                final int ii = i;
+                try (Socket s = new Socket(TEST_LOCALHOST, sinkPort);
+                    InputStream sockIn = s.getInputStream()) {
+                    s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+                    int read1 = sockIn.read(b1);  // first read is capped at half the payload size
+                    String part1 = new String(b1, 0, read1, StandardCharsets.UTF_8);
+                    Thread.sleep(50);
+
+                    int read2 = sockIn.read(b2);  // second read picks up the rest
+                    String part2 = new String(b2, 0, read2, StandardCharsets.UTF_8);
+                    int totalRead = read1 + read2;
+                    lenOK[ii] = (PAYLOAD.length() == totalRead)
+                            ? null
+                            : new IndexOutOfBoundsException("Mismatched length: expected=" + PAYLOAD.length() + ", actual=" + totalRead);
+
+                    String readData = part1 + part2;
+                    dataOK[ii] = PAYLOAD.equals(readData) ? null : new IllegalStateException("Mismatched content");
+                    if (lenOK[ii] != null) {
+                        throw lenOK[ii];
+                    }
+
+                    if (dataOK[ii] != null) {
+                        throw dataOK[ii];
+                    }
+                } catch (Exception e) {
+                    if (e instanceof IOException) {
+                        log.warn("I/O exception in iteration #" + i, e);
+                    } else {
+                        log.error("Failed to complete iteration #" + i, e);
+                    }
+                }
+            }
+            int ok = 0;
+            for (int i = 0; i < NUM_ITERATIONS; i++) {
+                ok += (lenOK[i] == null) ? 1 : 0;
+            }
+            log.info("Successful iteration: " + ok + " out of " + NUM_ITERATIONS);
+            Thread.sleep(55L);
+            for (int i = 0; i < NUM_ITERATIONS; i++) {
+                assertNull("Bad length at iteration " + i, lenOK[i]);
+                assertNull("Bad data at iteration " + i, dataOK[i]);
+            }
+            session.delPortForwardingR(sinkPort);  // cancel by the remote port registered above (was forwardedPort, the local target)
+            ss.close();  // also unblocks a pending accept() in the writer thread
+            tWriter.join(TimeUnit.SECONDS.toMillis(5L));
+        } finally {
+            session.disconnect();
+        }
+    }
+
+    @Test
+    public void testForwardingOnLoad() throws Exception {
+//        final String path = "/history/recent/troubles/";
+//        final String host = "www.bbc.co.uk";
+//        final String path = "";
+//        final String host = "www.bahn.de";
+        final String path = "";
+        final String host = TEST_LOCALHOST;
+        final int nbThread = 2;
+        final int nbDownloads = 2;
+        final int nbLoops = 2;
+
+        StringBuilder resp = new StringBuilder();
+        resp.append("<html><body>\n");
+        for (int i = 0; i < 1000; i++) {
+            resp.append("0123456789\n");
+        }
+        resp.append("</body></html>\n");
+        final StringBuilder sb = new StringBuilder();  // canned HTTP response: status line + headers + body
+        sb.append("HTTP/1.1 200 OK").append('\n');
+        sb.append("Content-Type: text/HTML").append('\n');
+        sb.append("Content-Length: ").append(resp.length()).append('\n');
+        sb.append('\n').append(resp);
+        NioSocketAcceptor acceptor = new NioSocketAcceptor();  // HTTP stub local to this test (shadows the echo acceptor field)
+        acceptor.setHandler(new IoHandlerAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) throws Exception {
+                session.write(IoBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8)));  // same reply for every incoming message
+            }
+        });
+        acceptor.setReuseAddress(true);
+        acceptor.bind(new InetSocketAddress(0));
+        final int port = acceptor.getLocalAddress().getPort();
+
+        Session session = createSession();
+        try {
+            final int forwardedPort1 = session.setPortForwardingL(0, host, port);  // local tunnel -> HTTP stub
+            final int forwardedPort2 = Utils.getFreePort();
+            session.setPortForwardingR(forwardedPort2, TEST_LOCALHOST, forwardedPort1);  // remote tunnel -> local tunnel entrance
+            System.err.println("URL: http://localhost:" + forwardedPort2);
+
+            final CountDownLatch latch = new CountDownLatch(nbThread * nbDownloads * nbLoops);  // one count per download attempt
+            final Thread[] threads = new Thread[nbThread];
+            final List<Throwable> errors = new CopyOnWriteArrayList<Throwable>();
+            for (int i = 0; i < threads.length; i++) {
+                threads[i] = new Thread(getCurrentTestName() + "[" + i + "]") {
+                    @Override
+                    public void run() {
+                        for (int j = 0; j < nbLoops; j++) {
+                            final MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
+                            final HttpClient client = new HttpClient(mgr);
+                            client.getHttpConnectionManager().getParams().setDefaultMaxConnectionsPerHost(100);
+                            client.getHttpConnectionManager().getParams().setMaxTotalConnections(1000);
+                            for (int i = 0; i < nbDownloads; i++) {
+                                try {
+                                    checkHtmlPage(client, new URL("http://localhost:" + forwardedPort2 + path));
+                                } catch (Throwable e) {
+                                    errors.add(e);  // collected and reported by the main thread
+                                } finally {
+                                    latch.countDown();
+                                    System.err.println("Remaining: " + latch.getCount());
+                                }
+                            }
+                            mgr.shutdown();
+                        }
+                    }
+                };
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].start();
+            }
+            latch.await();
+            for (Throwable t : errors) {
+                t.printStackTrace();
+            }
+            assertEquals(0, errors.size());
+        } finally {
+            session.disconnect();
+            acceptor.dispose(true);  // tearDown() only disposes the field, so the local HTTP stub must be released here
+        }
+    }
+
+    protected Session createSession() throws JSchException {
+        JSch sch = new JSch();
+        Session session = sch.getSession("sshd", TEST_LOCALHOST, sshPort);  // user "sshd" against the server started in setUp()
+        session.setUserInfo(new SimpleUserInfo("sshd"));  // presumably supplies "sshd" as the password - confirm in SimpleUserInfo
+        session.connect();
+        return session;  // already connected; the tests call disconnect() in their finally blocks
+    }
+
+    protected void checkHtmlPage(HttpClient client, URL url) throws IOException {
+        client.setHostConfiguration(new HostConfiguration());
+        client.getHostConfiguration().setHost(url.getHost(), url.getPort());  // only host and port of the URL are used
+        GetMethod get = new GetMethod("");
+        get.getParams().setVersion(HttpVersion.HTTP_1_1);
+        try {  // release the connection even when the request or the assertion fails
+            client.executeMethod(get);
+            String str = get.getResponseBodyAsString();
+            assertTrue("Missing </html> in response: " + str, str.indexOf("</html>") > 0);
+        } finally {
+            get.releaseConnection();
+        }
+//        url.openConnection().setDefaultUseCaches(false);
+//        Reader reader = new BufferedReader(new InputStreamReader(url.openStream()));
+//        try {
+//            StringWriter sw = new StringWriter();
+//            char[] buf = new char[8192];
+//            while (true) {
+//                int len = reader.read(buf);
+//                if (len < 0) {
+//                    break;
+//                }
+//                sw.write(buf, 0, len);
+//            }
+//            assertTrue(sw.toString().indexOf("</html>") > 0);
+//        } finally {
+//            reader.close();
+//        }
+    }
+
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
new file mode 100644
index 0000000..0cd71e0
--- /dev/null
+++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.forward;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.service.IoAcceptor;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.channel.ChannelDirectTcpip;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.PropertyResolverUtils;
+import org.apache.sshd.common.SshdSocketAddress;
+import org.apache.sshd.common.forward.TcpipForwarder;
+import org.apache.sshd.common.forward.TcpipForwarderFactory;
+import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
+import org.apache.sshd.server.global.CancelTcpipForwardHandler;
+import org.apache.sshd.server.global.TcpipForwardHandler;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.apache.sshd.util.test.JSchLogger;
+import org.apache.sshd.util.test.SimpleUserInfo;
+import org.apache.sshd.util.test.Utils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.LoggerFactory;
+
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+
+/**
+ * Port forwarding tests
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class PortForwardingTest extends BaseTestSupport {
+
+    private final org.slf4j.Logger log = LoggerFactory.getLogger(getClass());
+
+    private final BlockingQueue<String> requestsQ = new LinkedBlockingDeque<String>();
+
+    private SshServer sshd;
+    private int sshPort;
+    private int echoPort;
+    private IoAcceptor acceptor;
+    private SshClient client;
+
+    public PortForwardingTest() {
+        super();  // no extra construction work; per-test fixtures are created in setUp()
+    }
+
+    @BeforeClass
+    public static void jschInit() {
+        JSchLogger.init();  // runs once per class; presumably hooks JSch logging into the test logs - confirm in JSchLogger
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        sshd = setupTestServer();
+        PropertyResolverUtils.updateProperty(sshd, FactoryManager.WINDOW_SIZE, 2048);  // presumably kept small to exercise window handling - confirm
+        PropertyResolverUtils.updateProperty(sshd, FactoryManager.MAX_PACKET_SIZE, 256);  // presumably kept small to force multi-packet transfers - confirm
+        sshd.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
+        sshd.start();
+
+        if (!requestsQ.isEmpty()) {  // drop any signals left over from earlier use of the queue
+            requestsQ.clear();
+        }
+
+        final TcpipForwarderFactory factory = ValidateUtils.checkNotNull(sshd.getTcpipForwarderFactory(), "No TcpipForwarderFactory");  // original factory, delegated to below
+        sshd.setTcpipForwarderFactory(new TcpipForwarderFactory() {
+            private final Class<?>[] interfaces = {TcpipForwarder.class};
+            private final Map<String, String> method2req = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER) {  // forwarder method name -> request name to signal
+                private static final long serialVersionUID = 1L;    // we're not serializing it...
+
+                {
+                    put("localPortForwardingRequested", TcpipForwardHandler.REQUEST);
+                    put("localPortForwardingCancelled", CancelTcpipForwardHandler.REQUEST);
+                }
+            };
+
+            @Override
+            public TcpipForwarder create(ConnectionService service) {
+                Thread thread = Thread.currentThread();
+                ClassLoader cl = thread.getContextClassLoader();
+
+                final TcpipForwarder forwarder = factory.create(service);  // the real forwarder that does the work
+                return (TcpipForwarder) Proxy.newProxyInstance(cl, interfaces, new InvocationHandler() {
+                    @SuppressWarnings("synthetic-access")
+                    @Override
+                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+                        Object result = method.invoke(forwarder, args);  // NOTE(review): failures surface wrapped in InvocationTargetException - consider unwrapping
+                        String name = method.getName();
+                        String request = method2req.get(name);
+                        if (GenericUtils.length(request) > 0) {  // only mapped methods signal the waiting test
+                            if (requestsQ.offer(request)) {
+                                log.info("Signal " + request);
+                            } else {
+                                log.error("Failed to offer request=" + request);
+                            }
+                        }
+                        return result;
+                    }
+                });
+            }
+        });
+        sshPort = sshd.getPort();
+
+        NioSocketAcceptor acceptor = new NioSocketAcceptor();  // local echo service; shadows the field until it is assigned below
+        acceptor.setHandler(new IoHandlerAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) throws Exception {
+                IoBuffer recv = (IoBuffer) message;
+                IoBuffer sent = IoBuffer.allocate(recv.remaining());
+                sent.put(recv);  // copy whatever was received...
+                sent.flip();
+                session.write(sent);  // ...and write it straight back to the sender
+            }
+        });
+        acceptor.setReuseAddress(true);
+        acceptor.bind(new InetSocketAddress(0));  // port 0 = let the OS choose a free port
+        echoPort = acceptor.getLocalAddress().getPort();
+        this.acceptor = acceptor;  // published to the field so tearDown() can dispose it
+    }
+
+    private void waitForForwardingRequest(String expected, long timeout) throws InterruptedException {
+        long budget = timeout;  // msec. still available for polling
+        while (budget > 0L) {
+            long pollStart = System.currentTimeMillis();
+            String signalled = requestsQ.poll(budget, TimeUnit.MILLISECONDS);
+            long elapsed = System.currentTimeMillis() - pollStart;
+            if (GenericUtils.isEmpty(signalled)) {  // nothing usable arrived within the remaining budget
+                throw new IllegalStateException("Failed to retrieve request=" + expected);
+            }
+
+            if (expected.equals(signalled)) {
+                return;
+            }
+
+            budget -= elapsed;  // a different request was signalled - charge the wait and poll again
+        }
+
+        throw new IllegalStateException("Timeout while waiting to retrieve request=" + expected);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (sshd != null) {  // still null if setUp() failed before the server was created
+            sshd.stop(true);
+        }
+        if (acceptor != null) {  // still null if setUp() failed before the echo acceptor was stored
+            acceptor.dispose(true);
+        }
+        if (client != null) {  // not assigned anywhere in the visible part of this class
+            client.stop();
+        }
+    }
+
+    @Test
+    public void testRemoteForwarding() throws Exception {
+        Session session = createSession();
+        try {
+            int forwardedPort = Utils.getFreePort();
+            session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);  // remote port -> local echo service
+            waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));  // wait until the server side signalled the request
+
+            try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort);
+                 OutputStream output = s.getOutputStream();
+                 InputStream input = s.getInputStream()) {
+
+                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+                String expected = getCurrentTestName();  // the test name doubles as the payload
+                byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+                output.write(bytes);
+                output.flush();
+
+                byte[] buf = new byte[bytes.length + Long.SIZE];  // Long.SIZE (64) spare bytes beyond the expected reply
+                int n = input.read(buf);  // single read - relies on the short echo arriving in one piece
+                String res = new String(buf, 0, n, StandardCharsets.UTF_8);
+                assertEquals("Mismatched data", expected, res);
+            } finally {
+                session.delPortForwardingR(forwardedPort);
+            }
+        } finally {
+            session.disconnect();
+        }
+    }
+
+    @Test
+    public void testRemoteForwardingSecondTimeInSameSession() throws Exception {
+        Session session = createSession();
+        try {
+            int forwardedPort = Utils.getFreePort();
+            session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);  // first registration
+            waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
+
+            session.delPortForwardingR(TEST_LOCALHOST, forwardedPort);  // cancel it...
+            waitForForwardingRequest(CancelTcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
+
+            session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);  // ...and register the same port again in the same session
+            waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
+
+            try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort);
+                 OutputStream output = s.getOutputStream();
+                 InputStream input = s.getInputStream()) {
+
+                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+                String expected = getCurrentTestName();  // the test name doubles as the payload
+                byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+                output.write(bytes);
+                output.flush();
+
+                byte[] buf = new byte[bytes.length + Long.SIZE];  // Long.SIZE (64) spare bytes beyond the expected reply
+                int n = input.read(buf);  // single read - relies on the short echo arriving in one piece
+                String res = new String(buf, 0, n, StandardCharsets.UTF_8);
+                assertEquals("Mismatched data", expected, res);
+            } finally {
+                session.delPortForwardingR(TEST_LOCALHOST, forwardedPort);
+            }
+        } finally {
+            session.disconnect();
+        }
+    }
+
+    @Test
+    public void testRemoteForwardingNative() throws Exception {
+        try (ClientSession session = createNativeSession()) {
+            SshdSocketAddress remote = new SshdSocketAddress("", 0);  // port 0: the actually bound address is returned by the start call
+            SshdSocketAddress local = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
+            SshdSocketAddress bound = session.startRemotePortForwarding(remote, local);
+
+            try (Socket s = new Socket(bound.getHostName(), bound.getPort());
+                 OutputStream output = s.getOutputStream();
+                 InputStream input = s.getInputStream()) {
+
+                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+                String expected = getCurrentTestName();  // the test name doubles as the payload
+                byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+                output.write(bytes);
+                output.flush();
+
+                byte[] buf = new byte[bytes.length + Long.SIZE];  // Long.SIZE (64) spare bytes beyond the expected reply
+                int n = input.read(buf);
+                String res = new String(buf, 0, n, StandardCharsets.UTF_8);  // decode with the charset used for encoding, not the platform default
+                assertEquals("Mismatched data", expected, res);
+            } finally {
+                session.stopRemotePortForwarding(remote);  // NOTE(review): passes the requested address (port 0), not 'bound' - confirm this cancels the forwarding
+            }
+        }
+    }
+
+    @Test
+    public void testRemoteForwardingNativeBigPayload() throws Exception {
+        try (ClientSession session = createNativeSession()) {
+            SshdSocketAddress remote = new SshdSocketAddress("", 0);
+            SshdSocketAddress local = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
+            SshdSocketAddress bound = session.startRemotePortForwarding(remote, local);
+
+            try (Socket s = new Socket(bound.getHostName(), bound.getPort());
+                 OutputStream output = s.getOutputStream();
+                 InputStream input = s.getInputStream()) {
+
+                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+                String expected = getCurrentTestName();
+                byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+                byte[] buf = new byte[bytes.length + Long.SIZE];
+
+                for (int i = 0; i < 1000; i++) {
+                    output.write(bytes);
+                    output.flush();
+
+                    int n = input.read(buf);
+                    String res = new String(buf, 0, n);
+                    assertEquals("Mismatched data at iteration #" + i, expected, res);
+                }
+            } finally {
+                session.stopRemotePortForwarding(remote);
+            }
+        }
+    }
+
+    @Test
+    public void testLocalForwarding() throws Exception {
+        Session session = createSession();
+        try {
+            int forwardedPort = Utils.getFreePort();
+            session.setPortForwardingL(forwardedPort, TEST_LOCALHOST, echoPort);
+
+            try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort);
+                 OutputStream output = s.getOutputStream();
+                 InputStream input = s.getInputStream()) {
+
+                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+                String expected = getCurrentTestName();
+                byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+
+                output.write(bytes);
+                output.flush();
+
+                byte[] buf = new byte[bytes.length + Long.SIZE];
+                int n = input.read(buf);
+                String res = new String(buf, 0, n);
+                assertEquals("Mismatched data", expected, res);
+            } finally {
+                session.delPortForwardingL(forwardedPort);
+            }
+        } finally {
+            session.disconnect();
+        }
+    }
+
+    @Test
+    public void testLocalForwardingNative() throws Exception {
+        try (ClientSession session = createNativeSession()) {
+            SshdSocketAddress local = new SshdSocketAddress("", 0);
+            SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
+            SshdSocketAddress bound = session.startLocalPortForwarding(local, remote);
+
+            try (Socket s = new Socket(bound.getHostName(), bound.getPort());
+                 OutputStream output = s.getOutputStream();
+                 InputStream input = s.getInputStream()) {
+
+                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+                String expected = getCurrentTestName();
+                byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+
+                output.write(bytes);
+                output.flush();
+
+                byte[] buf = new byte[bytes.length + Long.SIZE];
+                int n = input.read(buf);
+                String res = new String(buf, 0, n);
+                assertEquals("Mismatched data", expected, res);
+            } finally {
+                session.stopLocalPortForwarding(bound);
+            }
+        }
+    }
+
+    @Test
+    public void testLocalForwardingNativeReuse() throws Exception {
+        try (ClientSession session = createNativeSession()) {
+            SshdSocketAddress local = new SshdSocketAddress("", 0);
+            SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
+            SshdSocketAddress bound = session.startLocalPortForwarding(local, remote);
+
+            session.stopLocalPortForwarding(bound);
+
+            SshdSocketAddress bound2 = session.startLocalPortForwarding(local, remote);
+            session.stopLocalPortForwarding(bound2);
+        }
+    }
+
+    @Test
+    public void testLocalForwardingNativeBigPayload() throws Exception {
+        try (ClientSession session = createNativeSession()) {
+            String expected = getCurrentTestName();
+            byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+            byte[] buf = new byte[bytes.length + Long.SIZE];
+
+            SshdSocketAddress local = new SshdSocketAddress("", 0);
+            SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
+            SshdSocketAddress bound = session.startLocalPortForwarding(local, remote);
+            try (Socket s = new Socket(bound.getHostName(), bound.getPort());
+                 OutputStream output = s.getOutputStream();
+                 InputStream input = s.getInputStream()) {
+
+                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+                for (int i = 0; i < 1000; i++) {
+                    output.write(bytes);
+                    output.flush();
+
+                    int n = input.read(buf);
+                    String res = new String(buf, 0, n);
+                    assertEquals("Mismatched data at iteration #" + i, expected, res);
+                }
+            } finally {
+                session.stopLocalPortForwarding(bound);
+            }
+        }
+    }
+
+    @Test
+    public void testForwardingChannel() throws Exception {
+        try (ClientSession session = createNativeSession()) {
+            SshdSocketAddress local = new SshdSocketAddress("", 0);
+            SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
+
+            try (ChannelDirectTcpip channel = session.createDirectTcpipChannel(local, remote)) {
+                channel.open().verify(9L, TimeUnit.SECONDS);
+
+                String expected = getCurrentTestName();
+                byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+
+                try (OutputStream output = channel.getInvertedIn();
+                     InputStream input = channel.getInvertedOut()) {
+                    output.write(bytes);
+                    output.flush();
+
+                    byte[] buf = new byte[bytes.length + Long.SIZE];
+                    int n = input.read(buf);
+                    String res = new String(buf, 0, n);
+                    assertEquals("Mismatched data", expected, res);
+                }
+                channel.close(false);
+            }
+        }
+    }
+
+    @Test(timeout = 45000)
+    public void testRemoteForwardingWithDisconnect() throws Exception {
+        Session session = createSession();
+        try {
+            // 1. Create a Port Forward
+            int forwardedPort = Utils.getFreePort();
+            session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);
+            waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
+
+            // 2. Establish a connection through it
+            try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort)) {
+                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+                // 3. Simulate the client going away
+                rudelyDisconnectJschSession(session);
+
+                // 4. Make sure the NIOprocessor is not stuck
+                {
+                    Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
+                    // from here, we need to check all the threads running and find a
+                    // "NioProcessor-"
+                    // that is stuck on a PortForward.dispose
+                    ThreadGroup root = Thread.currentThread().getThreadGroup().getParent();
+                    while (root.getParent() != null) {
+                        root = root.getParent();
+                    }
+
+                    for (int index = 0; ; index++) {
+                        Collection<Thread> pending = findThreads(root, "NioProcessor-");
+                        if (GenericUtils.size(pending) <= 0) {
+                            log.info("Finished after " + index + " iterations");
+                            break;
+                        }
+                        try {
+                            Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
+                        } catch (InterruptedException e) {
+                            // ignored
+                        }
+                    }
+                }
+
+                session.delPortForwardingR(forwardedPort);
+            }
+        } finally {
+            session.disconnect();
+        }
+    }
+
+    /**
+     * Close the socket inside this JSCH session. Use reflection to find it and
+     * just close it.
+     *
+     * @param session the Session to violate
+     * @throws Exception
+     */
+    private void rudelyDisconnectJschSession(Session session) throws Exception {
+        Field fSocket = session.getClass().getDeclaredField("socket");
+        fSocket.setAccessible(true);
+
+        try (Socket socket = (Socket) fSocket.get(session)) {
+            assertTrue("socket is not connected", socket.isConnected());
+            assertFalse("socket should not be closed", socket.isClosed());
+            socket.close();
+            assertTrue("socket has not closed", socket.isClosed());
+        }
+    }
+
+    private Set<Thread> findThreads(ThreadGroup group, String name) {
+        int numThreads = group.activeCount();
+        Thread[] threads = new Thread[numThreads * 2];
+        numThreads = group.enumerate(threads, false);
+        Set<Thread> ret = new HashSet<Thread>();
+
+        // Enumerate each thread in `group'
+        for (int i = 0; i < numThreads; ++i) {
+            Thread t = threads[i];
+            // Get thread
+            // log.debug("Thread name: " + threads[i].getName());
+            if (checkThreadForPortForward(t, name)) {
+                ret.add(t);
+            }
+        }
+        // didn't find the thread to check the
+        int numGroups = group.activeGroupCount();
+        ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
+        numGroups = group.enumerate(groups, false);
+        for (int i = 0; i < numGroups; ++i) {
+            ThreadGroup g = groups[i];
+            Collection<Thread> c = findThreads(g, name);
+            if (GenericUtils.isEmpty(c)) {
+                continue;   // debug breakpoint
+            }
+            ret.addAll(c);
+        }
+        return ret;
+    }
+
+    private boolean checkThreadForPortForward(Thread thread, String name) {
+        if (thread == null)
+            return false;
+        // does it contain the name we're looking for?
+        if (thread.getName().contains(name)) {
+            // look at the stack
+            StackTraceElement[] stack = thread.getStackTrace();
+            if (stack.length == 0)
+                return false;
+            else {
+                // does it have
+                // 'org.apache.sshd.server.session.TcpipForwardSupport.close'?
+                for (int i = 0; i < stack.length; ++i) {
+                    String clazzName = stack[i].getClassName();
+                    String methodName = stack[i].getMethodName();
+                    // log.debug("Class: " + clazzName);
+                    // log.debug("Method: " + methodName);
+                    if (clazzName
+                            .equals("org.apache.sshd.server.session.TcpipForwardSupport")
+                            && (methodName.equals("close") || methodName
+                            .equals("sessionCreated"))) {
+                        log.warn(thread.getName() + " stuck at " + clazzName
+                                + "." + methodName + ": "
+                                + stack[i].getLineNumber());
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
+    protected Session createSession() throws JSchException {
+        JSch sch = new JSch();
+        Session session = sch.getSession(getCurrentTestName(), TEST_LOCALHOST, sshPort);
+        session.setUserInfo(new SimpleUserInfo(getCurrentTestName()));
+        session.connect();
+        return session;
+    }
+
+    protected ClientSession createNativeSession() throws Exception {
+        client = setupTestClient();
+        PropertyResolverUtils.updateProperty(client, FactoryManager.WINDOW_SIZE, 2048);
+        PropertyResolverUtils.updateProperty(client, FactoryManager.MAX_PACKET_SIZE, 256);
+        client.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
+        client.start();
+
+        ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, sshPort).verify(7L, TimeUnit.SECONDS).getSession();
+        session.addPasswordIdentity(getCurrentTestName());
+        session.auth().verify(11L, TimeUnit.SECONDS);
+        return session;
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/util/test/BogusInvertedShell.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusInvertedShell.java b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusInvertedShell.java
index 3954bca..c99f4ce 100644
--- a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusInvertedShell.java
+++ b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusInvertedShell.java
@@ -27,9 +27,10 @@ import java.util.Map;
 import org.apache.sshd.common.util.io.IoUtils;
 import org.apache.sshd.server.Environment;
 import org.apache.sshd.server.session.ServerSession;
+import org.apache.sshd.server.session.ServerSessionHolder;
 import org.apache.sshd.server.shell.InvertedShell;
 
-public class BogusInvertedShell implements InvertedShell {
+public class BogusInvertedShell implements InvertedShell, ServerSessionHolder {
 
     private final OutputStream in;
     private final InputStream out;
@@ -48,6 +49,11 @@ public class BogusInvertedShell implements InvertedShell {
     }
 
     @Override
+    public ServerSession getServerSession() {
+        return session;
+    }
+
+    @Override
     public void setSession(ServerSession session) {
         this.session = session;
     }


[3/3] mina-sshd git commit: Upgraded surefire plugin version to 2.19.1

Posted by lg...@apache.org.
Upgraded surefire plugin version to 2.19.1


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/e980eb63
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/e980eb63
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/e980eb63

Branch: refs/heads/master
Commit: e980eb63d3a1aef171a3b4ad0154c0a4066f018b
Parents: f45ee0e
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Thu Jan 7 10:34:53 2016 +0200
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Thu Jan 7 10:34:53 2016 +0200

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e980eb63/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d36ca17..1615d83 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,7 +103,7 @@
         <spring.version>3.0.6.RELEASE</spring.version>
         <jgit.version>3.4.1.201406201815-r</jgit.version>
         <junit.version>4.12</junit.version>
-        <surefire.plugin.version>2.19</surefire.plugin.version>
+        <surefire.plugin.version>2.19.1</surefire.plugin.version>
         <httpcomps.version>4.4.1</httpcomps.version>
     </properties>
 


[2/3] mina-sshd git commit: Added timeout for global response method to avoid infinite waits

Posted by lg...@apache.org.
Added timeout for global response method to avoid infinite waits


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/f45ee0ef
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/f45ee0ef
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/f45ee0ef

Branch: refs/heads/master
Commit: f45ee0ef4ed742d4d620b95169736200422c3544
Parents: 1e09776
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Thu Jan 7 10:34:36 2016 +0200
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Thu Jan 7 10:34:36 2016 +0200

----------------------------------------------------------------------
 .../common/forward/DefaultTcpipForwarder.java   |  45 +-
 .../sshd/common/future/AbstractSshFuture.java   |   5 -
 .../sshd/common/future/DefaultSshFuture.java    |   5 +-
 .../sshd/common/session/AbstractSession.java    |  63 +-
 .../org/apache/sshd/common/session/Session.java |  15 +-
 .../apache/sshd/common/util/GenericUtils.java   |   6 +
 .../src/test/java/org/apache/sshd/LoadTest.java |   4 +
 .../org/apache/sshd/PortForwardingLoadTest.java | 438 -------------
 .../org/apache/sshd/PortForwardingTest.java     | 610 -------------------
 .../apache/sshd/SinglePublicKeyAuthTest.java    | 181 ------
 .../common/auth/SinglePublicKeyAuthTest.java    | 181 ++++++
 .../common/forward/PortForwardingLoadTest.java  | 442 ++++++++++++++
 .../sshd/common/forward/PortForwardingTest.java | 610 +++++++++++++++++++
 .../sshd/util/test/BogusInvertedShell.java      |   8 +-
 14 files changed, 1346 insertions(+), 1267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
index e177aa9..c94a380 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
@@ -27,12 +27,14 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.sshd.client.channel.ClientChannel;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.SshdSocketAddress;
@@ -44,6 +46,7 @@ import org.apache.sshd.common.io.IoServiceFactory;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.session.SessionHolder;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.Readable;
 import org.apache.sshd.common.util.ValidateUtils;
@@ -53,11 +56,26 @@ import org.apache.sshd.common.util.closeable.AbstractInnerCloseable;
 import org.apache.sshd.server.forward.ForwardingFilter;
 
 /**
- * TODO Add javadoc
+ * Requests a &quot;tcpip-forward&quot; action
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public class DefaultTcpipForwarder extends AbstractInnerCloseable implements TcpipForwarder {
+public class DefaultTcpipForwarder
+        extends AbstractInnerCloseable
+        implements TcpipForwarder, SessionHolder<Session> {
+
+    /**
+     * Used to configure the timeout (milliseconds) for receiving a response
+     * for the forwarding request
+     *
+     * @see #DEFAULT_FORWARD_REQUEST_TIMEOUT
+     */
+    public static final String FORWARD_REQUEST_TIMEOUT = "tcpip-forward-request-timeout";
+
+    /**
+     * Default value for {@link #FORWARD_REQUEST_TIMEOUT} if none specified
+     */
+    public static final long DEFAULT_FORWARD_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(15L);
 
     private final ConnectionService service;
     private final IoHandlerFactory socksProxyIoHandlerFactory = new IoHandlerFactory() {
@@ -66,7 +84,7 @@ public class DefaultTcpipForwarder extends AbstractInnerCloseable implements Tcp
             return new SocksProxy(getConnectionService());
         }
     };
-    private final Session session;
+    private final Session sessionInstance;
     private final Map<Integer, SshdSocketAddress> localToRemote = new HashMap<>();
     private final Map<Integer, SshdSocketAddress> remoteToLocal = new HashMap<>();
     private final Map<Integer, SocksProxy> dynamicLocal = new HashMap<>();
@@ -81,7 +99,12 @@ public class DefaultTcpipForwarder extends AbstractInnerCloseable implements Tcp
 
     public DefaultTcpipForwarder(ConnectionService service) {
         this.service = ValidateUtils.checkNotNull(service, "No connection service");
-        this.session = ValidateUtils.checkNotNull(service.getSession(), "No session");
+        this.sessionInstance = ValidateUtils.checkNotNull(service.getSession(), "No session");
+    }
+
+    @Override
+    public Session getSession() {
+        return sessionInstance;
     }
 
     public final ConnectionService getConnectionService() {
@@ -151,12 +174,15 @@ public class DefaultTcpipForwarder extends AbstractInnerCloseable implements Tcp
 
         String remoteHost = remote.getHostName();
         int remotePort = remote.getPort();
+        Session session = getSession();
         Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_GLOBAL_REQUEST, remoteHost.length() + Long.SIZE);
         buffer.putString("tcpip-forward");
         buffer.putBoolean(true);    // want reply
         buffer.putString(remoteHost);
         buffer.putInt(remotePort);
-        Buffer result = session.request(buffer);
+
+        long timeout = PropertyResolverUtils.getLongProperty(session, FORWARD_REQUEST_TIMEOUT, DEFAULT_FORWARD_REQUEST_TIMEOUT);
+        Buffer result = session.request(buffer, timeout, TimeUnit.MILLISECONDS);
         if (result == null) {
             throw new SshException("Tcpip forwarding request denied by server");
         }
@@ -192,6 +218,7 @@ public class DefaultTcpipForwarder extends AbstractInnerCloseable implements Tcp
             }
 
             String remoteHost = remote.getHostName();
+            Session session = getSession();
             Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_GLOBAL_REQUEST, remoteHost.length() + Long.SIZE);
             buffer.putString("cancel-tcpip-forward");
             buffer.putBoolean(false);   // want reply
@@ -269,7 +296,8 @@ public class DefaultTcpipForwarder extends AbstractInnerCloseable implements Tcp
         ValidateUtils.checkNotNull(local, "Local address is null");
         ValidateUtils.checkTrue(local.getPort() >= 0, "Invalid local port: %s", local);
 
-        FactoryManager manager = session.getFactoryManager();
+        Session session = getSession();
+        FactoryManager manager = ValidateUtils.checkNotNull(session.getFactoryManager(), "No factory manager");
         ForwardingFilter filter = manager.getTcpipForwardingFilter();
         if ((filter == null) || (!filter.canListen(local, session))) {
             if (log.isDebugEnabled()) {
@@ -330,7 +358,8 @@ public class DefaultTcpipForwarder extends AbstractInnerCloseable implements Tcp
      */
     private InetSocketAddress doBind(SshdSocketAddress address, Factory<? extends IoHandler> handlerFactory) throws IOException {
         if (acceptor == null) {
-            FactoryManager manager = session.getFactoryManager();
+            Session session = getSession();
+            FactoryManager manager = ValidateUtils.checkNotNull(session.getFactoryManager(), "No factory manager");
             IoServiceFactory factory = manager.getIoServiceFactory();
             IoHandler handler = handlerFactory.create();
             acceptor = factory.createAcceptor(handler);
@@ -365,7 +394,7 @@ public class DefaultTcpipForwarder extends AbstractInnerCloseable implements Tcp
 
     @Override
     public String toString() {
-        return getClass().getSimpleName() + "[" + session + "]";
+        return getClass().getSimpleName() + "[" + getSession() + "]";
     }
 
     //

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java
index 2f179e7..8efd65f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java
@@ -38,11 +38,6 @@ public abstract class AbstractSshFuture<T extends SshFuture> extends AbstractLog
      */
     protected static final Object CANCELED = new Object();
 
-    /**
-     * A value indicating a {@code null} value
-     */
-    protected static final Object NULL = new Object();
-
     protected AbstractSshFuture() {
         super();
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
index 9e25bcc..8f31339 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
@@ -21,6 +21,7 @@ package org.apache.sshd.common.future;
 import java.io.InterruptedIOException;
 import java.lang.reflect.Array;
 
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 
 /**
@@ -96,7 +97,7 @@ public class DefaultSshFuture<T extends SshFuture> extends AbstractSshFuture<T>
                 return;
             }
 
-            result = newValue != null ? newValue : NULL;
+            result = (newValue != null) ? newValue : GenericUtils.NULL;
             lock.notifyAll();
         }
 
@@ -109,7 +110,7 @@ public class DefaultSshFuture<T extends SshFuture> extends AbstractSshFuture<T>
      */
     public Object getValue() {
         synchronized (lock) {
-            return result == NULL ? null : result;
+            return (result == GenericUtils.NULL) ? null : result;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index 92f5843..e5eaa54 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -20,6 +20,7 @@ package org.apache.sshd.common.session;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
@@ -171,7 +172,6 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
     protected final Object encodeLock = new Object();
     protected final Object decodeLock = new Object();
     protected final Object requestLock = new Object();
-    protected final AtomicReference<Buffer> requestResult = new AtomicReference<>();
     protected final Map<AttributeKey<?>, Object> attributes = new ConcurrentHashMap<>();
 
     // Session timeout measurements
@@ -210,6 +210,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
      */
     private final FactoryManager factoryManager;
     private final Map<String, Object> properties = new ConcurrentHashMap<>();
+    private final AtomicReference<Object> requestResult = new AtomicReference<>();
 
     /**
      * Create a new session.
@@ -771,6 +772,12 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
             }
         }
 
+        // if anyone waiting for global response notify them about the closing session
+        synchronized (requestResult) {
+            requestResult.set(GenericUtils.NULL);
+            requestResult.notify();
+        }
+
         // Fire 'close' event
         SessionListener listener = getSessionListenerProxy();
         try {
@@ -934,28 +941,54 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         return count;
     }
 
-    /**
-     * Send a global request and wait for the response.
-     * This must only be used when sending a SSH_MSG_GLOBAL_REQUEST with a result expected,
-     * else it will wait forever.
-     *
-     * @param buffer the buffer containing the global request
-     * @return <code>true</code> if the request was successful, <code>false</code> otherwise.
-     * @throws IOException if an error occurred when encoding sending the packet
-     */
     @Override
-    public Buffer request(Buffer buffer) throws IOException {
+    public Buffer request(Buffer buffer, long timeout, TimeUnit unit) throws IOException {
+        ValidateUtils.checkTrue(timeout > 0L, "Non-positive timeout requested: %d", timeout);
+
+        long maxWaitMillis = TimeUnit.MILLISECONDS.convert(timeout, unit);
+        if (maxWaitMillis <= 0L) {
+            throw new IllegalArgumentException("Requested timeout below 1 msec: " + timeout + " " + unit);
+        }
+
+        Object result;
         synchronized (requestLock) {
             try {
+                writePacket(buffer);
+
                 synchronized (requestResult) {
-                    writePacket(buffer);
-                    requestResult.wait();
-                    return requestResult.get();
+                    while (isOpen() && (maxWaitMillis > 0L) && (requestResult.get() == null)) {
+                        long waitStart = System.nanoTime();
+                        requestResult.wait(maxWaitMillis);
+                        long waitEnd = System.nanoTime();
+                        long waitDuration = waitEnd - waitStart;
+                        long waitMillis = TimeUnit.NANOSECONDS.toMillis(waitDuration);
+                        if (waitMillis > 0L) {
+                            maxWaitMillis -= waitMillis;
+                        } else {
+                            maxWaitMillis--;
+                        }
+                    }
+
+                    result = requestResult.getAndSet(null);
                 }
             } catch (InterruptedException e) {
                 throw (InterruptedIOException) new InterruptedIOException("Interrupted while waiting for request result").initCause(e);
             }
         }
+
+        if (!isOpen()) {
+            throw new IOException("Session is closed or closing");
+        }
+
+        if (result == null) {
+            throw new SocketTimeoutException("No response received after " + timeout + " " + unit);
+        }
+
+        if (result instanceof Buffer) {
+            return (Buffer) result;
+        }
+
+        return null;
     }
 
     @Override
@@ -1690,7 +1723,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
 
     protected void requestFailure(Buffer buffer) throws Exception {
         synchronized (requestResult) {
-            requestResult.set(null);
+            requestResult.set(GenericUtils.NULL);
             resetIdleTimeout();
             requestResult.notify();
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
index 1138310..d47553b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
@@ -182,7 +182,7 @@ public interface Session
      *
      * @param buffer the buffer to encode and send
      * @return a future that can be used to check when the packet has actually been sent
-     * @throws java.io.IOException if an error occurred when encoding sending the packet
+     * @throws IOException if an error occurred when encoding or sending the packet
      */
     IoWriteFuture writePacket(Buffer buffer) throws IOException;
 
@@ -196,20 +196,21 @@ public interface Session
      * @param timeout the timeout
      * @param unit    the time unit of the timeout parameter
      * @return a future that can be used to check when the packet has actually been sent
-     * @throws java.io.IOException if an error occurred when encoding sending the packet
+     * @throws IOException if an error occurred when encoding or sending the packet
      */
     IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit unit) throws IOException;
 
     /**
-     * Send a global request and wait for the response.
-     * This must only be used when sending a SSH_MSG_GLOBAL_REQUEST with a result expected,
-     * else it will wait forever.
+     * Send a global request and wait for the response. This must only be used when sending
+     * a {@code SSH_MSG_GLOBAL_REQUEST} with a result expected, else it will time out.
      *
      * @param buffer the buffer containing the global request
+     * @param timeout The number of time units to wait - must be <U>positive</U>
+     * @param unit The {@link TimeUnit} to wait for the response
      * @return the return buffer if the request was successful, {@code null} otherwise.
-     * @throws java.io.IOException if an error occurred when encoding sending the packet
+     * @throws IOException if an error occurred when encoding or sending the packet, or if no response arrived within the timeout
      */
-    Buffer request(Buffer buffer) throws IOException;
+    Buffer request(Buffer buffer, long timeout, TimeUnit unit) throws IOException;
 
     /**
      * Handle any exceptions that occurred on this session.

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
index ef06c54..988254b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
@@ -54,6 +54,12 @@ public final class GenericUtils {
     public static final Object[] EMPTY_OBJECT_ARRAY = {};
 
     /**
+     * A value indicating a {@code null} value - to be used as a placeholder
+     * where {@code null}s are not allowed
+     */
+    public static final Object NULL = new Object();
+
+    /**
      * The complement of {@link String#CASE_INSENSITIVE_ORDER}
      */
     public static final Comparator<String> CASE_SENSITIVE_ORDER = new Comparator<String>() {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
index 4d900c7..63272bd 100644
--- a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
@@ -54,6 +54,10 @@ public class LoadTest extends BaseTestSupport {
     private SshServer sshd;
     private int port;
 
+    public LoadTest() {
+        super();
+    }
+
     @Before
     public void setUp() throws Exception {
         sshd = setupTestServer();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/PortForwardingLoadTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/PortForwardingLoadTest.java b/sshd-core/src/test/java/org/apache/sshd/PortForwardingLoadTest.java
deleted file mode 100644
index fe89fba..0000000
--- a/sshd-core/src/test/java/org/apache/sshd/PortForwardingLoadTest.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sshd;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.httpclient.HostConfiguration;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpVersion;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.service.IoAcceptor;
-import org.apache.mina.core.service.IoHandlerAdapter;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
-import org.apache.sshd.server.SshServer;
-import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
-import org.apache.sshd.util.test.BaseTestSupport;
-import org.apache.sshd.util.test.JSchLogger;
-import org.apache.sshd.util.test.SimpleUserInfo;
-import org.apache.sshd.util.test.Utils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.runners.MethodSorters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.jcraft.jsch.JSch;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
-
-/**
- * Port forwarding tests
- */
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class PortForwardingLoadTest extends BaseTestSupport {
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private SshServer sshd;
-    private int sshPort;
-    private int echoPort;
-    private IoAcceptor acceptor;
-
-    public PortForwardingLoadTest() {
-        super();
-    }
-
-    @BeforeClass
-    public static void jschInit() {
-        JSchLogger.init();
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        sshd = setupTestServer();
-        sshd.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
-        sshd.start();
-        sshPort = sshd.getPort();
-
-        NioSocketAcceptor acceptor = new NioSocketAcceptor();
-        acceptor.setHandler(new IoHandlerAdapter() {
-            @Override
-            public void messageReceived(IoSession session, Object message) throws Exception {
-                IoBuffer recv = (IoBuffer) message;
-                IoBuffer sent = IoBuffer.allocate(recv.remaining());
-                sent.put(recv);
-                sent.flip();
-                session.write(sent);
-            }
-        });
-        acceptor.setReuseAddress(true);
-        acceptor.bind(new InetSocketAddress(0));
-        echoPort = acceptor.getLocalAddress().getPort();
-        this.acceptor = acceptor;
-
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if (sshd != null) {
-            sshd.stop(true);
-        }
-        if (acceptor != null) {
-            acceptor.dispose(true);
-        }
-    }
-
-    @Test
-    public void testLocalForwardingPayload() throws Exception {
-        final int NUM_ITERATIONS = 100;
-        final String PAYLOAD_TMP = "This is significantly longer Test Data. This is significantly " +
-                "longer Test Data. This is significantly longer Test Data. This is significantly " +
-                "longer Test Data. This is significantly longer Test Data. This is significantly " +
-                "longer Test Data. This is significantly longer Test Data. This is significantly " +
-                "longer Test Data. This is significantly longer Test Data. This is significantly " +
-                "longer Test Data. ";
-        StringBuilder sb = new StringBuilder(PAYLOAD_TMP.length() * 1000);
-        for (int i = 0; i < 1000; i++) {
-            sb.append(PAYLOAD_TMP);
-        }
-        final String PAYLOAD = sb.toString();
-
-        Session session = createSession();
-        try (final ServerSocket ss = new ServerSocket()) {
-            ss.setReuseAddress(true);
-            ss.bind(new InetSocketAddress((InetAddress) null, 0));
-            int forwardedPort = ss.getLocalPort();
-            int sinkPort = session.setPortForwardingL(0, TEST_LOCALHOST, forwardedPort);
-            final AtomicInteger conCount = new AtomicInteger(0);
-
-            Thread tAcceptor = new Thread(getCurrentTestName() + "Acceptor") {
-                @SuppressWarnings("synthetic-access")
-                @Override
-                public void run() {
-                    try {
-                        byte[] buf = new byte[8192];
-                        log.info("Started...");
-                        for (int i = 0; i < NUM_ITERATIONS; ++i) {
-                            try (Socket s = ss.accept()) {
-                                conCount.incrementAndGet();
-
-                                try (InputStream sockIn = s.getInputStream();
-                                     ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-
-                                    int l;
-                                    while ((baos.size() < PAYLOAD.length()) && ((l = sockIn.read(buf)) > 0)) {
-                                        baos.write(buf, 0, l);
-                                    }
-
-                                    assertEquals("Mismatched received data at iteration #" + i, PAYLOAD, baos.toString());
-
-                                    try (InputStream inputCopy = new ByteArrayInputStream(baos.toByteArray());
-                                         OutputStream sockOut = s.getOutputStream()) {
-
-                                        while ((l = sockIn.read(buf)) > 0) {
-                                            sockOut.write(buf, 0, l);
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                        log.info("Done");
-                    } catch (Exception e) {
-                        log.error("Failed to complete run loop", e);
-                    }
-                }
-            };
-            tAcceptor.start();
-            Thread.sleep(50);
-
-            byte[] buf = new byte[8192];
-            byte[] bytes = PAYLOAD.getBytes(StandardCharsets.UTF_8);
-            for (int i = 0; i < NUM_ITERATIONS; i++) {
-                log.info("Iteration {}", Integer.valueOf(i));
-                try (Socket s = new Socket(TEST_LOCALHOST, sinkPort);
-                     OutputStream sockOut = s.getOutputStream()) {
-
-                    s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
-                    sockOut.write(bytes);
-                    sockOut.flush();
-
-                    try (InputStream sockIn = s.getInputStream();
-                         ByteArrayOutputStream baos = new ByteArrayOutputStream(bytes.length)) {
-                        int l;
-                        while ((baos.size() < PAYLOAD.length()) && ((l = sockIn.read(buf)) > 0)) {
-                            baos.write(buf, 0, l);
-                        }
-                        assertEquals("Mismatched payload at iteration #" + i, PAYLOAD, baos.toString());
-                    }
-                } catch (Exception e) {
-                    log.error("Error in iteration #" + i, e);
-                }
-            }
-            session.delPortForwardingL(sinkPort);
-
-            ss.close();
-            tAcceptor.join(TimeUnit.SECONDS.toMillis(5L));
-        } finally {
-            session.disconnect();
-        }
-    }
-
-    @Test
-    public void testRemoteForwardingPayload() throws Exception {
-        final int NUM_ITERATIONS = 100;
-        final String PAYLOAD = "This is significantly longer Test Data. This is significantly " +
-                "longer Test Data. This is significantly longer Test Data. This is significantly " +
-                "longer Test Data. This is significantly longer Test Data. This is significantly " +
-                "longer Test Data. This is significantly longer Test Data. This is significantly " +
-                "longer Test Data. This is significantly longer Test Data. This is significantly " +
-                "longer Test Data. ";
-        Session session = createSession();
-        try (final ServerSocket ss = new ServerSocket()) {
-            ss.setReuseAddress(true);
-            ss.bind(new InetSocketAddress((InetAddress) null, 0));
-            int forwardedPort = ss.getLocalPort();
-            int sinkPort = Utils.getFreePort();
-            session.setPortForwardingR(sinkPort, TEST_LOCALHOST, forwardedPort);
-            final boolean started[] = new boolean[1];
-            started[0] = false;
-            final AtomicInteger conCount = new AtomicInteger(0);
-
-            Thread tWriter = new Thread(getCurrentTestName() + "Writer") {
-                @SuppressWarnings("synthetic-access")
-                @Override
-                public void run() {
-                    started[0] = true;
-                    try {
-                        byte[] bytes = PAYLOAD.getBytes(StandardCharsets.UTF_8);
-                        for (int i = 0; i < NUM_ITERATIONS; ++i) {
-                            try (Socket s = ss.accept()) {
-                                conCount.incrementAndGet();
-
-                                try (OutputStream sockOut = s.getOutputStream()) {
-                                    sockOut.write(bytes);
-                                    sockOut.flush();
-                                }
-                            }
-                        }
-                    } catch (Exception e) {
-                        log.error("Failed to complete run loop", e);
-                    }
-                }
-            };
-            tWriter.start();
-            Thread.sleep(50);
-            assertTrue("Server not started", started[0]);
-
-            final boolean lenOK[] = new boolean[NUM_ITERATIONS];
-            final boolean dataOK[] = new boolean[NUM_ITERATIONS];
-            byte b2[] = new byte[PAYLOAD.length()];
-            byte b1[] = new byte[b2.length / 2];
-
-            for (int i = 0; i < NUM_ITERATIONS; i++) {
-                final int ii = i;
-                try (Socket s = new Socket(TEST_LOCALHOST, sinkPort);
-                     InputStream sockIn = s.getInputStream()) {
-
-                    s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
-                    int read1 = sockIn.read(b1);
-                    String part1 = new String(b1, 0, read1);
-                    Thread.sleep(50);
-
-                    int read2 = sockIn.read(b2);
-                    String part2 = new String(b2, 0, read2);
-                    int totalRead = read1 + read2;
-                    lenOK[ii] = PAYLOAD.length() == totalRead;
-
-                    String readData = part1 + part2;
-                    dataOK[ii] = PAYLOAD.equals(readData);
-                    if (!lenOK[ii]) {
-                        throw new IndexOutOfBoundsException("Mismatched length: expected=" + PAYLOAD.length() + ", actual=" + totalRead);
-                    }
-
-                    if (!dataOK[ii]) {
-                        throw new IllegalStateException("Mismatched content");
-                    }
-                } catch (Exception e) {
-                    log.error("Failed to complete iteration #" + i, e);
-                }
-            }
-            int ok = 0;
-            for (int i = 0; i < NUM_ITERATIONS; i++) {
-                ok += lenOK[i] ? 1 : 0;
-            }
-            Thread.sleep(50);
-            for (int i = 0; i < NUM_ITERATIONS; i++) {
-                assertTrue("Bad length at iteration " + i, lenOK[i]);
-                assertTrue("Bad data at iteration " + i, dataOK[i]);
-            }
-            session.delPortForwardingR(forwardedPort);
-            ss.close();
-            tWriter.join(TimeUnit.SECONDS.toMillis(5L));
-        } finally {
-            session.disconnect();
-        }
-    }
-
-    @Test
-    public void testForwardingOnLoad() throws Exception {
-//        final String path = "/history/recent/troubles/";
-//        final String host = "www.bbc.co.uk";
-//        final String path = "";
-//        final String host = "www.bahn.de";
-        final String path = "";
-        final String host = TEST_LOCALHOST;
-        final int nbThread = 2;
-        final int nbDownloads = 2;
-        final int nbLoops = 2;
-
-        StringBuilder resp = new StringBuilder();
-        resp.append("<html><body>\n");
-        for (int i = 0; i < 1000; i++) {
-            resp.append("0123456789\n");
-        }
-        resp.append("</body></html>\n");
-        final StringBuilder sb = new StringBuilder();
-        sb.append("HTTP/1.1 200 OK").append('\n');
-        sb.append("Content-Type: text/HTML").append('\n');
-        sb.append("Content-Length: ").append(resp.length()).append('\n');
-        sb.append('\n');
-        sb.append(resp);
-        NioSocketAcceptor acceptor = new NioSocketAcceptor();
-        acceptor.setHandler(new IoHandlerAdapter() {
-            @Override
-            public void messageReceived(IoSession session, Object message) throws Exception {
-                session.write(IoBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8)));
-            }
-        });
-        acceptor.setReuseAddress(true);
-        acceptor.bind(new InetSocketAddress(0));
-        final int port = acceptor.getLocalAddress().getPort();
-
-        Session session = createSession();
-        try {
-            final int forwardedPort1 = session.setPortForwardingL(0, host, port);
-            final int forwardedPort2 = Utils.getFreePort();
-            session.setPortForwardingR(forwardedPort2, TEST_LOCALHOST, forwardedPort1);
-            System.err.println("URL: http://localhost:" + forwardedPort2);
-
-            final CountDownLatch latch = new CountDownLatch(nbThread * nbDownloads * nbLoops);
-            final Thread[] threads = new Thread[nbThread];
-            final List<Throwable> errors = new CopyOnWriteArrayList<Throwable>();
-            for (int i = 0; i < threads.length; i++) {
-                threads[i] = new Thread(getCurrentTestName() + "[" + i + "]") {
-                    @Override
-                    public void run() {
-                        for (int j = 0; j < nbLoops; j++) {
-                            final MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
-                            final HttpClient client = new HttpClient(mgr);
-                            client.getHttpConnectionManager().getParams().setDefaultMaxConnectionsPerHost(100);
-                            client.getHttpConnectionManager().getParams().setMaxTotalConnections(1000);
-                            for (int i = 0; i < nbDownloads; i++) {
-                                try {
-                                    checkHtmlPage(client, new URL("http://localhost:" + forwardedPort2 + path));
-                                } catch (Throwable e) {
-                                    errors.add(e);
-                                } finally {
-                                    latch.countDown();
-                                    System.err.println("Remaining: " + latch.getCount());
-                                }
-                            }
-                            mgr.shutdown();
-                        }
-                    }
-                };
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].start();
-            }
-            latch.await();
-            for (Throwable t : errors) {
-                t.printStackTrace();
-            }
-            assertEquals(0, errors.size());
-        } finally {
-            session.disconnect();
-        }
-    }
-
-    protected Session createSession() throws JSchException {
-        JSch sch = new JSch();
-        Session session = sch.getSession("sshd", TEST_LOCALHOST, sshPort);
-        session.setUserInfo(new SimpleUserInfo("sshd"));
-        session.connect();
-        return session;
-    }
-
-    protected void checkHtmlPage(HttpClient client, URL url) throws IOException {
-        client.setHostConfiguration(new HostConfiguration());
-        client.getHostConfiguration().setHost(url.getHost(), url.getPort());
-        GetMethod get = new GetMethod("");
-        get.getParams().setVersion(HttpVersion.HTTP_1_1);
-        client.executeMethod(get);
-        String str = get.getResponseBodyAsString();
-        if (str.indexOf("</html>") <= 0) {
-            System.err.println(str);
-        }
-        assertTrue((str.indexOf("</html>") > 0));
-        get.releaseConnection();
-//        url.openConnection().setDefaultUseCaches(false);
-//        Reader reader = new BufferedReader(new InputStreamReader(url.openStream()));
-//        try {
-//            StringWriter sw = new StringWriter();
-//            char[] buf = new char[8192];
-//            while (true) {
-//                int len = reader.read(buf);
-//                if (len < 0) {
-//                    break;
-//                }
-//                sw.write(buf, 0, len);
-//            }
-//            assertTrue(sw.toString().indexOf("</html>") > 0);
-//        } finally {
-//            reader.close();
-//        }
-    }
-
-
-}
-
-

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java b/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
deleted file mode 100644
index 3b5ddb2..0000000
--- a/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
+++ /dev/null
@@ -1,610 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sshd;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.service.IoAcceptor;
-import org.apache.mina.core.service.IoHandlerAdapter;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
-import org.apache.sshd.client.SshClient;
-import org.apache.sshd.client.channel.ChannelDirectTcpip;
-import org.apache.sshd.client.session.ClientSession;
-import org.apache.sshd.common.FactoryManager;
-import org.apache.sshd.common.PropertyResolverUtils;
-import org.apache.sshd.common.SshdSocketAddress;
-import org.apache.sshd.common.forward.TcpipForwarder;
-import org.apache.sshd.common.forward.TcpipForwarderFactory;
-import org.apache.sshd.common.session.ConnectionService;
-import org.apache.sshd.common.util.GenericUtils;
-import org.apache.sshd.common.util.ValidateUtils;
-import org.apache.sshd.server.SshServer;
-import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
-import org.apache.sshd.server.global.CancelTcpipForwardHandler;
-import org.apache.sshd.server.global.TcpipForwardHandler;
-import org.apache.sshd.util.test.BaseTestSupport;
-import org.apache.sshd.util.test.JSchLogger;
-import org.apache.sshd.util.test.SimpleUserInfo;
-import org.apache.sshd.util.test.Utils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.runners.MethodSorters;
-import org.slf4j.LoggerFactory;
-
-import com.jcraft.jsch.JSch;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
-
-/**
- * Port forwarding tests
- */
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class PortForwardingTest extends BaseTestSupport {
-
-    private final org.slf4j.Logger log = LoggerFactory.getLogger(getClass());
-
-    private final BlockingQueue<String> requestsQ = new LinkedBlockingDeque<String>();
-
-    private SshServer sshd;
-    private int sshPort;
-    private int echoPort;
-    private IoAcceptor acceptor;
-    private SshClient client;
-
-    public PortForwardingTest() {
-        super();
-    }
-
-    @BeforeClass
-    public static void jschInit() {
-        JSchLogger.init();
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        sshd = setupTestServer();
-        PropertyResolverUtils.updateProperty(sshd, FactoryManager.WINDOW_SIZE, 2048);
-        PropertyResolverUtils.updateProperty(sshd, FactoryManager.MAX_PACKET_SIZE, 256);
-        sshd.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
-        sshd.start();
-
-        if (!requestsQ.isEmpty()) {
-            requestsQ.clear();
-        }
-
-        final TcpipForwarderFactory factory = ValidateUtils.checkNotNull(sshd.getTcpipForwarderFactory(), "No TcpipForwarderFactory");
-        sshd.setTcpipForwarderFactory(new TcpipForwarderFactory() {
-            private final Class<?>[] interfaces = {TcpipForwarder.class};
-            private final Map<String, String> method2req = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER) {
-                private static final long serialVersionUID = 1L;    // we're not serializing it...
-
-                {
-                    put("localPortForwardingRequested", TcpipForwardHandler.REQUEST);
-                    put("localPortForwardingCancelled", CancelTcpipForwardHandler.REQUEST);
-                }
-            };
-
-            @Override
-            public TcpipForwarder create(ConnectionService service) {
-                Thread thread = Thread.currentThread();
-                ClassLoader cl = thread.getContextClassLoader();
-
-                final TcpipForwarder forwarder = factory.create(service);
-                return (TcpipForwarder) Proxy.newProxyInstance(cl, interfaces, new InvocationHandler() {
-                    @SuppressWarnings("synthetic-access")
-                    @Override
-                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-                        Object result = method.invoke(forwarder, args);
-                        String name = method.getName();
-                        String request = method2req.get(name);
-                        if (GenericUtils.length(request) > 0) {
-                            if (requestsQ.offer(request)) {
-                                log.info("Signal " + request);
-                            } else {
-                                log.error("Failed to offer request=" + request);
-                            }
-                        }
-                        return result;
-                    }
-                });
-            }
-        });
-        sshPort = sshd.getPort();
-
-        NioSocketAcceptor acceptor = new NioSocketAcceptor();
-        acceptor.setHandler(new IoHandlerAdapter() {
-            @Override
-            public void messageReceived(IoSession session, Object message) throws Exception {
-                IoBuffer recv = (IoBuffer) message;
-                IoBuffer sent = IoBuffer.allocate(recv.remaining());
-                sent.put(recv);
-                sent.flip();
-                session.write(sent);
-            }
-        });
-        acceptor.setReuseAddress(true);
-        acceptor.bind(new InetSocketAddress(0));
-        echoPort = acceptor.getLocalAddress().getPort();
-        this.acceptor = acceptor;
-    }
-
-    private void waitForForwardingRequest(String expected, long timeout) throws InterruptedException {
-        for (long remaining = timeout; remaining > 0L; ) {
-            long waitStart = System.currentTimeMillis();
-            String actual = requestsQ.poll(remaining, TimeUnit.MILLISECONDS);
-            long waitEnd = System.currentTimeMillis();
-            if (GenericUtils.isEmpty(actual)) {
-                throw new IllegalStateException("Failed to retrieve request=" + expected);
-            }
-
-            if (expected.equals(actual)) {
-                return;
-            }
-
-            long waitDuration = waitEnd - waitStart;
-            remaining -= waitDuration;
-        }
-
-        throw new IllegalStateException("Timeout while waiting to retrieve request=" + expected);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if (sshd != null) {
-            sshd.stop(true);
-        }
-        if (acceptor != null) {
-            acceptor.dispose(true);
-        }
-        if (client != null) {
-            client.stop();
-        }
-    }
-
-    @Test
-    public void testRemoteForwarding() throws Exception {
-        Session session = createSession();
-        try {
-            int forwardedPort = Utils.getFreePort();
-            session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);
-            waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
-
-            try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort);
-                 OutputStream output = s.getOutputStream();
-                 InputStream input = s.getInputStream()) {
-
-                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
-                String expected = getCurrentTestName();
-                byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
-                output.write(bytes);
-                output.flush();
-
-                byte[] buf = new byte[bytes.length + Long.SIZE];
-                int n = input.read(buf);
-                String res = new String(buf, 0, n, StandardCharsets.UTF_8);
-                assertEquals("Mismatched data", expected, res);
-            } finally {
-                session.delPortForwardingR(forwardedPort);
-            }
-        } finally {
-            session.disconnect();
-        }
-    }
-
-    @Test
-    public void testRemoteForwardingSecondTimeInSameSession() throws Exception {
-        Session session = createSession();
-        try {
-            int forwardedPort = Utils.getFreePort();
-            session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);
-            waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
-
-            session.delPortForwardingR(TEST_LOCALHOST, forwardedPort);
-            waitForForwardingRequest(CancelTcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
-
-            session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);
-            waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
-
-            try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort);
-                 OutputStream output = s.getOutputStream();
-                 InputStream input = s.getInputStream()) {
-
-                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
-                String expected = getCurrentTestName();
-                byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
-                output.write(bytes);
-                output.flush();
-
-                byte[] buf = new byte[bytes.length + Long.SIZE];
-                int n = input.read(buf);
-                String res = new String(buf, 0, n, StandardCharsets.UTF_8);
-                assertEquals("Mismatched data", expected, res);
-            } finally {
-                session.delPortForwardingR(TEST_LOCALHOST, forwardedPort);
-            }
-        } finally {
-            session.disconnect();
-        }
-    }
-
-    @Test
-    public void testRemoteForwardingNative() throws Exception {
-        try (ClientSession session = createNativeSession()) {
-            SshdSocketAddress remote = new SshdSocketAddress("", 0);
-            SshdSocketAddress local = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
-            SshdSocketAddress bound = session.startRemotePortForwarding(remote, local);
-
-            try (Socket s = new Socket(bound.getHostName(), bound.getPort());
-                 OutputStream output = s.getOutputStream();
-                 InputStream input = s.getInputStream()) {
-
-                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
-                String expected = getCurrentTestName();
-                byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
-                output.write(bytes);
-                output.flush();
-
-                byte[] buf = new byte[bytes.length + Long.SIZE];
-                int n = input.read(buf);
-                String res = new String(buf, 0, n);
-                assertEquals("Mismatched data", expected, res);
-            } finally {
-                session.stopRemotePortForwarding(remote);
-            }
-        }
-    }
-
-    @Test
-    public void testRemoteForwardingNativeBigPayload() throws Exception {
-        try (ClientSession session = createNativeSession()) {
-            SshdSocketAddress remote = new SshdSocketAddress("", 0);
-            SshdSocketAddress local = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
-            SshdSocketAddress bound = session.startRemotePortForwarding(remote, local);
-
-            try (Socket s = new Socket(bound.getHostName(), bound.getPort());
-                 OutputStream output = s.getOutputStream();
-                 InputStream input = s.getInputStream()) {
-
-                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
-                String expected = getCurrentTestName();
-                byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
-                byte[] buf = new byte[bytes.length + Long.SIZE];
-
-                for (int i = 0; i < 1000; i++) {
-                    output.write(bytes);
-                    output.flush();
-
-                    int n = input.read(buf);
-                    String res = new String(buf, 0, n);
-                    assertEquals("Mismatched data at iteration #" + i, expected, res);
-                }
-            } finally {
-                session.stopRemotePortForwarding(remote);
-            }
-        }
-    }
-
-    @Test
-    public void testLocalForwarding() throws Exception {
-        Session session = createSession();
-        try {
-            int forwardedPort = Utils.getFreePort();
-            session.setPortForwardingL(forwardedPort, TEST_LOCALHOST, echoPort);
-
-            try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort);
-                 OutputStream output = s.getOutputStream();
-                 InputStream input = s.getInputStream()) {
-
-                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
-                String expected = getCurrentTestName();
-                byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
-
-                output.write(bytes);
-                output.flush();
-
-                byte[] buf = new byte[bytes.length + Long.SIZE];
-                int n = input.read(buf);
-                String res = new String(buf, 0, n);
-                assertEquals("Mismatched data", expected, res);
-            } finally {
-                session.delPortForwardingL(forwardedPort);
-            }
-        } finally {
-            session.disconnect();
-        }
-    }
-
-    @Test
-    public void testLocalForwardingNative() throws Exception {
-        try (ClientSession session = createNativeSession()) {
-            SshdSocketAddress local = new SshdSocketAddress("", 0);
-            SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
-            SshdSocketAddress bound = session.startLocalPortForwarding(local, remote);
-
-            try (Socket s = new Socket(bound.getHostName(), bound.getPort());
-                 OutputStream output = s.getOutputStream();
-                 InputStream input = s.getInputStream()) {
-
-                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
-                String expected = getCurrentTestName();
-                byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
-
-                output.write(bytes);
-                output.flush();
-
-                byte[] buf = new byte[bytes.length + Long.SIZE];
-                int n = input.read(buf);
-                String res = new String(buf, 0, n);
-                assertEquals("Mismatched data", expected, res);
-            } finally {
-                session.stopLocalPortForwarding(bound);
-            }
-        }
-    }
-
-    @Test
-    public void testLocalForwardingNativeReuse() throws Exception {
-        try (ClientSession session = createNativeSession()) {
-            SshdSocketAddress local = new SshdSocketAddress("", 0);
-            SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
-            SshdSocketAddress bound = session.startLocalPortForwarding(local, remote);
-
-            session.stopLocalPortForwarding(bound);
-
-            SshdSocketAddress bound2 = session.startLocalPortForwarding(local, remote);
-            session.stopLocalPortForwarding(bound2);
-        }
-    }
-
-    @Test
-    public void testLocalForwardingNativeBigPayload() throws Exception {
-        try (ClientSession session = createNativeSession()) {
-            String expected = getCurrentTestName();
-            byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
-            byte[] buf = new byte[bytes.length + Long.SIZE];
-
-            SshdSocketAddress local = new SshdSocketAddress("", 0);
-            SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
-            SshdSocketAddress bound = session.startLocalPortForwarding(local, remote);
-            try (Socket s = new Socket(bound.getHostName(), bound.getPort());
-                 OutputStream output = s.getOutputStream();
-                 InputStream input = s.getInputStream()) {
-
-                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
-                for (int i = 0; i < 1000; i++) {
-                    output.write(bytes);
-                    output.flush();
-
-                    int n = input.read(buf);
-                    String res = new String(buf, 0, n);
-                    assertEquals("Mismatched data at iteration #" + i, expected, res);
-                }
-            } finally {
-                session.stopLocalPortForwarding(bound);
-            }
-        }
-    }
-
-    @Test
-    public void testForwardingChannel() throws Exception {
-        try (ClientSession session = createNativeSession()) {
-            SshdSocketAddress local = new SshdSocketAddress("", 0);
-            SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
-
-            try (ChannelDirectTcpip channel = session.createDirectTcpipChannel(local, remote)) {
-                channel.open().verify(9L, TimeUnit.SECONDS);
-
-                String expected = getCurrentTestName();
-                byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
-
-                try (OutputStream output = channel.getInvertedIn();
-                     InputStream input = channel.getInvertedOut()) {
-                    output.write(bytes);
-                    output.flush();
-
-                    byte[] buf = new byte[bytes.length + Long.SIZE];
-                    int n = input.read(buf);
-                    String res = new String(buf, 0, n);
-                    assertEquals("Mismatched data", expected, res);
-                }
-                channel.close(false);
-            }
-        }
-    }
-
-    @Test(timeout = 45000)
-    public void testRemoteForwardingWithDisconnect() throws Exception {
-        Session session = createSession();
-        try {
-            // 1. Create a Port Forward
-            int forwardedPort = Utils.getFreePort();
-            session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);
-            waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
-
-            // 2. Establish a connection through it
-            try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort)) {
-                s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
-                // 3. Simulate the client going away
-                rudelyDisconnectJschSession(session);
-
-                // 4. Make sure the NIOprocessor is not stuck
-                {
-                    Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
-                    // from here, we need to check all the threads running and find a
-                    // "NioProcessor-"
-                    // that is stuck on a PortForward.dispose
-                    ThreadGroup root = Thread.currentThread().getThreadGroup().getParent();
-                    while (root.getParent() != null) {
-                        root = root.getParent();
-                    }
-
-                    for (int index = 0; ; index++) {
-                        Collection<Thread> pending = findThreads(root, "NioProcessor-");
-                        if (GenericUtils.size(pending) <= 0) {
-                            log.info("Finished after " + index + " iterations");
-                            break;
-                        }
-                        try {
-                            Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
-                        } catch (InterruptedException e) {
-                            // ignored
-                        }
-                    }
-                }
-
-                session.delPortForwardingR(forwardedPort);
-            }
-        } finally {
-            session.disconnect();
-        }
-    }
-
-    /**
-     * Close the socket inside this JSCH session. Use reflection to find it and
-     * just close it.
-     *
-     * @param session the Session to violate
-     * @throws Exception
-     */
-    private void rudelyDisconnectJschSession(Session session) throws Exception {
-        Field fSocket = session.getClass().getDeclaredField("socket");
-        fSocket.setAccessible(true);
-
-        try (Socket socket = (Socket) fSocket.get(session)) {
-            assertTrue("socket is not connected", socket.isConnected());
-            assertFalse("socket should not be closed", socket.isClosed());
-            socket.close();
-            assertTrue("socket has not closed", socket.isClosed());
-        }
-    }
-
-    private Set<Thread> findThreads(ThreadGroup group, String name) {
-        int numThreads = group.activeCount();
-        Thread[] threads = new Thread[numThreads * 2];
-        numThreads = group.enumerate(threads, false);
-        Set<Thread> ret = new HashSet<Thread>();
-
-        // Enumerate each thread in `group'
-        for (int i = 0; i < numThreads; ++i) {
-            Thread t = threads[i];
-            // Get thread
-            // log.debug("Thread name: " + threads[i].getName());
-            if (checkThreadForPortForward(t, name)) {
-                ret.add(t);
-            }
-        }
-        // didn't find the thread to check the
-        int numGroups = group.activeGroupCount();
-        ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
-        numGroups = group.enumerate(groups, false);
-        for (int i = 0; i < numGroups; ++i) {
-            ThreadGroup g = groups[i];
-            Collection<Thread> c = findThreads(g, name);
-            if (GenericUtils.isEmpty(c)) {
-                continue;   // debug breakpoint
-            }
-            ret.addAll(c);
-        }
-        return ret;
-    }
-
-    private boolean checkThreadForPortForward(Thread thread, String name) {
-        if (thread == null)
-            return false;
-        // does it contain the name we're looking for?
-        if (thread.getName().contains(name)) {
-            // look at the stack
-            StackTraceElement[] stack = thread.getStackTrace();
-            if (stack.length == 0)
-                return false;
-            else {
-                // does it have
-                // 'org.apache.sshd.server.session.TcpipForwardSupport.close'?
-                for (int i = 0; i < stack.length; ++i) {
-                    String clazzName = stack[i].getClassName();
-                    String methodName = stack[i].getMethodName();
-                    // log.debug("Class: " + clazzName);
-                    // log.debug("Method: " + methodName);
-                    if (clazzName
-                            .equals("org.apache.sshd.server.session.TcpipForwardSupport")
-                            && (methodName.equals("close") || methodName
-                            .equals("sessionCreated"))) {
-                        log.warn(thread.getName() + " stuck at " + clazzName
-                                + "." + methodName + ": "
-                                + stack[i].getLineNumber());
-                        return true;
-                    }
-                }
-            }
-        }
-        return false;
-    }
-
-    protected Session createSession() throws JSchException {
-        JSch sch = new JSch();
-        Session session = sch.getSession(getCurrentTestName(), TEST_LOCALHOST, sshPort);
-        session.setUserInfo(new SimpleUserInfo(getCurrentTestName()));
-        session.connect();
-        return session;
-    }
-
-    protected ClientSession createNativeSession() throws Exception {
-        client = setupTestClient();
-        PropertyResolverUtils.updateProperty(client, FactoryManager.WINDOW_SIZE, 2048);
-        PropertyResolverUtils.updateProperty(client, FactoryManager.MAX_PACKET_SIZE, 256);
-        client.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
-        client.start();
-
-        ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, sshPort).verify(7L, TimeUnit.SECONDS).getSession();
-        session.addPasswordIdentity(getCurrentTestName());
-        session.auth().verify(11L, TimeUnit.SECONDS);
-        return session;
-    }
-}
-
-

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/SinglePublicKeyAuthTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/SinglePublicKeyAuthTest.java b/sshd-core/src/test/java/org/apache/sshd/SinglePublicKeyAuthTest.java
deleted file mode 100644
index d6a4561..0000000
--- a/sshd-core/src/test/java/org/apache/sshd/SinglePublicKeyAuthTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sshd;
-
-import java.security.KeyPair;
-import java.security.PublicKey;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.sshd.client.SshClient;
-import org.apache.sshd.client.future.AuthFuture;
-import org.apache.sshd.client.session.ClientSession;
-import org.apache.sshd.common.PropertyResolverUtils;
-import org.apache.sshd.common.config.keys.KeyUtils;
-import org.apache.sshd.common.keyprovider.KeyPairProvider;
-import org.apache.sshd.server.ServerFactoryManager;
-import org.apache.sshd.server.SshServer;
-import org.apache.sshd.server.auth.pubkey.CachingPublicKeyAuthenticator;
-import org.apache.sshd.server.auth.pubkey.PublickeyAuthenticator;
-import org.apache.sshd.server.auth.pubkey.UserAuthPublicKeyFactory;
-import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
-import org.apache.sshd.server.session.ServerSession;
-import org.apache.sshd.util.test.BaseTestSupport;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.runners.MethodSorters;
-
-/**
- * TODO Add javadoc
- *
- * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
- */
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class SinglePublicKeyAuthTest extends BaseTestSupport {
-
-    private SshServer sshd;
-    private int port;
-    private KeyPair pairRsa = createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA);
-    private KeyPair pairRsaBad;
-    private PublickeyAuthenticator delegate;
-
-    public SinglePublicKeyAuthTest() {
-        SimpleGeneratorHostKeyProvider provider = new SimpleGeneratorHostKeyProvider();
-        provider.setAlgorithm("RSA");
-        pairRsaBad = provider.loadKey(KeyPairProvider.SSH_RSA);
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        sshd = setupTestServer();
-        PropertyResolverUtils.updateProperty(sshd, ServerFactoryManager.AUTH_METHODS, UserAuthPublicKeyFactory.NAME);
-        sshd.setPublickeyAuthenticator(new PublickeyAuthenticator() {
-            @SuppressWarnings("synthetic-access")
-            @Override
-            public boolean authenticate(String username, PublicKey key, ServerSession session) {
-                return delegate.authenticate(username, key, session);
-            }
-        });
-        sshd.start();
-        port = sshd.getPort();
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if (sshd != null) {
-            sshd.stop(true);
-        }
-    }
-
-    @Test
-    public void testPublicKeyAuthWithCache() throws Exception {
-        final ConcurrentHashMap<String, AtomicInteger> count = new ConcurrentHashMap<String, AtomicInteger>();
-        TestCachingPublicKeyAuthenticator auth = new TestCachingPublicKeyAuthenticator(new PublickeyAuthenticator() {
-            @SuppressWarnings("synthetic-access")
-            @Override
-            public boolean authenticate(String username, PublicKey key, ServerSession session) {
-                String fp = KeyUtils.getFingerPrint(key);
-                count.putIfAbsent(fp, new AtomicInteger());
-                count.get(fp).incrementAndGet();
-                return key.equals(pairRsa.getPublic());
-            }
-        });
-        delegate = auth;
-
-        try (SshClient client = setupTestClient()) {
-            client.start();
-
-            try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
-                session.addPublicKeyIdentity(pairRsaBad);
-                session.addPublicKeyIdentity(pairRsa);
-                session.auth().verify(5L, TimeUnit.SECONDS);
-
-                assertEquals("Mismatched authentication invocations count", 2, count.size());
-
-                String fpBad = KeyUtils.getFingerPrint(pairRsaBad.getPublic());
-                String fpGood = KeyUtils.getFingerPrint(pairRsa.getPublic());
-                assertTrue("Missing bad public key", count.containsKey(fpBad));
-                assertTrue("Missing good public key", count.containsKey(fpGood));
-                assertEquals("Mismatched bad key authentication attempts", 1, count.get(fpBad).get());
-                assertEquals("Mismatched good key authentication attempts", 1, count.get(fpGood).get());
-            } finally {
-                client.stop();
-            }
-        }
-
-        Thread.sleep(100L);
-        assertTrue("Cache not empty", auth.getCache().isEmpty());
-    }
-
-    @Test
-    public void testPublicKeyAuthWithoutCache() throws Exception {
-        final ConcurrentHashMap<String, AtomicInteger> count = new ConcurrentHashMap<String, AtomicInteger>();
-        delegate = new PublickeyAuthenticator() {
-            @SuppressWarnings("synthetic-access")
-            @Override
-            public boolean authenticate(String username, PublicKey key, ServerSession session) {
-                String fp = KeyUtils.getFingerPrint(key);
-                count.putIfAbsent(fp, new AtomicInteger());
-                count.get(fp).incrementAndGet();
-                return key.equals(pairRsa.getPublic());
-            }
-        };
-
-        try (SshClient client = setupTestClient()) {
-            client.start();
-
-            try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
-                session.addPublicKeyIdentity(pairRsaBad);
-                session.addPublicKeyIdentity(pairRsa);
-
-                AuthFuture auth = session.auth();
-                assertTrue("Failed to authenticate on time", auth.await(5L, TimeUnit.SECONDS));
-                assertTrue("Authentication failed", auth.isSuccess());
-            } finally {
-                client.stop();
-            }
-        }
-
-        assertEquals("Mismatched attempted keys count", 2, count.size());
-
-        String badFingerPrint = KeyUtils.getFingerPrint(pairRsaBad.getPublic());
-        Number badIndex = count.get(badFingerPrint);
-        assertNotNull("Missing bad RSA key", badIndex);
-        assertEquals("Mismatched attempt index for bad key", 1, badIndex.intValue());
-
-        String goodFingerPrint = KeyUtils.getFingerPrint(pairRsa.getPublic());
-        Number goodIndex = count.get(goodFingerPrint);
-        assertNotNull("Missing good RSA key", goodIndex);
-        assertEquals("Mismatched attempt index for good key", 2, goodIndex.intValue());
-    }
-
-    public static class TestCachingPublicKeyAuthenticator extends CachingPublicKeyAuthenticator {
-        public TestCachingPublicKeyAuthenticator(PublickeyAuthenticator authenticator) {
-            super(authenticator);
-        }
-
-        public Map<ServerSession, Map<PublicKey, Boolean>> getCache() {
-            return cache;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/common/auth/SinglePublicKeyAuthTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/auth/SinglePublicKeyAuthTest.java b/sshd-core/src/test/java/org/apache/sshd/common/auth/SinglePublicKeyAuthTest.java
new file mode 100644
index 0000000..f1ca19c
--- /dev/null
+++ b/sshd-core/src/test/java/org/apache/sshd/common/auth/SinglePublicKeyAuthTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.auth;
+
+import java.security.KeyPair;
+import java.security.PublicKey;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.future.AuthFuture;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.common.PropertyResolverUtils;
+import org.apache.sshd.common.config.keys.KeyUtils;
+import org.apache.sshd.common.keyprovider.KeyPairProvider;
+import org.apache.sshd.server.ServerFactoryManager;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.auth.pubkey.CachingPublicKeyAuthenticator;
+import org.apache.sshd.server.auth.pubkey.PublickeyAuthenticator;
+import org.apache.sshd.server.auth.pubkey.UserAuthPublicKeyFactory;
+import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
+import org.apache.sshd.server.session.ServerSession;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+/**
+ * Tests public key authentication with a bad and a good RSA identity, with and without a {@link CachingPublicKeyAuthenticator}.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class SinglePublicKeyAuthTest extends BaseTestSupport {
+
+    private SshServer sshd; // server under test, created in setUp() and stopped in tearDown()
+    private int port; // port reported by sshd after start(); used by the client connections
+    private KeyPair pairRsa = createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA); // the only key the test authenticators accept
+    private KeyPair pairRsaBad; // second RSA key, offered first by the client; expected to be rejected
+    private PublickeyAuthenticator delegate; // assigned by each test; the server-side authenticator forwards to it
+
+    public SinglePublicKeyAuthTest() {
+        SimpleGeneratorHostKeyProvider provider = new SimpleGeneratorHostKeyProvider();
+        provider.setAlgorithm("RSA");
+        pairRsaBad = provider.loadKey(KeyPairProvider.SSH_RSA); // presumably a different key than pairRsa - the count assertions rely on that
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        sshd = setupTestServer();
+        PropertyResolverUtils.updateProperty(sshd, ServerFactoryManager.AUTH_METHODS, UserAuthPublicKeyFactory.NAME); // AUTH_METHODS names only the public key factory
+        sshd.setPublickeyAuthenticator(new PublickeyAuthenticator() {
+            @SuppressWarnings("synthetic-access")
+            @Override
+            public boolean authenticate(String username, PublicKey key, ServerSession session) {
+                return delegate.authenticate(username, key, session); // indirection lets each test install its own authenticator after the server has started
+            }
+        });
+        sshd.start();
+        port = sshd.getPort();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (sshd != null) {
+            sshd.stop(true);
+        }
+    }
+
+    @Test
+    public void testPublicKeyAuthWithCache() throws Exception {
+        final ConcurrentHashMap<String, AtomicInteger> count = new ConcurrentHashMap<String, AtomicInteger>(); // key fingerprint -> number of calls reaching the wrapped authenticator
+        TestCachingPublicKeyAuthenticator auth = new TestCachingPublicKeyAuthenticator(new PublickeyAuthenticator() {
+            @SuppressWarnings("synthetic-access")
+            @Override
+            public boolean authenticate(String username, PublicKey key, ServerSession session) {
+                String fp = KeyUtils.getFingerPrint(key);
+                count.putIfAbsent(fp, new AtomicInteger()); // create the counter on first sight of this fingerprint
+                count.get(fp).incrementAndGet();
+                return key.equals(pairRsa.getPublic()); // accept only the good key
+            }
+        });
+        delegate = auth;
+
+        try (SshClient client = setupTestClient()) {
+            client.start();
+
+            try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
+                session.addPublicKeyIdentity(pairRsaBad); // bad key is registered before the good one
+                session.addPublicKeyIdentity(pairRsa);
+                session.auth().verify(5L, TimeUnit.SECONDS); // NOTE(review): presumably throws on failure or timeout - confirm against AuthFuture
+
+                assertEquals("Mismatched authentication invocations count", 2, count.size()); // two distinct fingerprints were seen
+
+                String fpBad = KeyUtils.getFingerPrint(pairRsaBad.getPublic());
+                String fpGood = KeyUtils.getFingerPrint(pairRsa.getPublic());
+                assertTrue("Missing bad public key", count.containsKey(fpBad));
+                assertTrue("Missing good public key", count.containsKey(fpGood));
+                assertEquals("Mismatched bad key authentication attempts", 1, count.get(fpBad).get());
+                assertEquals("Mismatched good key authentication attempts", 1, count.get(fpGood).get()); // behind the caching wrapper each key is expected to reach the wrapped authenticator once
+            } finally {
+                client.stop();
+            }
+        }
+
+        Thread.sleep(100L); // NOTE(review): fixed delay, presumably so the server can evict the closed session from the cache - timing-dependent
+        assertTrue("Cache not empty", auth.getCache().isEmpty());
+    }
+
+    @Test
+    public void testPublicKeyAuthWithoutCache() throws Exception {
+        final ConcurrentHashMap<String, AtomicInteger> count = new ConcurrentHashMap<String, AtomicInteger>(); // key fingerprint -> number of authenticate() calls
+        delegate = new PublickeyAuthenticator() {
+            @SuppressWarnings("synthetic-access")
+            @Override
+            public boolean authenticate(String username, PublicKey key, ServerSession session) {
+                String fp = KeyUtils.getFingerPrint(key);
+                count.putIfAbsent(fp, new AtomicInteger()); // create the counter on first sight of this fingerprint
+                count.get(fp).incrementAndGet();
+                return key.equals(pairRsa.getPublic()); // accept only the good key
+            }
+        };
+
+        try (SshClient client = setupTestClient()) {
+            client.start();
+
+            try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
+                session.addPublicKeyIdentity(pairRsaBad); // bad key is registered before the good one
+                session.addPublicKeyIdentity(pairRsa);
+
+                AuthFuture auth = session.auth();
+                assertTrue("Failed to authenticate on time", auth.await(5L, TimeUnit.SECONDS));
+                assertTrue("Authentication failed", auth.isSuccess());
+            } finally {
+                client.stop();
+            }
+        }
+
+        assertEquals("Mismatched attempted keys count", 2, count.size()); // two distinct fingerprints were seen
+
+        String badFingerPrint = KeyUtils.getFingerPrint(pairRsaBad.getPublic());
+        Number badIndex = count.get(badFingerPrint); // despite the name this is the per-key call counter, not a position
+        assertNotNull("Missing bad RSA key", badIndex);
+        assertEquals("Mismatched attempt index for bad key", 1, badIndex.intValue());
+
+        String goodFingerPrint = KeyUtils.getFingerPrint(pairRsa.getPublic());
+        Number goodIndex = count.get(goodFingerPrint); // per-key call counter, as above
+        assertNotNull("Missing good RSA key", goodIndex);
+        assertEquals("Mismatched attempt index for good key", 2, goodIndex.intValue()); // without caching the good key is expected to be checked twice (presumably key query plus signed request - confirm)
+    }
+
+    public static class TestCachingPublicKeyAuthenticator extends CachingPublicKeyAuthenticator {
+        public TestCachingPublicKeyAuthenticator(PublickeyAuthenticator authenticator) {
+            super(authenticator);
+        }
+
+        public Map<ServerSession, Map<PublicKey, Boolean>> getCache() {
+            return cache; // exposes the inherited cache field so the test can assert on its contents
+        }
+    }
+}
\ No newline at end of file