You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2018/08/26 05:51:45 UTC

[3/6] mina-sshd git commit: [SSHD-839] Added more detailed log messages in PortForwardingLoadTest

[SSHD-839] Added more detailed log messages in PortForwardingLoadTest


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/aa7547b7
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/aa7547b7
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/aa7547b7

Branch: refs/heads/master
Commit: aa7547b7b58e49c59302407e9c8f765fa22de695
Parents: 6ddbe1a
Author: Goldstein Lyor <ly...@cb4.com>
Authored: Wed Aug 22 14:09:38 2018 +0300
Committer: Goldstein Lyor <ly...@cb4.com>
Committed: Sun Aug 26 08:50:42 2018 +0300

----------------------------------------------------------------------
 .../common/forward/PortForwardingLoadTest.java  | 124 ++++++++++++++-----
 .../sshd/common/forward/PortForwardingTest.java |  57 +++++----
 2 files changed, 123 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa7547b7/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
index dd076ae..dc90b6d 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
@@ -77,42 +77,48 @@ public class PortForwardingLoadTest extends BaseTestSupport {
     @SuppressWarnings({ "checkstyle:anoninnerlength", "synthetic-access" })
     private final PortForwardingEventListener serverSideListener = new PortForwardingEventListener() {
         @Override
-        public void establishingExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local,
-                SshdSocketAddress remote, boolean localForwarding) throws IOException {
+        public void establishingExplicitTunnel(
+                org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding)
+                    throws IOException {
             log.info("establishingExplicitTunnel(session={}, local={}, remote={}, localForwarding={})",
-                     session, local, remote, localForwarding);
+                session, local, remote, localForwarding);
         }
 
         @Override
-        public void establishedExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local,
+        public void establishedExplicitTunnel(
+                org.apache.sshd.common.session.Session session, SshdSocketAddress local,
                 SshdSocketAddress remote, boolean localForwarding, SshdSocketAddress boundAddress, Throwable reason)
-                throws IOException {
+                    throws IOException {
             log.info("establishedExplicitTunnel(session={}, local={}, remote={}, bound={}, localForwarding={}): {}",
-                    session, local, remote, boundAddress, localForwarding, reason);
+                session, local, remote, boundAddress, localForwarding, reason);
         }
 
         @Override
-        public void tearingDownExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address,
-                boolean localForwarding) throws IOException {
+        public void tearingDownExplicitTunnel(
+                org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding)
+                    throws IOException {
             log.info("tearingDownExplicitTunnel(session={}, address={}, localForwarding={})", session, address, localForwarding);
         }
 
         @Override
-        public void tornDownExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address,
-                boolean localForwarding, Throwable reason) throws IOException {
+        public void tornDownExplicitTunnel(
+                org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding, Throwable reason)
+                    throws IOException {
             log.info("tornDownExplicitTunnel(session={}, address={}, localForwarding={}, reason={})",
-                     session, address, localForwarding, reason);
+                 session, address, localForwarding, reason);
         }
 
         @Override
-        public void establishingDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local)
-                throws IOException {
+        public void establishingDynamicTunnel(
+                org.apache.sshd.common.session.Session session, SshdSocketAddress local)
+                    throws IOException {
             log.info("establishingDynamicTunnel(session={}, local={})", session, local);
         }
 
         @Override
-        public void establishedDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local,
-                SshdSocketAddress boundAddress, Throwable reason) throws IOException {
+        public void establishedDynamicTunnel(
+                org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason)
+                    throws IOException {
             log.info("establishedDynamicTunnel(session={}, local={}, bound={}, reason={})", session, local, boundAddress, reason);
         }
 
@@ -123,8 +129,9 @@ public class PortForwardingLoadTest extends BaseTestSupport {
         }
 
         @Override
-        public void tornDownDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address,
-                Throwable reason) throws IOException {
+        public void tornDownDynamicTunnel(
+                org.apache.sshd.common.session.Session session, SshdSocketAddress address, Throwable reason)
+                    throws IOException {
             log.info("tornDownDynamicTunnel(session={}, address={}, reason={})", session, address, reason);
         }
     };
@@ -191,7 +198,11 @@ public class PortForwardingLoadTest extends BaseTestSupport {
         for (int i = 0; i < 1000; i++) {
             sb.append(payloadTmpData);
         }
-        final String payload = sb.toString();
+        String payload = sb.toString();
+
+        final byte[] dataBytes = payload.getBytes(StandardCharsets.UTF_8);
+        final int reportPhase = dataBytes.length / 10;
+        log.info("{} using payload size={}", getCurrentTestName(), dataBytes.length);
 
         Session session = createSession();
         try (ServerSocket ss = new ServerSocket()) {
@@ -199,8 +210,11 @@ public class PortForwardingLoadTest extends BaseTestSupport {
             ss.bind(new InetSocketAddress((InetAddress) null, 0));
             int forwardedPort = ss.getLocalPort();
             int sinkPort = session.setPortForwardingL(0, TEST_LOCALHOST, forwardedPort);
-            final AtomicInteger conCount = new AtomicInteger(0);
-            final Semaphore iterationsSignal = new Semaphore(0);
+            log.info("{} forwardedPort={}, sinkPort={}", getCurrentTestName(), forwardedPort, sinkPort);
+
+            AtomicInteger conCount = new AtomicInteger(0);
+            Semaphore iterationsSignal = new Semaphore(0);
+            @SuppressWarnings("checkstyle:anoninnerlength")
             Thread tAcceptor = new Thread(getCurrentTestName() + "Acceptor") {
                 @SuppressWarnings("synthetic-access")
                 @Override
@@ -210,30 +224,44 @@ public class PortForwardingLoadTest extends BaseTestSupport {
                         log.info("Started...");
                         for (int i = 0; i < numIterations; ++i) {
                             try (Socket s = ss.accept()) {
-                                conCount.incrementAndGet();
+                                int totalConns = conCount.incrementAndGet();
+                                log.info("Accepted connection #{} from {}", totalConns, s.getRemoteSocketAddress());
 
                                 try (InputStream sockIn = s.getInputStream();
                                      ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
 
-                                    while (baos.size() < payload.length()) {
+                                    for (int readSize = 0, lastReport = 0; readSize < dataBytes.length;) {
                                         int l = sockIn.read(buf);
                                         if (l < 0) {
                                             break;
                                         }
+
                                         baos.write(buf, 0, l);
+                                        readSize += l;
+
+                                        if ((readSize - lastReport) >= reportPhase) {
+                                            log.info("Read {}/{} bytes of iteration #{}", readSize, dataBytes.length, i);
+                                            lastReport = readSize;
+                                        }
                                     }
 
-                                    assertEquals("Mismatched received data at iteration #" + i, payload, baos.toString());
+                                    assertPayloadEquals("Mismatched received data at iteration #" + i, dataBytes, baos.toByteArray());
 
-                                    try (InputStream inputCopy = new ByteArrayInputStream(baos.toByteArray());
+                                    byte[] outBytes = baos.toByteArray();
+                                    try (InputStream inputCopy = new ByteArrayInputStream(outBytes);
                                          OutputStream sockOut = s.getOutputStream()) {
 
-                                        while (true) {
+                                        for (int writeSize = 0, lastReport = 0; writeSize < outBytes.length;) {
                                             int l = sockIn.read(buf);
                                             if (l < 0) {
                                                 break;
                                             }
                                             sockOut.write(buf, 0, l);
+                                            writeSize += l;
+                                            if ((writeSize - lastReport) >= reportPhase) {
+                                                log.info("Written {}/{} bytes of iteration #{}", writeSize, dataBytes.length, i);
+                                                lastReport = writeSize;
+                                            }
                                         }
                                     }
                                 }
@@ -248,30 +276,38 @@ public class PortForwardingLoadTest extends BaseTestSupport {
                 }
             };
             tAcceptor.start();
-            Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
+            Thread.sleep(TimeUnit.SECONDS.toMillis(3L));
 
             byte[] buf = new byte[8192];
-            byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
             for (int i = 0; i < numIterations; i++) {
-                log.info("Iteration {}", i);
+                log.info("Iteration {} started", i);
                 try (Socket s = new Socket(TEST_LOCALHOST, sinkPort);
                      OutputStream sockOut = s.getOutputStream()) {
 
+                    log.info("Iteration {} connected to {}", i, s.getRemoteSocketAddress());
                     s.setSoTimeout((int) FactoryManager.DEFAULT_NIO2_MIN_WRITE_TIMEOUT);
 
-                    sockOut.write(bytes);
+                    sockOut.write(dataBytes);
                     sockOut.flush();
 
+                    log.info("Iteration {} awaiting echoed data", i);
                     try (InputStream sockIn = s.getInputStream();
-                         ByteArrayOutputStream baos = new ByteArrayOutputStream(bytes.length)) {
-                        while (baos.size() < payload.length()) {
+                         ByteArrayOutputStream baos = new ByteArrayOutputStream(dataBytes.length)) {
+                        for (int readSize = 0, lastReport = 0; readSize < dataBytes.length;) {
                             int l = sockIn.read(buf);
                             if (l < 0) {
                                 break;
                             }
+
                             baos.write(buf, 0, l);
+                            readSize += l;
+
+                            if ((readSize - lastReport) >= reportPhase) {
+                                log.info("Read {}/{} bytes of iteration #{}", readSize, dataBytes.length, i);
+                                lastReport = readSize;
+                            }
                         }
-                        assertEquals("Mismatched payload at iteration #" + i, payload, baos.toString());
+                        assertPayloadEquals("Mismatched payload at iteration #" + i, dataBytes, baos.toByteArray());
                     }
                 } catch (Exception e) {
                     log.error("Error in iteration #" + i, e);
@@ -280,18 +316,40 @@ public class PortForwardingLoadTest extends BaseTestSupport {
 
             try {
                 assertTrue("Failed to await pending iterations=" + numIterations,
-                           iterationsSignal.tryAcquire(numIterations, numIterations, TimeUnit.SECONDS));
+                   iterationsSignal.tryAcquire(numIterations, numIterations, TimeUnit.SECONDS));
             } finally {
+                log.info("{} remove port forwarding for {}", getCurrentTestName(), sinkPort);
                 session.delPortForwardingL(sinkPort);
             }
 
             ss.close();
+            log.info("{} awaiting acceptor finish", getCurrentTestName());
             tAcceptor.join(TimeUnit.SECONDS.toMillis(11L));
         } finally {
             session.disconnect();
         }
     }
 
+    /** Compares payloads byte-wise; on the first difference fails with a short UTF-8 window around it. */
+    private static void assertPayloadEquals(String message, byte[] expectedBytes, byte[] actualBytes) {
+        assertEquals(message + ": mismatched payload length", expectedBytes.length, actualBytes.length);
+        for (int index = 0; index < expectedBytes.length; index++) {
+            if (expectedBytes[index] == actualBytes[index]) {
+                continue;
+            }
+            int startPos = Math.max(0, index - Byte.SIZE);
+            int endPos = Math.min(startPos + Short.SIZE, expectedBytes.length);
+            if ((endPos - startPos) < Byte.SIZE) {
+                // Only reachable for payloads shorter than Byte.SIZE - clamp so the window offset never goes negative
+                startPos = Math.max(0, expectedBytes.length - Byte.SIZE);
+                endPos = expectedBytes.length;
+            }
+            String expected = new String(expectedBytes, startPos, endPos - startPos, StandardCharsets.UTF_8);
+            String actual = new String(actualBytes, startPos, endPos - startPos, StandardCharsets.UTF_8);
+            fail(message + ": mismatched data around offset " + index + ": expected='" + expected + "', actual='" + actual + "'");
+        }
+    }
+
     @Test
     public void testRemoteForwardingPayload() throws Exception {
         final int numIterations = 100;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa7547b7/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
index 9b88a84..450184e 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java
@@ -83,31 +83,35 @@ public class PortForwardingTest extends BaseTestSupport {
         private final org.slf4j.Logger log = LoggerFactory.getLogger(PortForwardingEventListener.class);
 
         @Override
-        public void establishingExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local,
-                SshdSocketAddress remote, boolean localForwarding) throws IOException {
+        public void establishingExplicitTunnel(
+                org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding)
+                    throws IOException {
             log.info("establishingExplicitTunnel(session={}, local={}, remote={}, localForwarding={})",
-                     session, local, remote, localForwarding);
+                 session, local, remote, localForwarding);
         }
 
         @Override
-        public void establishedExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local,
+        public void establishedExplicitTunnel(
+                org.apache.sshd.common.session.Session session, SshdSocketAddress local,
                 SshdSocketAddress remote, boolean localForwarding, SshdSocketAddress boundAddress, Throwable reason)
-                throws IOException {
+                    throws IOException {
             log.info("establishedExplicitTunnel(session={}, local={}, remote={}, bound={}, localForwarding={}): {}",
-                    session, local, remote, boundAddress, localForwarding, reason);
+                session, local, remote, boundAddress, localForwarding, reason);
         }
 
         @Override
-        public void tearingDownExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address,
-                boolean localForwarding) throws IOException {
+        public void tearingDownExplicitTunnel(
+                org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding)
+                    throws IOException {
             log.info("tearingDownExplicitTunnel(session={}, address={}, localForwarding={})", session, address, localForwarding);
         }
 
         @Override
-        public void tornDownExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address,
-                boolean localForwarding, Throwable reason) throws IOException {
+        public void tornDownExplicitTunnel(
+                org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding, Throwable reason)
+                    throws IOException {
             log.info("tornDownExplicitTunnel(session={}, address={}, localForwarding={}, reason={})",
-                     session, address, localForwarding, reason);
+                 session, address, localForwarding, reason);
         }
 
         @Override
@@ -117,8 +121,9 @@ public class PortForwardingTest extends BaseTestSupport {
         }
 
         @Override
-        public void establishedDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local,
-                SshdSocketAddress boundAddress, Throwable reason) throws IOException {
+        public void establishedDynamicTunnel(
+                org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason)
+                    throws IOException {
             log.info("establishedDynamicTunnel(session={}, local={}, bound={}, reason={})", session, local, boundAddress, reason);
         }
 
@@ -129,8 +134,9 @@ public class PortForwardingTest extends BaseTestSupport {
         }
 
         @Override
-        public void tornDownDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address,
-                Throwable reason) throws IOException {
+        public void tornDownDynamicTunnel(
+                org.apache.sshd.common.session.Session session, SshdSocketAddress address, Throwable reason)
+                    throws IOException {
             log.info("tornDownDynamicTunnel(session={}, address={}, reason={})", session, address, reason);
         }
     };
@@ -487,30 +493,31 @@ public class PortForwardingTest extends BaseTestSupport {
 
     @Test
     public void testLocalForwardingNative() throws Exception {
-        final AtomicReference<SshdSocketAddress> localAddressHolder = new AtomicReference<>();
-        final AtomicReference<SshdSocketAddress> remoteAddressHolder = new AtomicReference<>();
-        final AtomicReference<SshdSocketAddress> boundAddressHolder = new AtomicReference<>();
-        final AtomicInteger tearDownSignal = new AtomicInteger(0);
+        AtomicReference<SshdSocketAddress> localAddressHolder = new AtomicReference<>();
+        AtomicReference<SshdSocketAddress> remoteAddressHolder = new AtomicReference<>();
+        AtomicReference<SshdSocketAddress> boundAddressHolder = new AtomicReference<>();
+        AtomicInteger tearDownSignal = new AtomicInteger(0);
         @SuppressWarnings("checkstyle:anoninnerlength")
         PortForwardingEventListener listener = new PortForwardingEventListener() {
             @Override
             public void tornDownExplicitTunnel(
                     org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding, Throwable reason)
-                            throws IOException {
+                        throws IOException {
                 assertTrue("Unexpected remote tunnel has been torn down: address=" + address, localForwarding);
                 assertEquals("Tear down indication not invoked", 1, tearDownSignal.get());
             }
 
             @Override
             public void tornDownDynamicTunnel(
-                    org.apache.sshd.common.session.Session session, SshdSocketAddress address, Throwable reason) throws IOException {
+                    org.apache.sshd.common.session.Session session, SshdSocketAddress address, Throwable reason)
+                        throws IOException {
                 throw new UnsupportedOperationException("Unexpected dynamic tunnel torn down indication: session=" + session + ", address=" + address);
             }
 
             @Override
             public void tearingDownExplicitTunnel(
                     org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding)
-                            throws IOException {
+                        throws IOException {
                 assertTrue("Unexpected remote tunnel being torn down: address=" + address, localForwarding);
                 assertEquals("Duplicate tear down signalling", 1, tearDownSignal.incrementAndGet());
             }
@@ -524,7 +531,7 @@ public class PortForwardingTest extends BaseTestSupport {
             @Override
             public void establishingExplicitTunnel(
                     org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding)
-                            throws IOException {
+                        throws IOException {
                 assertTrue("Unexpected remote tunnel being established: local=" + local + ", remote=" + remote, localForwarding);
                 assertNull("Duplicate establishment indication call for local address=" + local, localAddressHolder.getAndSet(local));
                 assertNull("Duplicate establishment indication call for remote address=" + remote, remoteAddressHolder.getAndSet(remote));
@@ -539,7 +546,7 @@ public class PortForwardingTest extends BaseTestSupport {
             @Override
             public void establishedExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local,
                     SshdSocketAddress remote, boolean localForwarding, SshdSocketAddress boundAddress, Throwable reason)
-                            throws IOException {
+                        throws IOException {
                 assertTrue("Unexpected remote tunnel has been established: local=" + local + ", remote=" + remote + ", bound=" + boundAddress, localForwarding);
                 assertSame("Mismatched established tunnel local address", local, localAddressHolder.get());
                 assertSame("Mismatched established tunnel remote address", remote, remoteAddressHolder.get());
@@ -549,7 +556,7 @@ public class PortForwardingTest extends BaseTestSupport {
             @Override
             public void establishedDynamicTunnel(
                     org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason)
-                            throws IOException {
+                        throws IOException {
                 throw new UnsupportedOperationException("Unexpected dynamic tunnel established indication: session=" + session + ", address=" + boundAddress);
             }
         };