You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2016/01/07 09:35:36 UTC
[1/3] mina-sshd git commit: Added timeout for global response method
to avoid infinite waits
Repository: mina-sshd
Updated Branches:
refs/heads/master 1e09776e7 -> e980eb63d
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
new file mode 100644
index 0000000..e7b9162
--- /dev/null
+++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.forward;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.httpclient.HostConfiguration;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpVersion;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.service.IoAcceptor;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.apache.sshd.util.test.JSchLogger;
+import org.apache.sshd.util.test.SimpleUserInfo;
+import org.apache.sshd.util.test.Utils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+
+/**
+ * Port forwarding tests
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class PortForwardingLoadTest extends BaseTestSupport {
+ private final Logger log;
+ private SshServer sshd;
+ private int sshPort;
+ private IoAcceptor acceptor;
+
+ public PortForwardingLoadTest() {
+ log = LoggerFactory.getLogger(getClass());
+ }
+
+ @BeforeClass
+ public static void jschInit() {
+ JSchLogger.init();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ sshd = setupTestServer();
+ sshd.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
+ sshd.start();
+ sshPort = sshd.getPort();
+
+ NioSocketAcceptor acceptor = new NioSocketAcceptor();
+ acceptor.setHandler(new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message) throws Exception {
+ IoBuffer recv = (IoBuffer) message;
+ IoBuffer sent = IoBuffer.allocate(recv.remaining());
+ sent.put(recv);
+ sent.flip();
+ session.write(sent);
+ }
+ });
+ acceptor.setReuseAddress(true);
+ acceptor.bind(new InetSocketAddress(0));
+ log.info("setUp() echo address = {}", acceptor.getLocalAddress());
+ this.acceptor = acceptor;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (sshd != null) {
+ sshd.stop(true);
+ }
+ if (acceptor != null) {
+ acceptor.dispose(true);
+ }
+ }
+
+ @Test
+ public void testLocalForwardingPayload() throws Exception {
+ final int NUM_ITERATIONS = 100;
+ final String PAYLOAD_TMP = "This is significantly longer Test Data. This is significantly " +
+ "longer Test Data. This is significantly longer Test Data. This is significantly " +
+ "longer Test Data. This is significantly longer Test Data. This is significantly " +
+ "longer Test Data. This is significantly longer Test Data. This is significantly " +
+ "longer Test Data. This is significantly longer Test Data. This is significantly " +
+ "longer Test Data. ";
+ StringBuilder sb = new StringBuilder(PAYLOAD_TMP.length() * 1000);
+ for (int i = 0; i < 1000; i++) {
+ sb.append(PAYLOAD_TMP);
+ }
+ final String PAYLOAD = sb.toString();
+
+ Session session = createSession();
+ try (final ServerSocket ss = new ServerSocket()) {
+ ss.setReuseAddress(true);
+ ss.bind(new InetSocketAddress((InetAddress) null, 0));
+ int forwardedPort = ss.getLocalPort();
+ int sinkPort = session.setPortForwardingL(0, TEST_LOCALHOST, forwardedPort);
+ final AtomicInteger conCount = new AtomicInteger(0);
+
+ Thread tAcceptor = new Thread(getCurrentTestName() + "Acceptor") {
+ @SuppressWarnings("synthetic-access")
+ @Override
+ public void run() {
+ try {
+ byte[] buf = new byte[8192];
+ log.info("Started...");
+ for (int i = 0; i < NUM_ITERATIONS; ++i) {
+ try (Socket s = ss.accept()) {
+ conCount.incrementAndGet();
+
+ try (InputStream sockIn = s.getInputStream();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+
+ int l;
+ while ((baos.size() < PAYLOAD.length()) && ((l = sockIn.read(buf)) > 0)) {
+ baos.write(buf, 0, l);
+ }
+
+ assertEquals("Mismatched received data at iteration #" + i, PAYLOAD, baos.toString());
+
+ try (InputStream inputCopy = new ByteArrayInputStream(baos.toByteArray());
+ OutputStream sockOut = s.getOutputStream()) {
+
+ while ((l = sockIn.read(buf)) > 0) {
+ sockOut.write(buf, 0, l);
+ }
+ }
+ }
+ }
+ }
+ log.info("Done");
+ } catch (Exception e) {
+ log.error("Failed to complete run loop", e);
+ }
+ }
+ };
+ tAcceptor.start();
+ Thread.sleep(50);
+
+ byte[] buf = new byte[8192];
+ byte[] bytes = PAYLOAD.getBytes(StandardCharsets.UTF_8);
+ for (int i = 0; i < NUM_ITERATIONS; i++) {
+ log.info("Iteration {}", Integer.valueOf(i));
+ try (Socket s = new Socket(TEST_LOCALHOST, sinkPort);
+ OutputStream sockOut = s.getOutputStream()) {
+
+ s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+ sockOut.write(bytes);
+ sockOut.flush();
+
+ try (InputStream sockIn = s.getInputStream();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(bytes.length)) {
+ int l;
+ while ((baos.size() < PAYLOAD.length()) && ((l = sockIn.read(buf)) > 0)) {
+ baos.write(buf, 0, l);
+ }
+ assertEquals("Mismatched payload at iteration #" + i, PAYLOAD, baos.toString());
+ }
+ } catch (Exception e) {
+ log.error("Error in iteration #" + i, e);
+ }
+ }
+ session.delPortForwardingL(sinkPort);
+
+ ss.close();
+ tAcceptor.join(TimeUnit.SECONDS.toMillis(5L));
+ } finally {
+ session.disconnect();
+ }
+ }
+
+ @Test
+ public void testRemoteForwardingPayload() throws Exception {
+ final int NUM_ITERATIONS = 100;
+ final String PAYLOAD = "This is significantly longer Test Data. This is significantly " +
+ "longer Test Data. This is significantly longer Test Data. This is significantly " +
+ "longer Test Data. This is significantly longer Test Data. This is significantly " +
+ "longer Test Data. This is significantly longer Test Data. This is significantly " +
+ "longer Test Data. This is significantly longer Test Data. This is significantly " +
+ "longer Test Data. ";
+ Session session = createSession();
+ try (final ServerSocket ss = new ServerSocket()) {
+ ss.setReuseAddress(true);
+ ss.bind(new InetSocketAddress((InetAddress) null, 0));
+ int forwardedPort = ss.getLocalPort();
+ int sinkPort = Utils.getFreePort();
+ session.setPortForwardingR(sinkPort, TEST_LOCALHOST, forwardedPort);
+ final boolean started[] = new boolean[1];
+ started[0] = false;
+ final AtomicInteger conCount = new AtomicInteger(0);
+
+ Thread tWriter = new Thread(getCurrentTestName() + "Writer") {
+ @SuppressWarnings("synthetic-access")
+ @Override
+ public void run() {
+ started[0] = true;
+ try {
+ byte[] bytes = PAYLOAD.getBytes(StandardCharsets.UTF_8);
+ for (int i = 0; i < NUM_ITERATIONS; ++i) {
+ try (Socket s = ss.accept()) {
+ conCount.incrementAndGet();
+
+ try (OutputStream sockOut = s.getOutputStream()) {
+ sockOut.write(bytes);
+ sockOut.flush();
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("Failed to complete run loop", e);
+ }
+ }
+ };
+ tWriter.start();
+ Thread.sleep(50);
+ assertTrue("Server not started", started[0]);
+
+ final RuntimeException lenOK[] = new RuntimeException[NUM_ITERATIONS];
+ final RuntimeException dataOK[] = new RuntimeException[NUM_ITERATIONS];
+ byte b2[] = new byte[PAYLOAD.length()];
+ byte b1[] = new byte[b2.length / 2];
+
+ for (int i = 0; i < NUM_ITERATIONS; i++) {
+ final int ii = i;
+ try (Socket s = new Socket(TEST_LOCALHOST, sinkPort);
+ InputStream sockIn = s.getInputStream()) {
+ s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+ int read1 = sockIn.read(b1);
+ String part1 = new String(b1, 0, read1, StandardCharsets.UTF_8);
+ Thread.sleep(50);
+
+ int read2 = sockIn.read(b2);
+ String part2 = new String(b2, 0, read2, StandardCharsets.UTF_8);
+ int totalRead = read1 + read2;
+ lenOK[ii] = (PAYLOAD.length() == totalRead)
+ ? null
+ : new IndexOutOfBoundsException("Mismatched length: expected=" + PAYLOAD.length() + ", actual=" + totalRead);
+
+ String readData = part1 + part2;
+ dataOK[ii] = PAYLOAD.equals(readData) ? null : new IllegalStateException("Mismatched content");
+ if (lenOK[ii] != null) {
+ throw lenOK[ii];
+ }
+
+ if (dataOK[ii] != null) {
+ throw dataOK[ii];
+ }
+ } catch (Exception e) {
+ if (e instanceof IOException) {
+ log.warn("I/O exception in iteration #" + i, e);
+ } else {
+ log.error("Failed to complete iteration #" + i, e);
+ }
+ }
+ }
+ int ok = 0;
+ for (int i = 0; i < NUM_ITERATIONS; i++) {
+ ok += (lenOK[i] == null) ? 1 : 0;
+ }
+ log.info("Successful iteration: " + ok + " out of " + NUM_ITERATIONS);
+ Thread.sleep(55L);
+ for (int i = 0; i < NUM_ITERATIONS; i++) {
+ assertNull("Bad length at iteration " + i, lenOK[i]);
+ assertNull("Bad data at iteration " + i, dataOK[i]);
+ }
+ session.delPortForwardingR(forwardedPort);
+ ss.close();
+ tWriter.join(TimeUnit.SECONDS.toMillis(5L));
+ } finally {
+ session.disconnect();
+ }
+ }
+
+ @Test
+ public void testForwardingOnLoad() throws Exception {
+// final String path = "/history/recent/troubles/";
+// final String host = "www.bbc.co.uk";
+// final String path = "";
+// final String host = "www.bahn.de";
+ final String path = "";
+ final String host = TEST_LOCALHOST;
+ final int nbThread = 2;
+ final int nbDownloads = 2;
+ final int nbLoops = 2;
+
+ StringBuilder resp = new StringBuilder();
+ resp.append("<html><body>\n");
+ for (int i = 0; i < 1000; i++) {
+ resp.append("0123456789\n");
+ }
+ resp.append("</body></html>\n");
+ final StringBuilder sb = new StringBuilder();
+ sb.append("HTTP/1.1 200 OK").append('\n');
+ sb.append("Content-Type: text/HTML").append('\n');
+ sb.append("Content-Length: ").append(resp.length()).append('\n');
+ sb.append('\n');
+ sb.append(resp);
+ NioSocketAcceptor acceptor = new NioSocketAcceptor();
+ acceptor.setHandler(new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message) throws Exception {
+ session.write(IoBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8)));
+ }
+ });
+ acceptor.setReuseAddress(true);
+ acceptor.bind(new InetSocketAddress(0));
+ final int port = acceptor.getLocalAddress().getPort();
+
+ Session session = createSession();
+ try {
+ final int forwardedPort1 = session.setPortForwardingL(0, host, port);
+ final int forwardedPort2 = Utils.getFreePort();
+ session.setPortForwardingR(forwardedPort2, TEST_LOCALHOST, forwardedPort1);
+ System.err.println("URL: http://localhost:" + forwardedPort2);
+
+ final CountDownLatch latch = new CountDownLatch(nbThread * nbDownloads * nbLoops);
+ final Thread[] threads = new Thread[nbThread];
+ final List<Throwable> errors = new CopyOnWriteArrayList<Throwable>();
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(getCurrentTestName() + "[" + i + "]") {
+ @Override
+ public void run() {
+ for (int j = 0; j < nbLoops; j++) {
+ final MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
+ final HttpClient client = new HttpClient(mgr);
+ client.getHttpConnectionManager().getParams().setDefaultMaxConnectionsPerHost(100);
+ client.getHttpConnectionManager().getParams().setMaxTotalConnections(1000);
+ for (int i = 0; i < nbDownloads; i++) {
+ try {
+ checkHtmlPage(client, new URL("http://localhost:" + forwardedPort2 + path));
+ } catch (Throwable e) {
+ errors.add(e);
+ } finally {
+ latch.countDown();
+ System.err.println("Remaining: " + latch.getCount());
+ }
+ }
+ mgr.shutdown();
+ }
+ }
+ };
+ }
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].start();
+ }
+ latch.await();
+ for (Throwable t : errors) {
+ t.printStackTrace();
+ }
+ assertEquals(0, errors.size());
+ } finally {
+ session.disconnect();
+ }
+ }
+
+ protected Session createSession() throws JSchException {
+ JSch sch = new JSch();
+ Session session = sch.getSession("sshd", TEST_LOCALHOST, sshPort);
+ session.setUserInfo(new SimpleUserInfo("sshd"));
+ session.connect();
+ return session;
+ }
+
+ protected void checkHtmlPage(HttpClient client, URL url) throws IOException {
+ client.setHostConfiguration(new HostConfiguration());
+ client.getHostConfiguration().setHost(url.getHost(), url.getPort());
+ GetMethod get = new GetMethod("");
+ get.getParams().setVersion(HttpVersion.HTTP_1_1);
+ client.executeMethod(get);
+ String str = get.getResponseBodyAsString();
+ if (str.indexOf("</html>") <= 0) {
+ System.err.println(str);
+ }
+ assertTrue((str.indexOf("</html>") > 0));
+ get.releaseConnection();
+// url.openConnection().setDefaultUseCaches(false);
+// Reader reader = new BufferedReader(new InputStreamReader(url.openStream()));
+// try {
+// StringWriter sw = new StringWriter();
+// char[] buf = new char[8192];
+// while (true) {
+// int len = reader.read(buf);
+// if (len < 0) {
+// break;
+// }
+// sw.write(buf, 0, len);
+// }
+// assertTrue(sw.toString().indexOf("</html>") > 0);
+// } finally {
+// reader.close();
+// }
+ }
+
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
new file mode 100644
index 0000000..0cd71e0
--- /dev/null
+++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.forward;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.service.IoAcceptor;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.channel.ChannelDirectTcpip;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.PropertyResolverUtils;
+import org.apache.sshd.common.SshdSocketAddress;
+import org.apache.sshd.common.forward.TcpipForwarder;
+import org.apache.sshd.common.forward.TcpipForwarderFactory;
+import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
+import org.apache.sshd.server.global.CancelTcpipForwardHandler;
+import org.apache.sshd.server.global.TcpipForwardHandler;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.apache.sshd.util.test.JSchLogger;
+import org.apache.sshd.util.test.SimpleUserInfo;
+import org.apache.sshd.util.test.Utils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.LoggerFactory;
+
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+
+/**
+ * Port forwarding tests
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class PortForwardingTest extends BaseTestSupport {
+
+ private final org.slf4j.Logger log = LoggerFactory.getLogger(getClass());
+
+ private final BlockingQueue<String> requestsQ = new LinkedBlockingDeque<String>();
+
+ private SshServer sshd;
+ private int sshPort;
+ private int echoPort;
+ private IoAcceptor acceptor;
+ private SshClient client;
+
+ public PortForwardingTest() {
+ super();
+ }
+
+ @BeforeClass
+ public static void jschInit() {
+ JSchLogger.init();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ sshd = setupTestServer();
+ PropertyResolverUtils.updateProperty(sshd, FactoryManager.WINDOW_SIZE, 2048);
+ PropertyResolverUtils.updateProperty(sshd, FactoryManager.MAX_PACKET_SIZE, 256);
+ sshd.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
+ sshd.start();
+
+ if (!requestsQ.isEmpty()) {
+ requestsQ.clear();
+ }
+
+ final TcpipForwarderFactory factory = ValidateUtils.checkNotNull(sshd.getTcpipForwarderFactory(), "No TcpipForwarderFactory");
+ sshd.setTcpipForwarderFactory(new TcpipForwarderFactory() {
+ private final Class<?>[] interfaces = {TcpipForwarder.class};
+ private final Map<String, String> method2req = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER) {
+ private static final long serialVersionUID = 1L; // we're not serializing it...
+
+ {
+ put("localPortForwardingRequested", TcpipForwardHandler.REQUEST);
+ put("localPortForwardingCancelled", CancelTcpipForwardHandler.REQUEST);
+ }
+ };
+
+ @Override
+ public TcpipForwarder create(ConnectionService service) {
+ Thread thread = Thread.currentThread();
+ ClassLoader cl = thread.getContextClassLoader();
+
+ final TcpipForwarder forwarder = factory.create(service);
+ return (TcpipForwarder) Proxy.newProxyInstance(cl, interfaces, new InvocationHandler() {
+ @SuppressWarnings("synthetic-access")
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ Object result = method.invoke(forwarder, args);
+ String name = method.getName();
+ String request = method2req.get(name);
+ if (GenericUtils.length(request) > 0) {
+ if (requestsQ.offer(request)) {
+ log.info("Signal " + request);
+ } else {
+ log.error("Failed to offer request=" + request);
+ }
+ }
+ return result;
+ }
+ });
+ }
+ });
+ sshPort = sshd.getPort();
+
+ NioSocketAcceptor acceptor = new NioSocketAcceptor();
+ acceptor.setHandler(new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message) throws Exception {
+ IoBuffer recv = (IoBuffer) message;
+ IoBuffer sent = IoBuffer.allocate(recv.remaining());
+ sent.put(recv);
+ sent.flip();
+ session.write(sent);
+ }
+ });
+ acceptor.setReuseAddress(true);
+ acceptor.bind(new InetSocketAddress(0));
+ echoPort = acceptor.getLocalAddress().getPort();
+ this.acceptor = acceptor;
+ }
+
+ private void waitForForwardingRequest(String expected, long timeout) throws InterruptedException {
+ for (long remaining = timeout; remaining > 0L; ) {
+ long waitStart = System.currentTimeMillis();
+ String actual = requestsQ.poll(remaining, TimeUnit.MILLISECONDS);
+ long waitEnd = System.currentTimeMillis();
+ if (GenericUtils.isEmpty(actual)) {
+ throw new IllegalStateException("Failed to retrieve request=" + expected);
+ }
+
+ if (expected.equals(actual)) {
+ return;
+ }
+
+ long waitDuration = waitEnd - waitStart;
+ remaining -= waitDuration;
+ }
+
+ throw new IllegalStateException("Timeout while waiting to retrieve request=" + expected);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (sshd != null) {
+ sshd.stop(true);
+ }
+ if (acceptor != null) {
+ acceptor.dispose(true);
+ }
+ if (client != null) {
+ client.stop();
+ }
+ }
+
+ @Test
+ public void testRemoteForwarding() throws Exception {
+ Session session = createSession();
+ try {
+ int forwardedPort = Utils.getFreePort();
+ session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);
+ waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
+
+ try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort);
+ OutputStream output = s.getOutputStream();
+ InputStream input = s.getInputStream()) {
+
+ s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+ String expected = getCurrentTestName();
+ byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+ output.write(bytes);
+ output.flush();
+
+ byte[] buf = new byte[bytes.length + Long.SIZE];
+ int n = input.read(buf);
+ String res = new String(buf, 0, n, StandardCharsets.UTF_8);
+ assertEquals("Mismatched data", expected, res);
+ } finally {
+ session.delPortForwardingR(forwardedPort);
+ }
+ } finally {
+ session.disconnect();
+ }
+ }
+
+ @Test
+ public void testRemoteForwardingSecondTimeInSameSession() throws Exception {
+ Session session = createSession();
+ try {
+ int forwardedPort = Utils.getFreePort();
+ session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);
+ waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
+
+ session.delPortForwardingR(TEST_LOCALHOST, forwardedPort);
+ waitForForwardingRequest(CancelTcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
+
+ session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);
+ waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
+
+ try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort);
+ OutputStream output = s.getOutputStream();
+ InputStream input = s.getInputStream()) {
+
+ s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+ String expected = getCurrentTestName();
+ byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+ output.write(bytes);
+ output.flush();
+
+ byte[] buf = new byte[bytes.length + Long.SIZE];
+ int n = input.read(buf);
+ String res = new String(buf, 0, n, StandardCharsets.UTF_8);
+ assertEquals("Mismatched data", expected, res);
+ } finally {
+ session.delPortForwardingR(TEST_LOCALHOST, forwardedPort);
+ }
+ } finally {
+ session.disconnect();
+ }
+ }
+
+ @Test
+ public void testRemoteForwardingNative() throws Exception {
+ try (ClientSession session = createNativeSession()) {
+ SshdSocketAddress remote = new SshdSocketAddress("", 0);
+ SshdSocketAddress local = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
+ SshdSocketAddress bound = session.startRemotePortForwarding(remote, local);
+
+ try (Socket s = new Socket(bound.getHostName(), bound.getPort());
+ OutputStream output = s.getOutputStream();
+ InputStream input = s.getInputStream()) {
+
+ s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+ String expected = getCurrentTestName();
+ byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+ output.write(bytes);
+ output.flush();
+
+ byte[] buf = new byte[bytes.length + Long.SIZE];
+ int n = input.read(buf);
+ String res = new String(buf, 0, n);
+ assertEquals("Mismatched data", expected, res);
+ } finally {
+ session.stopRemotePortForwarding(remote);
+ }
+ }
+ }
+
+ @Test
+ public void testRemoteForwardingNativeBigPayload() throws Exception {
+ try (ClientSession session = createNativeSession()) {
+ SshdSocketAddress remote = new SshdSocketAddress("", 0);
+ SshdSocketAddress local = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
+ SshdSocketAddress bound = session.startRemotePortForwarding(remote, local);
+
+ try (Socket s = new Socket(bound.getHostName(), bound.getPort());
+ OutputStream output = s.getOutputStream();
+ InputStream input = s.getInputStream()) {
+
+ s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+ String expected = getCurrentTestName();
+ byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+ byte[] buf = new byte[bytes.length + Long.SIZE];
+
+ for (int i = 0; i < 1000; i++) {
+ output.write(bytes);
+ output.flush();
+
+ int n = input.read(buf);
+ String res = new String(buf, 0, n);
+ assertEquals("Mismatched data at iteration #" + i, expected, res);
+ }
+ } finally {
+ session.stopRemotePortForwarding(remote);
+ }
+ }
+ }
+
+ @Test
+ public void testLocalForwarding() throws Exception {
+ Session session = createSession();
+ try {
+ int forwardedPort = Utils.getFreePort();
+ session.setPortForwardingL(forwardedPort, TEST_LOCALHOST, echoPort);
+
+ try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort);
+ OutputStream output = s.getOutputStream();
+ InputStream input = s.getInputStream()) {
+
+ s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+ String expected = getCurrentTestName();
+ byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+
+ output.write(bytes);
+ output.flush();
+
+ byte[] buf = new byte[bytes.length + Long.SIZE];
+ int n = input.read(buf);
+ String res = new String(buf, 0, n);
+ assertEquals("Mismatched data", expected, res);
+ } finally {
+ session.delPortForwardingL(forwardedPort);
+ }
+ } finally {
+ session.disconnect();
+ }
+ }
+
+ @Test
+ public void testLocalForwardingNative() throws Exception {
+ try (ClientSession session = createNativeSession()) {
+ SshdSocketAddress local = new SshdSocketAddress("", 0);
+ SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
+ SshdSocketAddress bound = session.startLocalPortForwarding(local, remote);
+
+ try (Socket s = new Socket(bound.getHostName(), bound.getPort());
+ OutputStream output = s.getOutputStream();
+ InputStream input = s.getInputStream()) {
+
+ s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+ String expected = getCurrentTestName();
+ byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+
+ output.write(bytes);
+ output.flush();
+
+ byte[] buf = new byte[bytes.length + Long.SIZE];
+ int n = input.read(buf);
+ String res = new String(buf, 0, n);
+ assertEquals("Mismatched data", expected, res);
+ } finally {
+ session.stopLocalPortForwarding(bound);
+ }
+ }
+ }
+
+ @Test
+ public void testLocalForwardingNativeReuse() throws Exception {
+ try (ClientSession session = createNativeSession()) {
+ SshdSocketAddress local = new SshdSocketAddress("", 0);
+ SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
+ SshdSocketAddress bound = session.startLocalPortForwarding(local, remote);
+
+ session.stopLocalPortForwarding(bound);
+
+ SshdSocketAddress bound2 = session.startLocalPortForwarding(local, remote);
+ session.stopLocalPortForwarding(bound2);
+ }
+ }
+
+ @Test
+ public void testLocalForwardingNativeBigPayload() throws Exception {
+ try (ClientSession session = createNativeSession()) {
+ String expected = getCurrentTestName();
+ byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+ byte[] buf = new byte[bytes.length + Long.SIZE];
+
+ SshdSocketAddress local = new SshdSocketAddress("", 0);
+ SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
+ SshdSocketAddress bound = session.startLocalPortForwarding(local, remote);
+ try (Socket s = new Socket(bound.getHostName(), bound.getPort());
+ OutputStream output = s.getOutputStream();
+ InputStream input = s.getInputStream()) {
+
+ s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+ for (int i = 0; i < 1000; i++) {
+ output.write(bytes);
+ output.flush();
+
+ int n = input.read(buf);
+ String res = new String(buf, 0, n);
+ assertEquals("Mismatched data at iteration #" + i, expected, res);
+ }
+ } finally {
+ session.stopLocalPortForwarding(bound);
+ }
+ }
+ }
+
+ @Test
+ public void testForwardingChannel() throws Exception {
+ try (ClientSession session = createNativeSession()) {
+ SshdSocketAddress local = new SshdSocketAddress("", 0);
+ SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
+
+ try (ChannelDirectTcpip channel = session.createDirectTcpipChannel(local, remote)) {
+ channel.open().verify(9L, TimeUnit.SECONDS);
+
+ String expected = getCurrentTestName();
+ byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
+
+ try (OutputStream output = channel.getInvertedIn();
+ InputStream input = channel.getInvertedOut()) {
+ output.write(bytes);
+ output.flush();
+
+ byte[] buf = new byte[bytes.length + Long.SIZE];
+ int n = input.read(buf);
+ String res = new String(buf, 0, n);
+ assertEquals("Mismatched data", expected, res);
+ }
+ channel.close(false);
+ }
+ }
+ }
+
+ @Test(timeout = 45000)
+ public void testRemoteForwardingWithDisconnect() throws Exception {
+ Session session = createSession();
+ try {
+ // 1. Create a Port Forward
+ int forwardedPort = Utils.getFreePort();
+ session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);
+ waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
+
+ // 2. Establish a connection through it
+ try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort)) {
+ s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+
+ // 3. Simulate the client going away
+ rudelyDisconnectJschSession(session);
+
+ // 4. Make sure the NIOprocessor is not stuck
+ {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
+ // from here, we need to check all the threads running and find a
+ // "NioProcessor-"
+ // that is stuck on a PortForward.dispose
+ ThreadGroup root = Thread.currentThread().getThreadGroup().getParent();
+ while (root.getParent() != null) {
+ root = root.getParent();
+ }
+
+ for (int index = 0; ; index++) {
+ Collection<Thread> pending = findThreads(root, "NioProcessor-");
+ if (GenericUtils.size(pending) <= 0) {
+ log.info("Finished after " + index + " iterations");
+ break;
+ }
+ try {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
+ } catch (InterruptedException e) {
+ // ignored
+ }
+ }
+ }
+
+ session.delPortForwardingR(forwardedPort);
+ }
+ } finally {
+ session.disconnect();
+ }
+ }
+
+ /**
+ * Close the socket inside this JSCH session. Use reflection to find it and
+ * just close it.
+ *
+ * @param session the Session to violate
+ * @throws Exception
+ */
+ private void rudelyDisconnectJschSession(Session session) throws Exception {
+ Field fSocket = session.getClass().getDeclaredField("socket");
+ fSocket.setAccessible(true);
+
+ try (Socket socket = (Socket) fSocket.get(session)) {
+ assertTrue("socket is not connected", socket.isConnected());
+ assertFalse("socket should not be closed", socket.isClosed());
+ socket.close();
+ assertTrue("socket has not closed", socket.isClosed());
+ }
+ }
+
+ private Set<Thread> findThreads(ThreadGroup group, String name) {
+ int numThreads = group.activeCount();
+ Thread[] threads = new Thread[numThreads * 2];
+ numThreads = group.enumerate(threads, false);
+ Set<Thread> ret = new HashSet<Thread>();
+
+ // Enumerate each thread in `group'
+ for (int i = 0; i < numThreads; ++i) {
+ Thread t = threads[i];
+ // Get thread
+ // log.debug("Thread name: " + threads[i].getName());
+ if (checkThreadForPortForward(t, name)) {
+ ret.add(t);
+ }
+ }
+ // didn't find the thread to check the
+ int numGroups = group.activeGroupCount();
+ ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
+ numGroups = group.enumerate(groups, false);
+ for (int i = 0; i < numGroups; ++i) {
+ ThreadGroup g = groups[i];
+ Collection<Thread> c = findThreads(g, name);
+ if (GenericUtils.isEmpty(c)) {
+ continue; // debug breakpoint
+ }
+ ret.addAll(c);
+ }
+ return ret;
+ }
+
+ private boolean checkThreadForPortForward(Thread thread, String name) {
+ if (thread == null)
+ return false;
+ // does it contain the name we're looking for?
+ if (thread.getName().contains(name)) {
+ // look at the stack
+ StackTraceElement[] stack = thread.getStackTrace();
+ if (stack.length == 0)
+ return false;
+ else {
+ // does it have
+ // 'org.apache.sshd.server.session.TcpipForwardSupport.close'?
+ for (int i = 0; i < stack.length; ++i) {
+ String clazzName = stack[i].getClassName();
+ String methodName = stack[i].getMethodName();
+ // log.debug("Class: " + clazzName);
+ // log.debug("Method: " + methodName);
+ if (clazzName
+ .equals("org.apache.sshd.server.session.TcpipForwardSupport")
+ && (methodName.equals("close") || methodName
+ .equals("sessionCreated"))) {
+ log.warn(thread.getName() + " stuck at " + clazzName
+ + "." + methodName + ": "
+ + stack[i].getLineNumber());
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ protected Session createSession() throws JSchException {
+ JSch sch = new JSch();
+ Session session = sch.getSession(getCurrentTestName(), TEST_LOCALHOST, sshPort);
+ session.setUserInfo(new SimpleUserInfo(getCurrentTestName()));
+ session.connect();
+ return session;
+ }
+
+ protected ClientSession createNativeSession() throws Exception {
+ client = setupTestClient();
+ PropertyResolverUtils.updateProperty(client, FactoryManager.WINDOW_SIZE, 2048);
+ PropertyResolverUtils.updateProperty(client, FactoryManager.MAX_PACKET_SIZE, 256);
+ client.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
+ client.start();
+
+ ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, sshPort).verify(7L, TimeUnit.SECONDS).getSession();
+ session.addPasswordIdentity(getCurrentTestName());
+ session.auth().verify(11L, TimeUnit.SECONDS);
+ return session;
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/util/test/BogusInvertedShell.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusInvertedShell.java b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusInvertedShell.java
index 3954bca..c99f4ce 100644
--- a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusInvertedShell.java
+++ b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusInvertedShell.java
@@ -27,9 +27,10 @@ import java.util.Map;
import org.apache.sshd.common.util.io.IoUtils;
import org.apache.sshd.server.Environment;
import org.apache.sshd.server.session.ServerSession;
+import org.apache.sshd.server.session.ServerSessionHolder;
import org.apache.sshd.server.shell.InvertedShell;
-public class BogusInvertedShell implements InvertedShell {
+public class BogusInvertedShell implements InvertedShell, ServerSessionHolder {
private final OutputStream in;
private final InputStream out;
@@ -48,6 +49,11 @@ public class BogusInvertedShell implements InvertedShell {
}
@Override
+ public ServerSession getServerSession() {
+ return session;
+ }
+
+ @Override
public void setSession(ServerSession session) {
this.session = session;
}
[3/3] mina-sshd git commit: Upgraded surefire plugin version to 2.9.1
Posted by lg...@apache.org.
Upgraded surefire plugin version to 2.9.1
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/e980eb63
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/e980eb63
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/e980eb63
Branch: refs/heads/master
Commit: e980eb63d3a1aef171a3b4ad0154c0a4066f018b
Parents: f45ee0e
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Thu Jan 7 10:34:53 2016 +0200
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Thu Jan 7 10:34:53 2016 +0200
----------------------------------------------------------------------
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e980eb63/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d36ca17..1615d83 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,7 +103,7 @@
<spring.version>3.0.6.RELEASE</spring.version>
<jgit.version>3.4.1.201406201815-r</jgit.version>
<junit.version>4.12</junit.version>
- <surefire.plugin.version>2.19</surefire.plugin.version>
+ <surefire.plugin.version>2.19.1</surefire.plugin.version>
<httpcomps.version>4.4.1</httpcomps.version>
</properties>
[2/3] mina-sshd git commit: Added timeout for global response method
to avoid infinite waits
Posted by lg...@apache.org.
Added timeout for global response method to avoid infinite waits
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/f45ee0ef
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/f45ee0ef
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/f45ee0ef
Branch: refs/heads/master
Commit: f45ee0ef4ed742d4d620b95169736200422c3544
Parents: 1e09776
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Thu Jan 7 10:34:36 2016 +0200
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Thu Jan 7 10:34:36 2016 +0200
----------------------------------------------------------------------
.../common/forward/DefaultTcpipForwarder.java | 45 +-
.../sshd/common/future/AbstractSshFuture.java | 5 -
.../sshd/common/future/DefaultSshFuture.java | 5 +-
.../sshd/common/session/AbstractSession.java | 63 +-
.../org/apache/sshd/common/session/Session.java | 15 +-
.../apache/sshd/common/util/GenericUtils.java | 6 +
.../src/test/java/org/apache/sshd/LoadTest.java | 4 +
.../org/apache/sshd/PortForwardingLoadTest.java | 438 -------------
.../org/apache/sshd/PortForwardingTest.java | 610 -------------------
.../apache/sshd/SinglePublicKeyAuthTest.java | 181 ------
.../common/auth/SinglePublicKeyAuthTest.java | 181 ++++++
.../common/forward/PortForwardingLoadTest.java | 442 ++++++++++++++
.../sshd/common/forward/PortForwardingTest.java | 610 +++++++++++++++++++
.../sshd/util/test/BogusInvertedShell.java | 8 +-
14 files changed, 1346 insertions(+), 1267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
index e177aa9..c94a380 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
@@ -27,12 +27,14 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.sshd.client.channel.ClientChannel;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.Factory;
import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.PropertyResolverUtils;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.SshdSocketAddress;
@@ -44,6 +46,7 @@ import org.apache.sshd.common.io.IoServiceFactory;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.session.SessionHolder;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.Readable;
import org.apache.sshd.common.util.ValidateUtils;
@@ -53,11 +56,26 @@ import org.apache.sshd.common.util.closeable.AbstractInnerCloseable;
import org.apache.sshd.server.forward.ForwardingFilter;
/**
- * TODO Add javadoc
+ * Requests a "tcpip-forward" action
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public class DefaultTcpipForwarder extends AbstractInnerCloseable implements TcpipForwarder {
+public class DefaultTcpipForwarder
+ extends AbstractInnerCloseable
+ implements TcpipForwarder, SessionHolder<Session> {
+
+ /**
+ * Used to configure the timeout (milliseconds) for receiving a response
+ * for the forwarding request
+ *
+ * @see #DEFAULT_FORWARD_REQUEST_TIMEOUT
+ */
+ public static final String FORWARD_REQUEST_TIMEOUT = "tcpip-forward-request-timeout";
+
+ /**
+ * Default value for {@link #FORWARD_REQUEST_TIMEOUT} if none specified
+ */
+ public static final long DEFAULT_FORWARD_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(15L);
private final ConnectionService service;
private final IoHandlerFactory socksProxyIoHandlerFactory = new IoHandlerFactory() {
@@ -66,7 +84,7 @@ public class DefaultTcpipForwarder extends AbstractInnerCloseable implements Tcp
return new SocksProxy(getConnectionService());
}
};
- private final Session session;
+ private final Session sessionInstance;
private final Map<Integer, SshdSocketAddress> localToRemote = new HashMap<>();
private final Map<Integer, SshdSocketAddress> remoteToLocal = new HashMap<>();
private final Map<Integer, SocksProxy> dynamicLocal = new HashMap<>();
@@ -81,7 +99,12 @@ public class DefaultTcpipForwarder extends AbstractInnerCloseable implements Tcp
public DefaultTcpipForwarder(ConnectionService service) {
this.service = ValidateUtils.checkNotNull(service, "No connection service");
- this.session = ValidateUtils.checkNotNull(service.getSession(), "No session");
+ this.sessionInstance = ValidateUtils.checkNotNull(service.getSession(), "No session");
+ }
+
+ @Override
+ public Session getSession() {
+ return sessionInstance;
}
public final ConnectionService getConnectionService() {
@@ -151,12 +174,15 @@ public class DefaultTcpipForwarder extends AbstractInnerCloseable implements Tcp
String remoteHost = remote.getHostName();
int remotePort = remote.getPort();
+ Session session = getSession();
Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_GLOBAL_REQUEST, remoteHost.length() + Long.SIZE);
buffer.putString("tcpip-forward");
buffer.putBoolean(true); // want reply
buffer.putString(remoteHost);
buffer.putInt(remotePort);
- Buffer result = session.request(buffer);
+
+ long timeout = PropertyResolverUtils.getLongProperty(session, FORWARD_REQUEST_TIMEOUT, DEFAULT_FORWARD_REQUEST_TIMEOUT);
+ Buffer result = session.request(buffer, timeout, TimeUnit.MILLISECONDS);
if (result == null) {
throw new SshException("Tcpip forwarding request denied by server");
}
@@ -192,6 +218,7 @@ public class DefaultTcpipForwarder extends AbstractInnerCloseable implements Tcp
}
String remoteHost = remote.getHostName();
+ Session session = getSession();
Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_GLOBAL_REQUEST, remoteHost.length() + Long.SIZE);
buffer.putString("cancel-tcpip-forward");
buffer.putBoolean(false); // want reply
@@ -269,7 +296,8 @@ public class DefaultTcpipForwarder extends AbstractInnerCloseable implements Tcp
ValidateUtils.checkNotNull(local, "Local address is null");
ValidateUtils.checkTrue(local.getPort() >= 0, "Invalid local port: %s", local);
- FactoryManager manager = session.getFactoryManager();
+ Session session = getSession();
+ FactoryManager manager = ValidateUtils.checkNotNull(session.getFactoryManager(), "No factory manager");
ForwardingFilter filter = manager.getTcpipForwardingFilter();
if ((filter == null) || (!filter.canListen(local, session))) {
if (log.isDebugEnabled()) {
@@ -330,7 +358,8 @@ public class DefaultTcpipForwarder extends AbstractInnerCloseable implements Tcp
*/
private InetSocketAddress doBind(SshdSocketAddress address, Factory<? extends IoHandler> handlerFactory) throws IOException {
if (acceptor == null) {
- FactoryManager manager = session.getFactoryManager();
+ Session session = getSession();
+ FactoryManager manager = ValidateUtils.checkNotNull(session.getFactoryManager(), "No factory manager");
IoServiceFactory factory = manager.getIoServiceFactory();
IoHandler handler = handlerFactory.create();
acceptor = factory.createAcceptor(handler);
@@ -365,7 +394,7 @@ public class DefaultTcpipForwarder extends AbstractInnerCloseable implements Tcp
@Override
public String toString() {
- return getClass().getSimpleName() + "[" + session + "]";
+ return getClass().getSimpleName() + "[" + getSession() + "]";
}
//
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java
index 2f179e7..8efd65f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java
@@ -38,11 +38,6 @@ public abstract class AbstractSshFuture<T extends SshFuture> extends AbstractLog
*/
protected static final Object CANCELED = new Object();
- /**
- * A value indicating a {@code null} value
- */
- protected static final Object NULL = new Object();
-
protected AbstractSshFuture() {
super();
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
index 9e25bcc..8f31339 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
@@ -21,6 +21,7 @@ package org.apache.sshd.common.future;
import java.io.InterruptedIOException;
import java.lang.reflect.Array;
+import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.ValidateUtils;
/**
@@ -96,7 +97,7 @@ public class DefaultSshFuture<T extends SshFuture> extends AbstractSshFuture<T>
return;
}
- result = newValue != null ? newValue : NULL;
+ result = (newValue != null) ? newValue : GenericUtils.NULL;
lock.notifyAll();
}
@@ -109,7 +110,7 @@ public class DefaultSshFuture<T extends SshFuture> extends AbstractSshFuture<T>
*/
public Object getValue() {
synchronized (lock) {
- return result == NULL ? null : result;
+ return (result == GenericUtils.NULL) ? null : result;
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index 92f5843..e5eaa54 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -20,6 +20,7 @@ package org.apache.sshd.common.session;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
@@ -171,7 +172,6 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
protected final Object encodeLock = new Object();
protected final Object decodeLock = new Object();
protected final Object requestLock = new Object();
- protected final AtomicReference<Buffer> requestResult = new AtomicReference<>();
protected final Map<AttributeKey<?>, Object> attributes = new ConcurrentHashMap<>();
// Session timeout measurements
@@ -210,6 +210,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
*/
private final FactoryManager factoryManager;
private final Map<String, Object> properties = new ConcurrentHashMap<>();
+ private final AtomicReference<Object> requestResult = new AtomicReference<>();
/**
* Create a new session.
@@ -771,6 +772,12 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
}
}
+ // if anyone waiting for global response notify them about the closing session
+ synchronized (requestResult) {
+ requestResult.set(GenericUtils.NULL);
+ requestResult.notify();
+ }
+
// Fire 'close' event
SessionListener listener = getSessionListenerProxy();
try {
@@ -934,28 +941,54 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
return count;
}
- /**
- * Send a global request and wait for the response.
- * This must only be used when sending a SSH_MSG_GLOBAL_REQUEST with a result expected,
- * else it will wait forever.
- *
- * @param buffer the buffer containing the global request
- * @return <code>true</code> if the request was successful, <code>false</code> otherwise.
- * @throws IOException if an error occurred when encoding sending the packet
- */
@Override
- public Buffer request(Buffer buffer) throws IOException {
+ public Buffer request(Buffer buffer, long timeout, TimeUnit unit) throws IOException {
+ ValidateUtils.checkTrue(timeout > 0L, "Non-positive timeout requested: %d", timeout);
+
+ long maxWaitMillis = TimeUnit.MILLISECONDS.convert(timeout, unit);
+ if (maxWaitMillis <= 0L) {
+ throw new IllegalArgumentException("Requested timeout below 1 msec: " + timeout + " " + unit);
+ }
+
+ Object result;
synchronized (requestLock) {
try {
+ writePacket(buffer);
+
synchronized (requestResult) {
- writePacket(buffer);
- requestResult.wait();
- return requestResult.get();
+ while (isOpen() && (maxWaitMillis > 0L) && (requestResult.get() == null)) {
+ long waitStart = System.nanoTime();
+ requestResult.wait(maxWaitMillis);
+ long waitEnd = System.nanoTime();
+ long waitDuration = waitEnd - waitStart;
+ long waitMillis = TimeUnit.NANOSECONDS.toMillis(waitDuration);
+ if (waitMillis > 0L) {
+ maxWaitMillis -= waitMillis;
+ } else {
+ maxWaitMillis--;
+ }
+ }
+
+ result = requestResult.getAndSet(null);
}
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException("Interrupted while waiting for request result").initCause(e);
}
}
+
+ if (!isOpen()) {
+ throw new IOException("Session is closed or closing");
+ }
+
+ if (result == null) {
+ throw new SocketTimeoutException("No response received after " + timeout + " " + unit);
+ }
+
+ if (result instanceof Buffer) {
+ return (Buffer) result;
+ }
+
+ return null;
}
@Override
@@ -1690,7 +1723,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
protected void requestFailure(Buffer buffer) throws Exception {
synchronized (requestResult) {
- requestResult.set(null);
+ requestResult.set(GenericUtils.NULL);
resetIdleTimeout();
requestResult.notify();
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
index 1138310..d47553b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
@@ -182,7 +182,7 @@ public interface Session
*
* @param buffer the buffer to encode and send
* @return a future that can be used to check when the packet has actually been sent
- * @throws java.io.IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding sending the packet
*/
IoWriteFuture writePacket(Buffer buffer) throws IOException;
@@ -196,20 +196,21 @@ public interface Session
* @param timeout the timeout
* @param unit the time unit of the timeout parameter
* @return a future that can be used to check when the packet has actually been sent
- * @throws java.io.IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding sending the packet
*/
IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit unit) throws IOException;
/**
- * Send a global request and wait for the response.
- * This must only be used when sending a SSH_MSG_GLOBAL_REQUEST with a result expected,
- * else it will wait forever.
+ * Send a global request and wait for the response. This must only be used when sending
+ * a {@code SSH_MSG_GLOBAL_REQUEST} with a result expected, else it will time out
*
* @param buffer the buffer containing the global request
+ * @param timeout The number of time units to wait - must be <U>positive</U>
+ * @param unit The {@link TimeUnit} to wait for the response
* @return the return buffer if the request was successful, {@code null} otherwise.
- * @throws java.io.IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding sending the packet
*/
- Buffer request(Buffer buffer) throws IOException;
+ Buffer request(Buffer buffer, long timeout, TimeUnit unit) throws IOException;
/**
* Handle any exceptions that occurred on this session.
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
index ef06c54..988254b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/GenericUtils.java
@@ -54,6 +54,12 @@ public final class GenericUtils {
public static final Object[] EMPTY_OBJECT_ARRAY = {};
/**
+ * A value indicating a {@code null} value - to be used as a placeholder
+ * where {@code null}s are not allowed
+ */
+ public static final Object NULL = new Object();
+
+ /**
* The complement of {@link String#CASE_INSENSITIVE_ORDER}
*/
public static final Comparator<String> CASE_SENSITIVE_ORDER = new Comparator<String>() {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
index 4d900c7..63272bd 100644
--- a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
@@ -54,6 +54,10 @@ public class LoadTest extends BaseTestSupport {
private SshServer sshd;
private int port;
+ public LoadTest() {
+ super();
+ }
+
@Before
public void setUp() throws Exception {
sshd = setupTestServer();
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/PortForwardingLoadTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/PortForwardingLoadTest.java b/sshd-core/src/test/java/org/apache/sshd/PortForwardingLoadTest.java
deleted file mode 100644
index fe89fba..0000000
--- a/sshd-core/src/test/java/org/apache/sshd/PortForwardingLoadTest.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sshd;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.httpclient.HostConfiguration;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpVersion;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.service.IoAcceptor;
-import org.apache.mina.core.service.IoHandlerAdapter;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
-import org.apache.sshd.server.SshServer;
-import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
-import org.apache.sshd.util.test.BaseTestSupport;
-import org.apache.sshd.util.test.JSchLogger;
-import org.apache.sshd.util.test.SimpleUserInfo;
-import org.apache.sshd.util.test.Utils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.runners.MethodSorters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.jcraft.jsch.JSch;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
-
-/**
- * Port forwarding tests
- */
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class PortForwardingLoadTest extends BaseTestSupport {
- private final Logger log = LoggerFactory.getLogger(getClass());
- private SshServer sshd;
- private int sshPort;
- private int echoPort;
- private IoAcceptor acceptor;
-
- public PortForwardingLoadTest() {
- super();
- }
-
- @BeforeClass
- public static void jschInit() {
- JSchLogger.init();
- }
-
- @Before
- public void setUp() throws Exception {
- sshd = setupTestServer();
- sshd.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
- sshd.start();
- sshPort = sshd.getPort();
-
- NioSocketAcceptor acceptor = new NioSocketAcceptor();
- acceptor.setHandler(new IoHandlerAdapter() {
- @Override
- public void messageReceived(IoSession session, Object message) throws Exception {
- IoBuffer recv = (IoBuffer) message;
- IoBuffer sent = IoBuffer.allocate(recv.remaining());
- sent.put(recv);
- sent.flip();
- session.write(sent);
- }
- });
- acceptor.setReuseAddress(true);
- acceptor.bind(new InetSocketAddress(0));
- echoPort = acceptor.getLocalAddress().getPort();
- this.acceptor = acceptor;
-
- }
-
- @After
- public void tearDown() throws Exception {
- if (sshd != null) {
- sshd.stop(true);
- }
- if (acceptor != null) {
- acceptor.dispose(true);
- }
- }
-
- @Test
- public void testLocalForwardingPayload() throws Exception {
- final int NUM_ITERATIONS = 100;
- final String PAYLOAD_TMP = "This is significantly longer Test Data. This is significantly " +
- "longer Test Data. This is significantly longer Test Data. This is significantly " +
- "longer Test Data. This is significantly longer Test Data. This is significantly " +
- "longer Test Data. This is significantly longer Test Data. This is significantly " +
- "longer Test Data. This is significantly longer Test Data. This is significantly " +
- "longer Test Data. ";
- StringBuilder sb = new StringBuilder(PAYLOAD_TMP.length() * 1000);
- for (int i = 0; i < 1000; i++) {
- sb.append(PAYLOAD_TMP);
- }
- final String PAYLOAD = sb.toString();
-
- Session session = createSession();
- try (final ServerSocket ss = new ServerSocket()) {
- ss.setReuseAddress(true);
- ss.bind(new InetSocketAddress((InetAddress) null, 0));
- int forwardedPort = ss.getLocalPort();
- int sinkPort = session.setPortForwardingL(0, TEST_LOCALHOST, forwardedPort);
- final AtomicInteger conCount = new AtomicInteger(0);
-
- Thread tAcceptor = new Thread(getCurrentTestName() + "Acceptor") {
- @SuppressWarnings("synthetic-access")
- @Override
- public void run() {
- try {
- byte[] buf = new byte[8192];
- log.info("Started...");
- for (int i = 0; i < NUM_ITERATIONS; ++i) {
- try (Socket s = ss.accept()) {
- conCount.incrementAndGet();
-
- try (InputStream sockIn = s.getInputStream();
- ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-
- int l;
- while ((baos.size() < PAYLOAD.length()) && ((l = sockIn.read(buf)) > 0)) {
- baos.write(buf, 0, l);
- }
-
- assertEquals("Mismatched received data at iteration #" + i, PAYLOAD, baos.toString());
-
- try (InputStream inputCopy = new ByteArrayInputStream(baos.toByteArray());
- OutputStream sockOut = s.getOutputStream()) {
-
- while ((l = sockIn.read(buf)) > 0) {
- sockOut.write(buf, 0, l);
- }
- }
- }
- }
- }
- log.info("Done");
- } catch (Exception e) {
- log.error("Failed to complete run loop", e);
- }
- }
- };
- tAcceptor.start();
- Thread.sleep(50);
-
- byte[] buf = new byte[8192];
- byte[] bytes = PAYLOAD.getBytes(StandardCharsets.UTF_8);
- for (int i = 0; i < NUM_ITERATIONS; i++) {
- log.info("Iteration {}", Integer.valueOf(i));
- try (Socket s = new Socket(TEST_LOCALHOST, sinkPort);
- OutputStream sockOut = s.getOutputStream()) {
-
- s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
- sockOut.write(bytes);
- sockOut.flush();
-
- try (InputStream sockIn = s.getInputStream();
- ByteArrayOutputStream baos = new ByteArrayOutputStream(bytes.length)) {
- int l;
- while ((baos.size() < PAYLOAD.length()) && ((l = sockIn.read(buf)) > 0)) {
- baos.write(buf, 0, l);
- }
- assertEquals("Mismatched payload at iteration #" + i, PAYLOAD, baos.toString());
- }
- } catch (Exception e) {
- log.error("Error in iteration #" + i, e);
- }
- }
- session.delPortForwardingL(sinkPort);
-
- ss.close();
- tAcceptor.join(TimeUnit.SECONDS.toMillis(5L));
- } finally {
- session.disconnect();
- }
- }
-
- @Test
- public void testRemoteForwardingPayload() throws Exception {
- final int NUM_ITERATIONS = 100;
- final String PAYLOAD = "This is significantly longer Test Data. This is significantly " +
- "longer Test Data. This is significantly longer Test Data. This is significantly " +
- "longer Test Data. This is significantly longer Test Data. This is significantly " +
- "longer Test Data. This is significantly longer Test Data. This is significantly " +
- "longer Test Data. This is significantly longer Test Data. This is significantly " +
- "longer Test Data. ";
- Session session = createSession();
- try (final ServerSocket ss = new ServerSocket()) {
- ss.setReuseAddress(true);
- ss.bind(new InetSocketAddress((InetAddress) null, 0));
- int forwardedPort = ss.getLocalPort();
- int sinkPort = Utils.getFreePort();
- session.setPortForwardingR(sinkPort, TEST_LOCALHOST, forwardedPort);
- final boolean started[] = new boolean[1];
- started[0] = false;
- final AtomicInteger conCount = new AtomicInteger(0);
-
- Thread tWriter = new Thread(getCurrentTestName() + "Writer") {
- @SuppressWarnings("synthetic-access")
- @Override
- public void run() {
- started[0] = true;
- try {
- byte[] bytes = PAYLOAD.getBytes(StandardCharsets.UTF_8);
- for (int i = 0; i < NUM_ITERATIONS; ++i) {
- try (Socket s = ss.accept()) {
- conCount.incrementAndGet();
-
- try (OutputStream sockOut = s.getOutputStream()) {
- sockOut.write(bytes);
- sockOut.flush();
- }
- }
- }
- } catch (Exception e) {
- log.error("Failed to complete run loop", e);
- }
- }
- };
- tWriter.start();
- Thread.sleep(50);
- assertTrue("Server not started", started[0]);
-
- final boolean lenOK[] = new boolean[NUM_ITERATIONS];
- final boolean dataOK[] = new boolean[NUM_ITERATIONS];
- byte b2[] = new byte[PAYLOAD.length()];
- byte b1[] = new byte[b2.length / 2];
-
- for (int i = 0; i < NUM_ITERATIONS; i++) {
- final int ii = i;
- try (Socket s = new Socket(TEST_LOCALHOST, sinkPort);
- InputStream sockIn = s.getInputStream()) {
-
- s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
- int read1 = sockIn.read(b1);
- String part1 = new String(b1, 0, read1);
- Thread.sleep(50);
-
- int read2 = sockIn.read(b2);
- String part2 = new String(b2, 0, read2);
- int totalRead = read1 + read2;
- lenOK[ii] = PAYLOAD.length() == totalRead;
-
- String readData = part1 + part2;
- dataOK[ii] = PAYLOAD.equals(readData);
- if (!lenOK[ii]) {
- throw new IndexOutOfBoundsException("Mismatched length: expected=" + PAYLOAD.length() + ", actual=" + totalRead);
- }
-
- if (!dataOK[ii]) {
- throw new IllegalStateException("Mismatched content");
- }
- } catch (Exception e) {
- log.error("Failed to complete iteration #" + i, e);
- }
- }
- int ok = 0;
- for (int i = 0; i < NUM_ITERATIONS; i++) {
- ok += lenOK[i] ? 1 : 0;
- }
- Thread.sleep(50);
- for (int i = 0; i < NUM_ITERATIONS; i++) {
- assertTrue("Bad length at iteration " + i, lenOK[i]);
- assertTrue("Bad data at iteration " + i, dataOK[i]);
- }
- session.delPortForwardingR(forwardedPort);
- ss.close();
- tWriter.join(TimeUnit.SECONDS.toMillis(5L));
- } finally {
- session.disconnect();
- }
- }
-
- @Test
- public void testForwardingOnLoad() throws Exception {
-// final String path = "/history/recent/troubles/";
-// final String host = "www.bbc.co.uk";
-// final String path = "";
-// final String host = "www.bahn.de";
- final String path = "";
- final String host = TEST_LOCALHOST;
- final int nbThread = 2;
- final int nbDownloads = 2;
- final int nbLoops = 2;
-
- StringBuilder resp = new StringBuilder();
- resp.append("<html><body>\n");
- for (int i = 0; i < 1000; i++) {
- resp.append("0123456789\n");
- }
- resp.append("</body></html>\n");
- final StringBuilder sb = new StringBuilder();
- sb.append("HTTP/1.1 200 OK").append('\n');
- sb.append("Content-Type: text/HTML").append('\n');
- sb.append("Content-Length: ").append(resp.length()).append('\n');
- sb.append('\n');
- sb.append(resp);
- NioSocketAcceptor acceptor = new NioSocketAcceptor();
- acceptor.setHandler(new IoHandlerAdapter() {
- @Override
- public void messageReceived(IoSession session, Object message) throws Exception {
- session.write(IoBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8)));
- }
- });
- acceptor.setReuseAddress(true);
- acceptor.bind(new InetSocketAddress(0));
- final int port = acceptor.getLocalAddress().getPort();
-
- Session session = createSession();
- try {
- final int forwardedPort1 = session.setPortForwardingL(0, host, port);
- final int forwardedPort2 = Utils.getFreePort();
- session.setPortForwardingR(forwardedPort2, TEST_LOCALHOST, forwardedPort1);
- System.err.println("URL: http://localhost:" + forwardedPort2);
-
- final CountDownLatch latch = new CountDownLatch(nbThread * nbDownloads * nbLoops);
- final Thread[] threads = new Thread[nbThread];
- final List<Throwable> errors = new CopyOnWriteArrayList<Throwable>();
- for (int i = 0; i < threads.length; i++) {
- threads[i] = new Thread(getCurrentTestName() + "[" + i + "]") {
- @Override
- public void run() {
- for (int j = 0; j < nbLoops; j++) {
- final MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
- final HttpClient client = new HttpClient(mgr);
- client.getHttpConnectionManager().getParams().setDefaultMaxConnectionsPerHost(100);
- client.getHttpConnectionManager().getParams().setMaxTotalConnections(1000);
- for (int i = 0; i < nbDownloads; i++) {
- try {
- checkHtmlPage(client, new URL("http://localhost:" + forwardedPort2 + path));
- } catch (Throwable e) {
- errors.add(e);
- } finally {
- latch.countDown();
- System.err.println("Remaining: " + latch.getCount());
- }
- }
- mgr.shutdown();
- }
- }
- };
- }
- for (int i = 0; i < threads.length; i++) {
- threads[i].start();
- }
- latch.await();
- for (Throwable t : errors) {
- t.printStackTrace();
- }
- assertEquals(0, errors.size());
- } finally {
- session.disconnect();
- }
- }
-
- protected Session createSession() throws JSchException {
- JSch sch = new JSch();
- Session session = sch.getSession("sshd", TEST_LOCALHOST, sshPort);
- session.setUserInfo(new SimpleUserInfo("sshd"));
- session.connect();
- return session;
- }
-
- protected void checkHtmlPage(HttpClient client, URL url) throws IOException {
- client.setHostConfiguration(new HostConfiguration());
- client.getHostConfiguration().setHost(url.getHost(), url.getPort());
- GetMethod get = new GetMethod("");
- get.getParams().setVersion(HttpVersion.HTTP_1_1);
- client.executeMethod(get);
- String str = get.getResponseBodyAsString();
- if (str.indexOf("</html>") <= 0) {
- System.err.println(str);
- }
- assertTrue((str.indexOf("</html>") > 0));
- get.releaseConnection();
-// url.openConnection().setDefaultUseCaches(false);
-// Reader reader = new BufferedReader(new InputStreamReader(url.openStream()));
-// try {
-// StringWriter sw = new StringWriter();
-// char[] buf = new char[8192];
-// while (true) {
-// int len = reader.read(buf);
-// if (len < 0) {
-// break;
-// }
-// sw.write(buf, 0, len);
-// }
-// assertTrue(sw.toString().indexOf("</html>") > 0);
-// } finally {
-// reader.close();
-// }
- }
-
-
-}
-
-
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java b/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
deleted file mode 100644
index 3b5ddb2..0000000
--- a/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
+++ /dev/null
@@ -1,610 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sshd;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.service.IoAcceptor;
-import org.apache.mina.core.service.IoHandlerAdapter;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
-import org.apache.sshd.client.SshClient;
-import org.apache.sshd.client.channel.ChannelDirectTcpip;
-import org.apache.sshd.client.session.ClientSession;
-import org.apache.sshd.common.FactoryManager;
-import org.apache.sshd.common.PropertyResolverUtils;
-import org.apache.sshd.common.SshdSocketAddress;
-import org.apache.sshd.common.forward.TcpipForwarder;
-import org.apache.sshd.common.forward.TcpipForwarderFactory;
-import org.apache.sshd.common.session.ConnectionService;
-import org.apache.sshd.common.util.GenericUtils;
-import org.apache.sshd.common.util.ValidateUtils;
-import org.apache.sshd.server.SshServer;
-import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
-import org.apache.sshd.server.global.CancelTcpipForwardHandler;
-import org.apache.sshd.server.global.TcpipForwardHandler;
-import org.apache.sshd.util.test.BaseTestSupport;
-import org.apache.sshd.util.test.JSchLogger;
-import org.apache.sshd.util.test.SimpleUserInfo;
-import org.apache.sshd.util.test.Utils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.runners.MethodSorters;
-import org.slf4j.LoggerFactory;
-
-import com.jcraft.jsch.JSch;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
-
-/**
- * Port forwarding tests
- */
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class PortForwardingTest extends BaseTestSupport {
-
- private final org.slf4j.Logger log = LoggerFactory.getLogger(getClass());
-
- private final BlockingQueue<String> requestsQ = new LinkedBlockingDeque<String>();
-
- private SshServer sshd;
- private int sshPort;
- private int echoPort;
- private IoAcceptor acceptor;
- private SshClient client;
-
- public PortForwardingTest() {
- super();
- }
-
- @BeforeClass
- public static void jschInit() {
- JSchLogger.init();
- }
-
- @Before
- public void setUp() throws Exception {
- sshd = setupTestServer();
- PropertyResolverUtils.updateProperty(sshd, FactoryManager.WINDOW_SIZE, 2048);
- PropertyResolverUtils.updateProperty(sshd, FactoryManager.MAX_PACKET_SIZE, 256);
- sshd.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
- sshd.start();
-
- if (!requestsQ.isEmpty()) {
- requestsQ.clear();
- }
-
- final TcpipForwarderFactory factory = ValidateUtils.checkNotNull(sshd.getTcpipForwarderFactory(), "No TcpipForwarderFactory");
- sshd.setTcpipForwarderFactory(new TcpipForwarderFactory() {
- private final Class<?>[] interfaces = {TcpipForwarder.class};
- private final Map<String, String> method2req = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER) {
- private static final long serialVersionUID = 1L; // we're not serializing it...
-
- {
- put("localPortForwardingRequested", TcpipForwardHandler.REQUEST);
- put("localPortForwardingCancelled", CancelTcpipForwardHandler.REQUEST);
- }
- };
-
- @Override
- public TcpipForwarder create(ConnectionService service) {
- Thread thread = Thread.currentThread();
- ClassLoader cl = thread.getContextClassLoader();
-
- final TcpipForwarder forwarder = factory.create(service);
- return (TcpipForwarder) Proxy.newProxyInstance(cl, interfaces, new InvocationHandler() {
- @SuppressWarnings("synthetic-access")
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- Object result = method.invoke(forwarder, args);
- String name = method.getName();
- String request = method2req.get(name);
- if (GenericUtils.length(request) > 0) {
- if (requestsQ.offer(request)) {
- log.info("Signal " + request);
- } else {
- log.error("Failed to offer request=" + request);
- }
- }
- return result;
- }
- });
- }
- });
- sshPort = sshd.getPort();
-
- NioSocketAcceptor acceptor = new NioSocketAcceptor();
- acceptor.setHandler(new IoHandlerAdapter() {
- @Override
- public void messageReceived(IoSession session, Object message) throws Exception {
- IoBuffer recv = (IoBuffer) message;
- IoBuffer sent = IoBuffer.allocate(recv.remaining());
- sent.put(recv);
- sent.flip();
- session.write(sent);
- }
- });
- acceptor.setReuseAddress(true);
- acceptor.bind(new InetSocketAddress(0));
- echoPort = acceptor.getLocalAddress().getPort();
- this.acceptor = acceptor;
- }
-
- private void waitForForwardingRequest(String expected, long timeout) throws InterruptedException {
- for (long remaining = timeout; remaining > 0L; ) {
- long waitStart = System.currentTimeMillis();
- String actual = requestsQ.poll(remaining, TimeUnit.MILLISECONDS);
- long waitEnd = System.currentTimeMillis();
- if (GenericUtils.isEmpty(actual)) {
- throw new IllegalStateException("Failed to retrieve request=" + expected);
- }
-
- if (expected.equals(actual)) {
- return;
- }
-
- long waitDuration = waitEnd - waitStart;
- remaining -= waitDuration;
- }
-
- throw new IllegalStateException("Timeout while waiting to retrieve request=" + expected);
- }
-
- @After
- public void tearDown() throws Exception {
- if (sshd != null) {
- sshd.stop(true);
- }
- if (acceptor != null) {
- acceptor.dispose(true);
- }
- if (client != null) {
- client.stop();
- }
- }
-
- @Test
- public void testRemoteForwarding() throws Exception {
- Session session = createSession();
- try {
- int forwardedPort = Utils.getFreePort();
- session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);
- waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
-
- try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort);
- OutputStream output = s.getOutputStream();
- InputStream input = s.getInputStream()) {
-
- s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
- String expected = getCurrentTestName();
- byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
- output.write(bytes);
- output.flush();
-
- byte[] buf = new byte[bytes.length + Long.SIZE];
- int n = input.read(buf);
- String res = new String(buf, 0, n, StandardCharsets.UTF_8);
- assertEquals("Mismatched data", expected, res);
- } finally {
- session.delPortForwardingR(forwardedPort);
- }
- } finally {
- session.disconnect();
- }
- }
-
- @Test
- public void testRemoteForwardingSecondTimeInSameSession() throws Exception {
- Session session = createSession();
- try {
- int forwardedPort = Utils.getFreePort();
- session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);
- waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
-
- session.delPortForwardingR(TEST_LOCALHOST, forwardedPort);
- waitForForwardingRequest(CancelTcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
-
- session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);
- waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
-
- try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort);
- OutputStream output = s.getOutputStream();
- InputStream input = s.getInputStream()) {
-
- s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
- String expected = getCurrentTestName();
- byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
- output.write(bytes);
- output.flush();
-
- byte[] buf = new byte[bytes.length + Long.SIZE];
- int n = input.read(buf);
- String res = new String(buf, 0, n, StandardCharsets.UTF_8);
- assertEquals("Mismatched data", expected, res);
- } finally {
- session.delPortForwardingR(TEST_LOCALHOST, forwardedPort);
- }
- } finally {
- session.disconnect();
- }
- }
-
- @Test
- public void testRemoteForwardingNative() throws Exception {
- try (ClientSession session = createNativeSession()) {
- SshdSocketAddress remote = new SshdSocketAddress("", 0);
- SshdSocketAddress local = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
- SshdSocketAddress bound = session.startRemotePortForwarding(remote, local);
-
- try (Socket s = new Socket(bound.getHostName(), bound.getPort());
- OutputStream output = s.getOutputStream();
- InputStream input = s.getInputStream()) {
-
- s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
- String expected = getCurrentTestName();
- byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
- output.write(bytes);
- output.flush();
-
- byte[] buf = new byte[bytes.length + Long.SIZE];
- int n = input.read(buf);
- String res = new String(buf, 0, n);
- assertEquals("Mismatched data", expected, res);
- } finally {
- session.stopRemotePortForwarding(remote);
- }
- }
- }
-
- @Test
- public void testRemoteForwardingNativeBigPayload() throws Exception {
- try (ClientSession session = createNativeSession()) {
- SshdSocketAddress remote = new SshdSocketAddress("", 0);
- SshdSocketAddress local = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
- SshdSocketAddress bound = session.startRemotePortForwarding(remote, local);
-
- try (Socket s = new Socket(bound.getHostName(), bound.getPort());
- OutputStream output = s.getOutputStream();
- InputStream input = s.getInputStream()) {
-
- s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
- String expected = getCurrentTestName();
- byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
- byte[] buf = new byte[bytes.length + Long.SIZE];
-
- for (int i = 0; i < 1000; i++) {
- output.write(bytes);
- output.flush();
-
- int n = input.read(buf);
- String res = new String(buf, 0, n);
- assertEquals("Mismatched data at iteration #" + i, expected, res);
- }
- } finally {
- session.stopRemotePortForwarding(remote);
- }
- }
- }
-
- @Test
- public void testLocalForwarding() throws Exception {
- Session session = createSession();
- try {
- int forwardedPort = Utils.getFreePort();
- session.setPortForwardingL(forwardedPort, TEST_LOCALHOST, echoPort);
-
- try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort);
- OutputStream output = s.getOutputStream();
- InputStream input = s.getInputStream()) {
-
- s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
- String expected = getCurrentTestName();
- byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
-
- output.write(bytes);
- output.flush();
-
- byte[] buf = new byte[bytes.length + Long.SIZE];
- int n = input.read(buf);
- String res = new String(buf, 0, n);
- assertEquals("Mismatched data", expected, res);
- } finally {
- session.delPortForwardingL(forwardedPort);
- }
- } finally {
- session.disconnect();
- }
- }
-
- @Test
- public void testLocalForwardingNative() throws Exception {
- try (ClientSession session = createNativeSession()) {
- SshdSocketAddress local = new SshdSocketAddress("", 0);
- SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
- SshdSocketAddress bound = session.startLocalPortForwarding(local, remote);
-
- try (Socket s = new Socket(bound.getHostName(), bound.getPort());
- OutputStream output = s.getOutputStream();
- InputStream input = s.getInputStream()) {
-
- s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
- String expected = getCurrentTestName();
- byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
-
- output.write(bytes);
- output.flush();
-
- byte[] buf = new byte[bytes.length + Long.SIZE];
- int n = input.read(buf);
- String res = new String(buf, 0, n);
- assertEquals("Mismatched data", expected, res);
- } finally {
- session.stopLocalPortForwarding(bound);
- }
- }
- }
-
- @Test
- public void testLocalForwardingNativeReuse() throws Exception {
- try (ClientSession session = createNativeSession()) {
- SshdSocketAddress local = new SshdSocketAddress("", 0);
- SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
- SshdSocketAddress bound = session.startLocalPortForwarding(local, remote);
-
- session.stopLocalPortForwarding(bound);
-
- SshdSocketAddress bound2 = session.startLocalPortForwarding(local, remote);
- session.stopLocalPortForwarding(bound2);
- }
- }
-
- @Test
- public void testLocalForwardingNativeBigPayload() throws Exception {
- try (ClientSession session = createNativeSession()) {
- String expected = getCurrentTestName();
- byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
- byte[] buf = new byte[bytes.length + Long.SIZE];
-
- SshdSocketAddress local = new SshdSocketAddress("", 0);
- SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
- SshdSocketAddress bound = session.startLocalPortForwarding(local, remote);
- try (Socket s = new Socket(bound.getHostName(), bound.getPort());
- OutputStream output = s.getOutputStream();
- InputStream input = s.getInputStream()) {
-
- s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
- for (int i = 0; i < 1000; i++) {
- output.write(bytes);
- output.flush();
-
- int n = input.read(buf);
- String res = new String(buf, 0, n);
- assertEquals("Mismatched data at iteration #" + i, expected, res);
- }
- } finally {
- session.stopLocalPortForwarding(bound);
- }
- }
- }
-
- @Test
- public void testForwardingChannel() throws Exception {
- try (ClientSession session = createNativeSession()) {
- SshdSocketAddress local = new SshdSocketAddress("", 0);
- SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, echoPort);
-
- try (ChannelDirectTcpip channel = session.createDirectTcpipChannel(local, remote)) {
- channel.open().verify(9L, TimeUnit.SECONDS);
-
- String expected = getCurrentTestName();
- byte[] bytes = expected.getBytes(StandardCharsets.UTF_8);
-
- try (OutputStream output = channel.getInvertedIn();
- InputStream input = channel.getInvertedOut()) {
- output.write(bytes);
- output.flush();
-
- byte[] buf = new byte[bytes.length + Long.SIZE];
- int n = input.read(buf);
- String res = new String(buf, 0, n);
- assertEquals("Mismatched data", expected, res);
- }
- channel.close(false);
- }
- }
- }
-
- @Test(timeout = 45000)
- public void testRemoteForwardingWithDisconnect() throws Exception {
- Session session = createSession();
- try {
- // 1. Create a Port Forward
- int forwardedPort = Utils.getFreePort();
- session.setPortForwardingR(forwardedPort, TEST_LOCALHOST, echoPort);
- waitForForwardingRequest(TcpipForwardHandler.REQUEST, TimeUnit.SECONDS.toMillis(5L));
-
- // 2. Establish a connection through it
- try (Socket s = new Socket(TEST_LOCALHOST, forwardedPort)) {
- s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
-
- // 3. Simulate the client going away
- rudelyDisconnectJschSession(session);
-
- // 4. Make sure the NIOprocessor is not stuck
- {
- Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
- // from here, we need to check all the threads running and find a
- // "NioProcessor-"
- // that is stuck on a PortForward.dispose
- ThreadGroup root = Thread.currentThread().getThreadGroup().getParent();
- while (root.getParent() != null) {
- root = root.getParent();
- }
-
- for (int index = 0; ; index++) {
- Collection<Thread> pending = findThreads(root, "NioProcessor-");
- if (GenericUtils.size(pending) <= 0) {
- log.info("Finished after " + index + " iterations");
- break;
- }
- try {
- Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
- } catch (InterruptedException e) {
- // ignored
- }
- }
- }
-
- session.delPortForwardingR(forwardedPort);
- }
- } finally {
- session.disconnect();
- }
- }
-
- /**
- * Close the socket inside this JSCH session. Use reflection to find it and
- * just close it.
- *
- * @param session the Session to violate
- * @throws Exception
- */
- private void rudelyDisconnectJschSession(Session session) throws Exception {
- Field fSocket = session.getClass().getDeclaredField("socket");
- fSocket.setAccessible(true);
-
- try (Socket socket = (Socket) fSocket.get(session)) {
- assertTrue("socket is not connected", socket.isConnected());
- assertFalse("socket should not be closed", socket.isClosed());
- socket.close();
- assertTrue("socket has not closed", socket.isClosed());
- }
- }
-
- private Set<Thread> findThreads(ThreadGroup group, String name) {
- int numThreads = group.activeCount();
- Thread[] threads = new Thread[numThreads * 2];
- numThreads = group.enumerate(threads, false);
- Set<Thread> ret = new HashSet<Thread>();
-
- // Enumerate each thread in `group'
- for (int i = 0; i < numThreads; ++i) {
- Thread t = threads[i];
- // Get thread
- // log.debug("Thread name: " + threads[i].getName());
- if (checkThreadForPortForward(t, name)) {
- ret.add(t);
- }
- }
- // didn't find the thread to check the
- int numGroups = group.activeGroupCount();
- ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
- numGroups = group.enumerate(groups, false);
- for (int i = 0; i < numGroups; ++i) {
- ThreadGroup g = groups[i];
- Collection<Thread> c = findThreads(g, name);
- if (GenericUtils.isEmpty(c)) {
- continue; // debug breakpoint
- }
- ret.addAll(c);
- }
- return ret;
- }
-
- private boolean checkThreadForPortForward(Thread thread, String name) {
- if (thread == null)
- return false;
- // does it contain the name we're looking for?
- if (thread.getName().contains(name)) {
- // look at the stack
- StackTraceElement[] stack = thread.getStackTrace();
- if (stack.length == 0)
- return false;
- else {
- // does it have
- // 'org.apache.sshd.server.session.TcpipForwardSupport.close'?
- for (int i = 0; i < stack.length; ++i) {
- String clazzName = stack[i].getClassName();
- String methodName = stack[i].getMethodName();
- // log.debug("Class: " + clazzName);
- // log.debug("Method: " + methodName);
- if (clazzName
- .equals("org.apache.sshd.server.session.TcpipForwardSupport")
- && (methodName.equals("close") || methodName
- .equals("sessionCreated"))) {
- log.warn(thread.getName() + " stuck at " + clazzName
- + "." + methodName + ": "
- + stack[i].getLineNumber());
- return true;
- }
- }
- }
- }
- return false;
- }
-
- protected Session createSession() throws JSchException {
- JSch sch = new JSch();
- Session session = sch.getSession(getCurrentTestName(), TEST_LOCALHOST, sshPort);
- session.setUserInfo(new SimpleUserInfo(getCurrentTestName()));
- session.connect();
- return session;
- }
-
- protected ClientSession createNativeSession() throws Exception {
- client = setupTestClient();
- PropertyResolverUtils.updateProperty(client, FactoryManager.WINDOW_SIZE, 2048);
- PropertyResolverUtils.updateProperty(client, FactoryManager.MAX_PACKET_SIZE, 256);
- client.setTcpipForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
- client.start();
-
- ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, sshPort).verify(7L, TimeUnit.SECONDS).getSession();
- session.addPasswordIdentity(getCurrentTestName());
- session.auth().verify(11L, TimeUnit.SECONDS);
- return session;
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/SinglePublicKeyAuthTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/SinglePublicKeyAuthTest.java b/sshd-core/src/test/java/org/apache/sshd/SinglePublicKeyAuthTest.java
deleted file mode 100644
index d6a4561..0000000
--- a/sshd-core/src/test/java/org/apache/sshd/SinglePublicKeyAuthTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sshd;
-
-import java.security.KeyPair;
-import java.security.PublicKey;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.sshd.client.SshClient;
-import org.apache.sshd.client.future.AuthFuture;
-import org.apache.sshd.client.session.ClientSession;
-import org.apache.sshd.common.PropertyResolverUtils;
-import org.apache.sshd.common.config.keys.KeyUtils;
-import org.apache.sshd.common.keyprovider.KeyPairProvider;
-import org.apache.sshd.server.ServerFactoryManager;
-import org.apache.sshd.server.SshServer;
-import org.apache.sshd.server.auth.pubkey.CachingPublicKeyAuthenticator;
-import org.apache.sshd.server.auth.pubkey.PublickeyAuthenticator;
-import org.apache.sshd.server.auth.pubkey.UserAuthPublicKeyFactory;
-import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
-import org.apache.sshd.server.session.ServerSession;
-import org.apache.sshd.util.test.BaseTestSupport;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.runners.MethodSorters;
-
-/**
- * TODO Add javadoc
- *
- * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
- */
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class SinglePublicKeyAuthTest extends BaseTestSupport {
-
- private SshServer sshd;
- private int port;
- private KeyPair pairRsa = createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA);
- private KeyPair pairRsaBad;
- private PublickeyAuthenticator delegate;
-
- public SinglePublicKeyAuthTest() {
- SimpleGeneratorHostKeyProvider provider = new SimpleGeneratorHostKeyProvider();
- provider.setAlgorithm("RSA");
- pairRsaBad = provider.loadKey(KeyPairProvider.SSH_RSA);
- }
-
- @Before
- public void setUp() throws Exception {
- sshd = setupTestServer();
- PropertyResolverUtils.updateProperty(sshd, ServerFactoryManager.AUTH_METHODS, UserAuthPublicKeyFactory.NAME);
- sshd.setPublickeyAuthenticator(new PublickeyAuthenticator() {
- @SuppressWarnings("synthetic-access")
- @Override
- public boolean authenticate(String username, PublicKey key, ServerSession session) {
- return delegate.authenticate(username, key, session);
- }
- });
- sshd.start();
- port = sshd.getPort();
- }
-
- @After
- public void tearDown() throws Exception {
- if (sshd != null) {
- sshd.stop(true);
- }
- }
-
- @Test
- public void testPublicKeyAuthWithCache() throws Exception {
- final ConcurrentHashMap<String, AtomicInteger> count = new ConcurrentHashMap<String, AtomicInteger>();
- TestCachingPublicKeyAuthenticator auth = new TestCachingPublicKeyAuthenticator(new PublickeyAuthenticator() {
- @SuppressWarnings("synthetic-access")
- @Override
- public boolean authenticate(String username, PublicKey key, ServerSession session) {
- String fp = KeyUtils.getFingerPrint(key);
- count.putIfAbsent(fp, new AtomicInteger());
- count.get(fp).incrementAndGet();
- return key.equals(pairRsa.getPublic());
- }
- });
- delegate = auth;
-
- try (SshClient client = setupTestClient()) {
- client.start();
-
- try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPublicKeyIdentity(pairRsaBad);
- session.addPublicKeyIdentity(pairRsa);
- session.auth().verify(5L, TimeUnit.SECONDS);
-
- assertEquals("Mismatched authentication invocations count", 2, count.size());
-
- String fpBad = KeyUtils.getFingerPrint(pairRsaBad.getPublic());
- String fpGood = KeyUtils.getFingerPrint(pairRsa.getPublic());
- assertTrue("Missing bad public key", count.containsKey(fpBad));
- assertTrue("Missing good public key", count.containsKey(fpGood));
- assertEquals("Mismatched bad key authentication attempts", 1, count.get(fpBad).get());
- assertEquals("Mismatched good key authentication attempts", 1, count.get(fpGood).get());
- } finally {
- client.stop();
- }
- }
-
- Thread.sleep(100L);
- assertTrue("Cache not empty", auth.getCache().isEmpty());
- }
-
- @Test
- public void testPublicKeyAuthWithoutCache() throws Exception {
- final ConcurrentHashMap<String, AtomicInteger> count = new ConcurrentHashMap<String, AtomicInteger>();
- delegate = new PublickeyAuthenticator() {
- @SuppressWarnings("synthetic-access")
- @Override
- public boolean authenticate(String username, PublicKey key, ServerSession session) {
- String fp = KeyUtils.getFingerPrint(key);
- count.putIfAbsent(fp, new AtomicInteger());
- count.get(fp).incrementAndGet();
- return key.equals(pairRsa.getPublic());
- }
- };
-
- try (SshClient client = setupTestClient()) {
- client.start();
-
- try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
- session.addPublicKeyIdentity(pairRsaBad);
- session.addPublicKeyIdentity(pairRsa);
-
- AuthFuture auth = session.auth();
- assertTrue("Failed to authenticate on time", auth.await(5L, TimeUnit.SECONDS));
- assertTrue("Authentication failed", auth.isSuccess());
- } finally {
- client.stop();
- }
- }
-
- assertEquals("Mismatched attempted keys count", 2, count.size());
-
- String badFingerPrint = KeyUtils.getFingerPrint(pairRsaBad.getPublic());
- Number badIndex = count.get(badFingerPrint);
- assertNotNull("Missing bad RSA key", badIndex);
- assertEquals("Mismatched attempt index for bad key", 1, badIndex.intValue());
-
- String goodFingerPrint = KeyUtils.getFingerPrint(pairRsa.getPublic());
- Number goodIndex = count.get(goodFingerPrint);
- assertNotNull("Missing good RSA key", goodIndex);
- assertEquals("Mismatched attempt index for good key", 2, goodIndex.intValue());
- }
-
- public static class TestCachingPublicKeyAuthenticator extends CachingPublicKeyAuthenticator {
- public TestCachingPublicKeyAuthenticator(PublickeyAuthenticator authenticator) {
- super(authenticator);
- }
-
- public Map<ServerSession, Map<PublicKey, Boolean>> getCache() {
- return cache;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f45ee0ef/sshd-core/src/test/java/org/apache/sshd/common/auth/SinglePublicKeyAuthTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/auth/SinglePublicKeyAuthTest.java b/sshd-core/src/test/java/org/apache/sshd/common/auth/SinglePublicKeyAuthTest.java
new file mode 100644
index 0000000..f1ca19c
--- /dev/null
+++ b/sshd-core/src/test/java/org/apache/sshd/common/auth/SinglePublicKeyAuthTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.auth;
+
+import java.security.KeyPair;
+import java.security.PublicKey;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.future.AuthFuture;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.common.PropertyResolverUtils;
+import org.apache.sshd.common.config.keys.KeyUtils;
+import org.apache.sshd.common.keyprovider.KeyPairProvider;
+import org.apache.sshd.server.ServerFactoryManager;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.auth.pubkey.CachingPublicKeyAuthenticator;
+import org.apache.sshd.server.auth.pubkey.PublickeyAuthenticator;
+import org.apache.sshd.server.auth.pubkey.UserAuthPublicKeyFactory;
+import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
+import org.apache.sshd.server.session.ServerSession;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+/**
+ * TODO Add javadoc
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class SinglePublicKeyAuthTest extends BaseTestSupport {
+
+ private SshServer sshd;
+ private int port;
+ private KeyPair pairRsa = createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA);
+ private KeyPair pairRsaBad;
+ private PublickeyAuthenticator delegate;
+
+ public SinglePublicKeyAuthTest() {
+ SimpleGeneratorHostKeyProvider provider = new SimpleGeneratorHostKeyProvider();
+ provider.setAlgorithm("RSA");
+ pairRsaBad = provider.loadKey(KeyPairProvider.SSH_RSA);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ sshd = setupTestServer();
+ PropertyResolverUtils.updateProperty(sshd, ServerFactoryManager.AUTH_METHODS, UserAuthPublicKeyFactory.NAME);
+ sshd.setPublickeyAuthenticator(new PublickeyAuthenticator() {
+ @SuppressWarnings("synthetic-access")
+ @Override
+ public boolean authenticate(String username, PublicKey key, ServerSession session) {
+ return delegate.authenticate(username, key, session);
+ }
+ });
+ sshd.start();
+ port = sshd.getPort();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (sshd != null) {
+ sshd.stop(true);
+ }
+ }
+
+ @Test
+ public void testPublicKeyAuthWithCache() throws Exception {
+ final ConcurrentHashMap<String, AtomicInteger> count = new ConcurrentHashMap<String, AtomicInteger>();
+ TestCachingPublicKeyAuthenticator auth = new TestCachingPublicKeyAuthenticator(new PublickeyAuthenticator() {
+ @SuppressWarnings("synthetic-access")
+ @Override
+ public boolean authenticate(String username, PublicKey key, ServerSession session) {
+ String fp = KeyUtils.getFingerPrint(key);
+ count.putIfAbsent(fp, new AtomicInteger());
+ count.get(fp).incrementAndGet();
+ return key.equals(pairRsa.getPublic());
+ }
+ });
+ delegate = auth;
+
+ try (SshClient client = setupTestClient()) {
+ client.start();
+
+ try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ session.addPublicKeyIdentity(pairRsaBad);
+ session.addPublicKeyIdentity(pairRsa);
+ session.auth().verify(5L, TimeUnit.SECONDS);
+
+ assertEquals("Mismatched authentication invocations count", 2, count.size());
+
+ String fpBad = KeyUtils.getFingerPrint(pairRsaBad.getPublic());
+ String fpGood = KeyUtils.getFingerPrint(pairRsa.getPublic());
+ assertTrue("Missing bad public key", count.containsKey(fpBad));
+ assertTrue("Missing good public key", count.containsKey(fpGood));
+ assertEquals("Mismatched bad key authentication attempts", 1, count.get(fpBad).get());
+ assertEquals("Mismatched good key authentication attempts", 1, count.get(fpGood).get());
+ } finally {
+ client.stop();
+ }
+ }
+
+ Thread.sleep(100L);
+ assertTrue("Cache not empty", auth.getCache().isEmpty());
+ }
+
+ @Test
+ public void testPublicKeyAuthWithoutCache() throws Exception {
+ final ConcurrentHashMap<String, AtomicInteger> count = new ConcurrentHashMap<String, AtomicInteger>();
+ delegate = new PublickeyAuthenticator() {
+ @SuppressWarnings("synthetic-access")
+ @Override
+ public boolean authenticate(String username, PublicKey key, ServerSession session) {
+ String fp = KeyUtils.getFingerPrint(key);
+ count.putIfAbsent(fp, new AtomicInteger());
+ count.get(fp).incrementAndGet();
+ return key.equals(pairRsa.getPublic());
+ }
+ };
+
+ try (SshClient client = setupTestClient()) {
+ client.start();
+
+ try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ session.addPublicKeyIdentity(pairRsaBad);
+ session.addPublicKeyIdentity(pairRsa);
+
+ AuthFuture auth = session.auth();
+ assertTrue("Failed to authenticate on time", auth.await(5L, TimeUnit.SECONDS));
+ assertTrue("Authentication failed", auth.isSuccess());
+ } finally {
+ client.stop();
+ }
+ }
+
+ assertEquals("Mismatched attempted keys count", 2, count.size());
+
+ String badFingerPrint = KeyUtils.getFingerPrint(pairRsaBad.getPublic());
+ Number badIndex = count.get(badFingerPrint);
+ assertNotNull("Missing bad RSA key", badIndex);
+ assertEquals("Mismatched attempt index for bad key", 1, badIndex.intValue());
+
+ String goodFingerPrint = KeyUtils.getFingerPrint(pairRsa.getPublic());
+ Number goodIndex = count.get(goodFingerPrint);
+ assertNotNull("Missing good RSA key", goodIndex);
+ assertEquals("Mismatched attempt index for good key", 2, goodIndex.intValue());
+ }
+
+ public static class TestCachingPublicKeyAuthenticator extends CachingPublicKeyAuthenticator {
+ public TestCachingPublicKeyAuthenticator(PublickeyAuthenticator authenticator) {
+ super(authenticator);
+ }
+
+ public Map<ServerSession, Map<PublicKey, Boolean>> getCache() {
+ return cache;
+ }
+ }
+}
\ No newline at end of file