You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2018/04/16 12:49:28 UTC
[1/5] mina-sshd git commit: [SSHD-721] I/O workers exhaustion in
tcpip forwarding
Repository: mina-sshd
Updated Branches:
refs/heads/master 251db9b9d -> 721f399bd
[SSHD-721] I/O workers exhaustion in tcpip forwarding
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/ca578eb1
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/ca578eb1
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/ca578eb1
Branch: refs/heads/master
Commit: ca578eb1c74d94ad89e2d4ab0d94489eace7a0c6
Parents: 3c9efa8
Author: Guillaume Nodet <gn...@apache.org>
Authored: Sun Apr 15 21:01:09 2018 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Sun Apr 15 21:01:09 2018 +0200
----------------------------------------------------------------------
.../common/forward/DefaultForwardingFilter.java | 24 +++++++++++++-------
1 file changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/ca578eb1/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
index 7936415..751f73a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.sshd.client.channel.ClientChannelEvent;
+import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.Factory;
import org.apache.sshd.common.FactoryManager;
@@ -979,15 +980,22 @@ public class DefaultForwardingFilter
session, channel, totalMessages, message.available());
}
- Collection<ClientChannelEvent> result = channel.waitFor(STATIC_IO_MSG_RECEIVED_EVENTS, Long.MAX_VALUE);
- if (traceEnabled) {
- log.trace("messageReceived({}) channel={}, count={}, len={} wait result: {}",
- session, channel, totalMessages, message.available(), result);
+ OpenFuture future = channel.getOpenFuture();
+ if (future.isOpened()) {
+ OutputStream outputStream = channel.getInvertedIn();
+ outputStream.write(buffer.array(), buffer.rpos(), buffer.available());
+ outputStream.flush();
+ } else {
+ future.addListener(f -> {
+ try {
+ OutputStream outputStream = channel.getInvertedIn();
+ outputStream.write(buffer.array(), buffer.rpos(), buffer.available());
+ outputStream.flush();
+ } catch (IOException e) {
+ channel.getSession().exceptionCaught(e);
+ }
+ });
}
-
- OutputStream outputStream = channel.getInvertedIn();
- outputStream.write(buffer.array(), buffer.rpos(), buffer.available());
- outputStream.flush();
}
@Override
[4/5] mina-sshd git commit: Fix SSHD-721 unit test formatting
Posted by gn...@apache.org.
Fix SSHD-721 unit test formatting
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/999c9868
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/999c9868
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/999c9868
Branch: refs/heads/master
Commit: 999c98689013a7f041ed235e7438633281c1a021
Parents: f029d16
Author: Guillaume Nodet <gn...@apache.org>
Authored: Mon Apr 16 14:42:38 2018 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Mon Apr 16 14:42:38 2018 +0200
----------------------------------------------------------------------
.../forward/ConcurrentConnectionTest.java | 442 +++++++++----------
1 file changed, 209 insertions(+), 233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/999c9868/sshd-core/src/test/java/org/apache/sshd/common/forward/ConcurrentConnectionTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/ConcurrentConnectionTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/ConcurrentConnectionTest.java
index 38434df..89a282c 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/forward/ConcurrentConnectionTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/ConcurrentConnectionTest.java
@@ -29,11 +29,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.session.ClientSession;
-import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.net.SshdSocketAddress;
import org.apache.sshd.server.SshServer;
import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
-import org.apache.sshd.server.forward.ForwardingFilter;
import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
import org.apache.sshd.util.test.BaseTestSupport;
import org.junit.After;
@@ -48,234 +46,212 @@ import org.slf4j.LoggerFactory;
* Port forwarding test multiple clients connecting at once.
*/
public class ConcurrentConnectionTest extends BaseTestSupport {
- private static final byte[] PAYLOAD_TO_SERVER = "To Server -> To Server -> To Server".getBytes();
- private static final byte[] PAYLOAD_TO_CLIENT = "<- To Client <- To Client <-".getBytes();
- private final static Logger LOG = LoggerFactory.getLogger(ConcurrentConnectionTest.class);
-
- // These are the critical test parameters.
- // When the number of clients is greater than or equal to the number of IO
- // Workers, the server deadlocks
- private static final int SSHD_NIO_WORKERS = 8;
- private static final int PORT_FORWARD_CLIENT_COUNT = 12;
-
- // For very large numbers of clients and small numbers of threads this may
- // need to be increased
- private static final int TIMEOUT = (int) TimeUnit.SECONDS.toMillis(10L);
-
- // Test Server State
- private int testServerPort;
- private ServerSocket testServerSock;
- private Thread testServerThread;
-
- // SSHD Server State
- private static int sshServerPort;
- private static SshServer server;
-
- // SSH Client State
- private ClientSession session;
-
- /*
- * Start a server to forward to.
- *
- * Reads PAYLOAD_TO_SERVER from client and then sends PAYLOAD_TO_CLIENT to
- * client. This server emulates a web server, closely enough for thie test
- */
- @Before
- public void startTestServer() throws Exception {
- final AtomicInteger activeServers = new AtomicInteger(0);
- testServerThread = new Thread(() -> {
- try {
- testServerSock = new ServerSocket(0);
- testServerPort = testServerSock.getLocalPort();
- LOG.debug("Listening on {}", testServerPort);
- while (true) {
- final Socket s = testServerSock.accept();
- LOG.debug("Got connection");
- final Thread server = new Thread(() -> {
- try {
- LOG.debug("Active Servers: {}", activeServers.incrementAndGet());
- final byte[] buf = new byte[PAYLOAD_TO_SERVER.length];
- final long r = s.getInputStream().read(buf);
- LOG.debug("Read {} payload from client", r);
- s.getOutputStream().write(PAYLOAD_TO_CLIENT);
- LOG.debug("Wrote payload to client");
- s.close();
- LOG.debug("Active Servers: {}", activeServers.decrementAndGet());
- } catch (final Throwable t) {
- LOG.error("Error", t);
- }
- });
- server.setDaemon(true);
- server.setName("Server " + s.getPort());
- server.start();
- }
- } catch (final SocketException e) {
- LOG.debug("Shutting down test server");
- } catch (final Throwable t) {
- LOG.error("Error", t);
- }
- });
- testServerThread.setDaemon(true);
- testServerThread.setName("Server Acceptor");
- testServerThread.start();
- Thread.sleep(100);
- }
-
- @After
- public void stopTestServer() throws Exception {
- testServerSock.close();
- testServerThread.interrupt();
- }
-
- @BeforeClass
- public static void startSshServer() throws IOException {
- LOG.debug("Starting SSHD...");
- server = SshServer.setUpDefaultServer();
- server.setPasswordAuthenticator((u, p, s) -> true);
- server.setForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
- server.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
- server.setNioWorkers(SSHD_NIO_WORKERS);
- server.setForwardingFilter(new ForwardingFilter() {
-
- @Override
- public boolean canListen(SshdSocketAddress address, Session session) {
- // TODO Auto-generated method stub
- return true;
- }
-
- @Override
- public boolean canConnect(Type type, SshdSocketAddress address, Session session) {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public boolean canForwardX11(Session session, String requestType) {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public boolean canForwardAgent(Session session, String requestType) {
- // TODO Auto-generated method stub
- return false;
- }
- });
- server.start();
- sshServerPort = server.getPort();
- LOG.debug("SSHD Running on port {}", server.getPort());
- }
-
- @AfterClass
- public static void stopServer() throws IOException {
- if (!server.close(true).await(TIMEOUT)) {
- LOG.warn("Failed to close server within {} sec.", TimeUnit.MILLISECONDS.toSeconds(TIMEOUT));
- }
- }
-
- @Before
- public void createClient() throws IOException {
- final SshClient client = SshClient.setUpDefaultClient();
- client.setForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
- client.start();
- LOG.debug("Connecting...");
- session = client.connect("user", TEST_LOCALHOST, sshServerPort).verify(TIMEOUT).getSession();
- LOG.debug("Authenticating...");
- session.addPasswordIdentity("foo");
- session.auth().verify(TIMEOUT);
- LOG.debug("Authenticated");
- }
-
- @After
- public void stopClient() throws Exception {
- LOG.debug("Disconnecting Client");
- try {
- assertTrue("Failed to close session", session.close(true).await(TIMEOUT));
- } finally {
- session = null;
- }
- }
-
- @Test
- /*
- * Run PORT_FORWARD_CLIENT_COUNT simultaneous server threads.
- *
- * Emulates a web browser making a number of simultaneous requests on
- * different connections to the same server HTTP specifies no more than two,
- * but most modern browsers do 6 or more.
- */
- public void testConcurrentConnectionsToPortForward() throws Exception {
- final SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, 0);
- final SshdSocketAddress local = new SshdSocketAddress(TEST_LOCALHOST, testServerPort);
- final SshdSocketAddress bound = session.startRemotePortForwarding(remote, local);
- final int forwardedPort = bound.getPort();
-
- final CyclicBarrier b = new CyclicBarrier(PORT_FORWARD_CLIENT_COUNT, () -> {
- LOG.debug("And away we go.");
- });
-
- final AtomicInteger success = new AtomicInteger(0);
- final AtomicInteger fail = new AtomicInteger(0);
- final long[] bytesRead = new long[PORT_FORWARD_CLIENT_COUNT];
-
- for (int i = 0; i < PORT_FORWARD_CLIENT_COUNT; i++) {
- final long wait = 100 * i;
- final int n = i;
- final Thread t = new Thread(() -> {
- try {
- bytesRead[n] = makeClientRequest(forwardedPort, b, wait);
- LOG.debug("Complete, received full payload from server.");
- success.incrementAndGet();
- } catch (final Exception e) {
- fail.incrementAndGet();
- LOG.error("Error in client code", e);
- }
- });
- t.setName("Client " + i);
- t.setDaemon(true);
- t.start();
- }
-
- while (true) {
- if (success.get() + fail.get() == PORT_FORWARD_CLIENT_COUNT) {
- break;
- }
- Thread.sleep(100);
- }
-
- for (int i = 0; i < PORT_FORWARD_CLIENT_COUNT; i++) {
- assertEquals("Mismatched data length read from server for client " + i, PAYLOAD_TO_CLIENT.length,
- bytesRead[i]);
- }
-
- assertEquals("Not all clients succeeded", PORT_FORWARD_CLIENT_COUNT, success.get());
- }
-
- /**
- * Send PAYLOAD_TO_SERVER to the server, then read PAYLOAD_TO_CLIENT from
- * server. Emulates a web browser making a request
- */
- private long makeClientRequest(final int serverPort, final CyclicBarrier barrier, final long wait)
- throws Exception {
- outputDebugMessage("readInLoop(port=%d)", serverPort);
-
- final Socket s = new Socket();
- s.setSoTimeout(TIMEOUT);
-
- barrier.await();
-
- s.connect(new InetSocketAddress(TEST_LOCALHOST, serverPort));
-
- s.getOutputStream().write(PAYLOAD_TO_SERVER);
-
- final byte[] buf = new byte[PAYLOAD_TO_CLIENT.length];
- final long r = s.getInputStream().read(buf);
- LOG.debug("Read {} payload from server", r);
-
- assertEquals("Mismatched data length", PAYLOAD_TO_CLIENT.length, r);
- s.close();
-
- return r;
- }
-
-}
\ No newline at end of file
+ private static final byte[] PAYLOAD_TO_SERVER = "To Server -> To Server -> To Server".getBytes();
+ private static final byte[] PAYLOAD_TO_CLIENT = "<- To Client <- To Client <-".getBytes();
+ private static final Logger LOG = LoggerFactory.getLogger(ConcurrentConnectionTest.class);
+
+ // These are the critical test parameters.
+ // When the number of clients is greater than or equal to the number of IO
+ // Workers, the server deadlocks
+ private static final int SSHD_NIO_WORKERS = 8;
+ private static final int PORT_FORWARD_CLIENT_COUNT = 12;
+
+ // For very large numbers of clients and small numbers of threads this may
+ // need to be increased
+ private static final int TIMEOUT = (int) TimeUnit.SECONDS.toMillis(10L);
+
+ // SSHD Server State
+ private static int sshServerPort;
+ private static SshServer server;
+
+ // Test Server State
+ private int testServerPort;
+ private ServerSocket testServerSock;
+ private Thread testServerThread;
+
+ // SSH Client State
+ private ClientSession session;
+
+ /*
+ * Start a server to forward to.
+ *
+ * Reads PAYLOAD_TO_SERVER from client and then sends PAYLOAD_TO_CLIENT to
+ * client. This server emulates a web server, closely enough for thie test
+ */
+ @Before
+ public void startTestServer() throws Exception {
+ testServerThread = new Thread(this::serverAcceptLoop);
+ testServerThread.setDaemon(true);
+ testServerThread.setName("Server Acceptor");
+ testServerThread.start();
+ Thread.sleep(100);
+ }
+
+ protected void serverAcceptLoop() {
+ try {
+ final AtomicInteger activeServers = new AtomicInteger(0);
+ testServerSock = new ServerSocket(0);
+ testServerPort = testServerSock.getLocalPort();
+ LOG.debug("Listening on {}", testServerPort);
+ while (true) {
+ final Socket s = testServerSock.accept();
+ LOG.debug("Got connection");
+ final Thread server = new Thread(() -> serverSocketLoop(activeServers, s));
+ server.setDaemon(true);
+ server.setName("Server " + s.getPort());
+ server.start();
+ }
+ } catch (final SocketException e) {
+ LOG.debug("Shutting down test server");
+ } catch (final Throwable t) {
+ LOG.error("Error", t);
+ }
+ }
+
+ private void serverSocketLoop(AtomicInteger activeServers, Socket s) {
+ try {
+ LOG.debug("Active Servers: {}", activeServers.incrementAndGet());
+ final byte[] buf = new byte[PAYLOAD_TO_SERVER.length];
+ final long r = s.getInputStream().read(buf);
+ LOG.debug("Read {} payload from client", r);
+ s.getOutputStream().write(PAYLOAD_TO_CLIENT);
+ LOG.debug("Wrote payload to client");
+ s.close();
+ LOG.debug("Active Servers: {}", activeServers.decrementAndGet());
+ } catch (final Throwable t) {
+ LOG.error("Error", t);
+ }
+ }
+
+ @After
+ public void stopTestServer() throws Exception {
+ testServerSock.close();
+ testServerThread.interrupt();
+ }
+
+ @BeforeClass
+ public static void startSshServer() throws IOException {
+ LOG.debug("Starting SSHD...");
+ server = SshServer.setUpDefaultServer();
+ server.setPasswordAuthenticator((u, p, s) -> true);
+ server.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
+ server.setNioWorkers(SSHD_NIO_WORKERS);
+ server.setForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
+ server.start();
+ sshServerPort = server.getPort();
+ LOG.debug("SSHD Running on port {}", server.getPort());
+ }
+
+ @AfterClass
+ public static void stopServer() throws IOException {
+ if (!server.close(true).await(TIMEOUT)) {
+ LOG.warn("Failed to close server within {} sec.", TimeUnit.MILLISECONDS.toSeconds(TIMEOUT));
+ }
+ }
+
+ @Before
+ public void createClient() throws IOException {
+ final SshClient client = SshClient.setUpDefaultClient();
+ client.setForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
+ client.start();
+ LOG.debug("Connecting...");
+ session = client.connect("user", TEST_LOCALHOST, sshServerPort).verify(TIMEOUT).getSession();
+ LOG.debug("Authenticating...");
+ session.addPasswordIdentity("foo");
+ session.auth().verify(TIMEOUT);
+ LOG.debug("Authenticated");
+ }
+
+ @After
+ public void stopClient() throws Exception {
+ LOG.debug("Disconnecting Client");
+ try {
+ assertTrue("Failed to close session", session.close(true).await(TIMEOUT));
+ } finally {
+ session = null;
+ }
+ }
+
+ @Test
+ /*
+ * Run PORT_FORWARD_CLIENT_COUNT simultaneous server threads.
+ *
+ * Emulates a web browser making a number of simultaneous requests on
+ * different connections to the same server HTTP specifies no more than two,
+ * but most modern browsers do 6 or more.
+ */
+ public void testConcurrentConnectionsToPortForward() throws Exception {
+ final SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, 0);
+ final SshdSocketAddress local = new SshdSocketAddress(TEST_LOCALHOST, testServerPort);
+ final SshdSocketAddress bound = session.startRemotePortForwarding(remote, local);
+ final int forwardedPort = bound.getPort();
+
+ final CyclicBarrier b = new CyclicBarrier(PORT_FORWARD_CLIENT_COUNT, () -> {
+ LOG.debug("And away we go.");
+ });
+
+ final AtomicInteger success = new AtomicInteger(0);
+ final AtomicInteger fail = new AtomicInteger(0);
+ final long[] bytesRead = new long[PORT_FORWARD_CLIENT_COUNT];
+
+ for (int i = 0; i < PORT_FORWARD_CLIENT_COUNT; i++) {
+ final long wait = 100 * i;
+ final int n = i;
+ final Thread t = new Thread(() -> {
+ try {
+ bytesRead[n] = makeClientRequest(forwardedPort, b, wait);
+ LOG.debug("Complete, received full payload from server.");
+ success.incrementAndGet();
+ } catch (final Exception e) {
+ fail.incrementAndGet();
+ LOG.error("Error in client code", e);
+ }
+ });
+ t.setName("Client " + i);
+ t.setDaemon(true);
+ t.start();
+ }
+
+ while (true) {
+ if (success.get() + fail.get() == PORT_FORWARD_CLIENT_COUNT) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+
+ for (int i = 0; i < PORT_FORWARD_CLIENT_COUNT; i++) {
+ assertEquals("Mismatched data length read from server for client " + i, PAYLOAD_TO_CLIENT.length,
+ bytesRead[i]);
+ }
+
+ assertEquals("Not all clients succeeded", PORT_FORWARD_CLIENT_COUNT, success.get());
+ }
+
+ /**
+ * Send PAYLOAD_TO_SERVER to the server, then read PAYLOAD_TO_CLIENT from
+ * server. Emulates a web browser making a request
+ */
+ private long makeClientRequest(final int serverPort, final CyclicBarrier barrier, final long wait)
+ throws Exception {
+ outputDebugMessage("readInLoop(port=%d)", serverPort);
+
+ final Socket s = new Socket();
+ s.setSoTimeout(TIMEOUT);
+
+ barrier.await();
+
+ s.connect(new InetSocketAddress(TEST_LOCALHOST, serverPort));
+
+ s.getOutputStream().write(PAYLOAD_TO_SERVER);
+
+ final byte[] buf = new byte[PAYLOAD_TO_CLIENT.length];
+ final long r = s.getInputStream().read(buf);
+ LOG.debug("Read {} payload from server", r);
+
+ assertEquals("Mismatched data length", PAYLOAD_TO_CLIENT.length, r);
+ s.close();
+
+ return r;
+ }
+
+}
[5/5] mina-sshd git commit: Merge branch 'SSHD-721'
Posted by gn...@apache.org.
Merge branch 'SSHD-721'
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/721f399b
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/721f399b
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/721f399b
Branch: refs/heads/master
Commit: 721f399bd919845a15e678336baabaed2bf068d5
Parents: 251db9b 999c986
Author: Guillaume Nodet <gn...@apache.org>
Authored: Mon Apr 16 14:43:20 2018 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Mon Apr 16 14:43:20 2018 +0200
----------------------------------------------------------------------
.../common/forward/DefaultForwardingFilter.java | 24 +-
.../forward/ConcurrentConnectionTest.java | 257 +++++++++++++++++++
2 files changed, 273 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
[3/5] mina-sshd git commit: Merge pull request #1 from bkuker/sshd-721
Posted by gn...@apache.org.
Merge pull request #1 from bkuker/sshd-721
[SSHD-721] I/O workers exhaustion in tcpip forwarding, test
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/f029d162
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/f029d162
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/f029d162
Branch: refs/heads/master
Commit: f029d162c2dc732dabfd4b25c5790def917cba77
Parents: ca578eb 5abc47e
Author: Guillaume Nodet <gn...@gmail.com>
Authored: Mon Apr 16 14:30:10 2018 +0200
Committer: GitHub <no...@github.com>
Committed: Mon Apr 16 14:30:10 2018 +0200
----------------------------------------------------------------------
.../forward/ConcurrentConnectionTest.java | 281 +++++++++++++++++++
1 file changed, 281 insertions(+)
----------------------------------------------------------------------
[2/5] mina-sshd git commit: Test for SSHD-721. Many concurrent port
forward connections deadlock.
Posted by gn...@apache.org.
Test for SSHD-721. Many concurrent port forward connections deadlock.
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/5abc47e8
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/5abc47e8
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/5abc47e8
Branch: refs/heads/master
Commit: 5abc47e8cc4655c8d58b6d8b41c291c33c2e13db
Parents: 83925cb
Author: bkuker <bk...@martellotech.com>
Authored: Fri Apr 13 16:19:46 2018 -0400
Committer: bkuker <bk...@martellotech.com>
Committed: Mon Apr 16 08:20:54 2018 -0400
----------------------------------------------------------------------
.../forward/ConcurrentConnectionTest.java | 281 +++++++++++++++++++
1 file changed, 281 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/5abc47e8/sshd-core/src/test/java/org/apache/sshd/common/forward/ConcurrentConnectionTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/ConcurrentConnectionTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/ConcurrentConnectionTest.java
new file mode 100644
index 0000000..38434df
--- /dev/null
+++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/ConcurrentConnectionTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.forward;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.net.SshdSocketAddress;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
+import org.apache.sshd.server.forward.ForwardingFilter;
+import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Port forwarding test multiple clients connecting at once.
+ */
+public class ConcurrentConnectionTest extends BaseTestSupport {
+ private static final byte[] PAYLOAD_TO_SERVER = "To Server -> To Server -> To Server".getBytes();
+ private static final byte[] PAYLOAD_TO_CLIENT = "<- To Client <- To Client <-".getBytes();
+ private final static Logger LOG = LoggerFactory.getLogger(ConcurrentConnectionTest.class);
+
+ // These are the critical test parameters.
+ // When the number of clients is greater than or equal to the number of IO
+ // Workers, the server deadlocks
+ private static final int SSHD_NIO_WORKERS = 8;
+ private static final int PORT_FORWARD_CLIENT_COUNT = 12;
+
+ // For very large numbers of clients and small numbers of threads this may
+ // need to be increased
+ private static final int TIMEOUT = (int) TimeUnit.SECONDS.toMillis(10L);
+
+ // Test Server State
+ private int testServerPort;
+ private ServerSocket testServerSock;
+ private Thread testServerThread;
+
+ // SSHD Server State
+ private static int sshServerPort;
+ private static SshServer server;
+
+ // SSH Client State
+ private ClientSession session;
+
+ /*
+ * Start a server to forward to.
+ *
+ * Reads PAYLOAD_TO_SERVER from client and then sends PAYLOAD_TO_CLIENT to
+ * client. This server emulates a web server, closely enough for thie test
+ */
+ @Before
+ public void startTestServer() throws Exception {
+ final AtomicInteger activeServers = new AtomicInteger(0);
+ testServerThread = new Thread(() -> {
+ try {
+ testServerSock = new ServerSocket(0);
+ testServerPort = testServerSock.getLocalPort();
+ LOG.debug("Listening on {}", testServerPort);
+ while (true) {
+ final Socket s = testServerSock.accept();
+ LOG.debug("Got connection");
+ final Thread server = new Thread(() -> {
+ try {
+ LOG.debug("Active Servers: {}", activeServers.incrementAndGet());
+ final byte[] buf = new byte[PAYLOAD_TO_SERVER.length];
+ final long r = s.getInputStream().read(buf);
+ LOG.debug("Read {} payload from client", r);
+ s.getOutputStream().write(PAYLOAD_TO_CLIENT);
+ LOG.debug("Wrote payload to client");
+ s.close();
+ LOG.debug("Active Servers: {}", activeServers.decrementAndGet());
+ } catch (final Throwable t) {
+ LOG.error("Error", t);
+ }
+ });
+ server.setDaemon(true);
+ server.setName("Server " + s.getPort());
+ server.start();
+ }
+ } catch (final SocketException e) {
+ LOG.debug("Shutting down test server");
+ } catch (final Throwable t) {
+ LOG.error("Error", t);
+ }
+ });
+ testServerThread.setDaemon(true);
+ testServerThread.setName("Server Acceptor");
+ testServerThread.start();
+ Thread.sleep(100);
+ }
+
+ @After
+ public void stopTestServer() throws Exception {
+ testServerSock.close();
+ testServerThread.interrupt();
+ }
+
+ @BeforeClass
+ public static void startSshServer() throws IOException {
+ LOG.debug("Starting SSHD...");
+ server = SshServer.setUpDefaultServer();
+ server.setPasswordAuthenticator((u, p, s) -> true);
+ server.setForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
+ server.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
+ server.setNioWorkers(SSHD_NIO_WORKERS);
+ server.setForwardingFilter(new ForwardingFilter() {
+
+ @Override
+ public boolean canListen(SshdSocketAddress address, Session session) {
+ // TODO Auto-generated method stub
+ return true;
+ }
+
+ @Override
+ public boolean canConnect(Type type, SshdSocketAddress address, Session session) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean canForwardX11(Session session, String requestType) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean canForwardAgent(Session session, String requestType) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+ });
+ server.start();
+ sshServerPort = server.getPort();
+ LOG.debug("SSHD Running on port {}", server.getPort());
+ }
+
+ @AfterClass
+ public static void stopServer() throws IOException {
+ if (!server.close(true).await(TIMEOUT)) {
+ LOG.warn("Failed to close server within {} sec.", TimeUnit.MILLISECONDS.toSeconds(TIMEOUT));
+ }
+ }
+
+ @Before
+ public void createClient() throws IOException {
+ final SshClient client = SshClient.setUpDefaultClient();
+ client.setForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
+ client.start();
+ LOG.debug("Connecting...");
+ session = client.connect("user", TEST_LOCALHOST, sshServerPort).verify(TIMEOUT).getSession();
+ LOG.debug("Authenticating...");
+ session.addPasswordIdentity("foo");
+ session.auth().verify(TIMEOUT);
+ LOG.debug("Authenticated");
+ }
+
+ @After
+ public void stopClient() throws Exception {
+ LOG.debug("Disconnecting Client");
+ try {
+ assertTrue("Failed to close session", session.close(true).await(TIMEOUT));
+ } finally {
+ session = null;
+ }
+ }
+
+ @Test
+ /*
+ * Run PORT_FORWARD_CLIENT_COUNT simultaneous server threads.
+ *
+ * Emulates a web browser making a number of simultaneous requests on
+ * different connections to the same server HTTP specifies no more than two,
+ * but most modern browsers do 6 or more.
+ */
+ public void testConcurrentConnectionsToPortForward() throws Exception {
+ final SshdSocketAddress remote = new SshdSocketAddress(TEST_LOCALHOST, 0);
+ final SshdSocketAddress local = new SshdSocketAddress(TEST_LOCALHOST, testServerPort);
+ final SshdSocketAddress bound = session.startRemotePortForwarding(remote, local);
+ final int forwardedPort = bound.getPort();
+
+ final CyclicBarrier b = new CyclicBarrier(PORT_FORWARD_CLIENT_COUNT, () -> {
+ LOG.debug("And away we go.");
+ });
+
+ final AtomicInteger success = new AtomicInteger(0);
+ final AtomicInteger fail = new AtomicInteger(0);
+ final long[] bytesRead = new long[PORT_FORWARD_CLIENT_COUNT];
+
+ for (int i = 0; i < PORT_FORWARD_CLIENT_COUNT; i++) {
+ final long wait = 100 * i;
+ final int n = i;
+ final Thread t = new Thread(() -> {
+ try {
+ bytesRead[n] = makeClientRequest(forwardedPort, b, wait);
+ LOG.debug("Complete, received full payload from server.");
+ success.incrementAndGet();
+ } catch (final Exception e) {
+ fail.incrementAndGet();
+ LOG.error("Error in client code", e);
+ }
+ });
+ t.setName("Client " + i);
+ t.setDaemon(true);
+ t.start();
+ }
+
+ while (true) {
+ if (success.get() + fail.get() == PORT_FORWARD_CLIENT_COUNT) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+
+ for (int i = 0; i < PORT_FORWARD_CLIENT_COUNT; i++) {
+ assertEquals("Mismatched data length read from server for client " + i, PAYLOAD_TO_CLIENT.length,
+ bytesRead[i]);
+ }
+
+ assertEquals("Not all clients succeeded", PORT_FORWARD_CLIENT_COUNT, success.get());
+ }
+
+ /**
+ * Send PAYLOAD_TO_SERVER to the server, then read PAYLOAD_TO_CLIENT from
+ * server. Emulates a web browser making a request
+ */
+ private long makeClientRequest(final int serverPort, final CyclicBarrier barrier, final long wait)
+ throws Exception {
+ outputDebugMessage("readInLoop(port=%d)", serverPort);
+
+ final Socket s = new Socket();
+ s.setSoTimeout(TIMEOUT);
+
+ barrier.await();
+
+ s.connect(new InetSocketAddress(TEST_LOCALHOST, serverPort));
+
+ s.getOutputStream().write(PAYLOAD_TO_SERVER);
+
+ final byte[] buf = new byte[PAYLOAD_TO_CLIENT.length];
+ final long r = s.getInputStream().read(buf);
+ LOG.debug("Read {} payload from server", r);
+
+ assertEquals("Mismatched data length", PAYLOAD_TO_CLIENT.length, r);
+ s.close();
+
+ return r;
+ }
+
+}
\ No newline at end of file