You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2016/07/26 16:49:52 UTC

mina-sshd git commit: [SSHD-651] Add an option to specify read/write Nio2 socket timeouts

Repository: mina-sshd
Updated Branches:
  refs/heads/master 59cad4512 -> 61feea8b3


[SSHD-651] Add an option to specify read/write Nio2 socket timeouts


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/61feea8b
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/61feea8b
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/61feea8b

Branch: refs/heads/master
Commit: 61feea8b37d45c330392a1be904ff31f35bea675
Parents: 59cad45
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Tue Jul 26 19:51:16 2016 +0300
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Tue Jul 26 19:51:16 2016 +0300

----------------------------------------------------------------------
 .../org/apache/sshd/common/FactoryManager.java  | 24 ++++++++++++++++
 .../apache/sshd/common/io/nio2/Nio2Session.java |  9 ++++--
 .../common/forward/PortForwardingLoadTest.java  | 29 ++++++++++++++------
 3 files changed, 50 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/61feea8b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
index 14c093d..516da0b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
@@ -125,6 +125,30 @@ public interface FactoryManager
     long DEFAULT_IDLE_TIMEOUT = TimeUnit.MINUTES.toMillis(10L);
 
     /**
+     * Key used to retrieve the value of the socket read timeout
+     * for the NIO2 session implementation - in milliseconds.
+     * @see #DEFAULT_NIO2_READ_TIMEOUT
+     */
+    String NIO2_READ_TIMEOUT = "nio2-read-timeout";
+
+    /**
+     * Default value for {@link #NIO2_READ_TIMEOUT} if none set
+     */
+    long DEFAULT_NIO2_READ_TIMEOUT = DEFAULT_IDLE_TIMEOUT + TimeUnit.SECONDS.toMillis(15L);
+
+    /**
+     * Key used to retrieve the minimum NIO2 write wait timeout
+     * for a single outgoing packet - in milliseconds.
+     * @see #DEFAULT_NIO2_MIN_WRITE_TIMEOUT
+     */
+    String NIO2_MIN_WRITE_TIMEOUT = "nio2-min-write-timeout";
+
+    /**
+     * Default value for {@link #NIO2_MIN_WRITE_TIMEOUT} if none set
+     */
+    long DEFAULT_NIO2_MIN_WRITE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L);
+
+    /**
      * Key used to retrieve the value of the disconnect timeout which
      * is used when a disconnection is attempted.  If the disconnect
      * message has not been sent before the timeout, the underlying socket

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/61feea8b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index ba68609..df40722 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -55,7 +56,7 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
     private final Nio2Service service;
     private final IoHandler ioHandler;
     private final AsynchronousSocketChannel socketChannel;
-    private final Map<Object, Object> attributes = new HashMap<Object, Object>();
+    private final Map<Object, Object> attributes = new HashMap<>();
     private final SocketAddress localAddress;
     private final SocketAddress remoteAddress;
     private final FactoryManager manager;
@@ -309,7 +310,8 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
 
     protected void doReadCycle(ByteBuffer buffer, Nio2CompletionHandler<Integer, Object> completion) {
         AsynchronousSocketChannel socket = getSocket();
-        socket.read(buffer, null, completion);
+        long readTimeout = PropertyResolverUtils.getLongProperty(manager, FactoryManager.NIO2_READ_TIMEOUT, FactoryManager.DEFAULT_NIO2_READ_TIMEOUT);
+        socket.read(buffer, readTimeout, TimeUnit.MILLISECONDS, null, completion);
     }
 
     protected void startWriting() {
@@ -337,7 +339,8 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
 
     protected void doWriteCycle(ByteBuffer buffer, Nio2CompletionHandler<Integer, Object> completion) {
         AsynchronousSocketChannel socket = getSocket();
-        socket.write(buffer, null, completion);
+        long writeTimeout = PropertyResolverUtils.getLongProperty(manager, FactoryManager.NIO2_MIN_WRITE_TIMEOUT, FactoryManager.DEFAULT_NIO2_MIN_WRITE_TIMEOUT);
+        socket.write(buffer, writeTimeout, TimeUnit.MILLISECONDS, null, completion);
     }
 
     protected Nio2CompletionHandler<Integer, Object> createWriteCycleCompletionHandler(

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/61feea8b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
index 1c6e615..a2282c7 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
@@ -32,6 +32,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -49,6 +50,7 @@ import org.apache.mina.core.service.IoAcceptor;
 import org.apache.mina.core.service.IoHandlerAdapter;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.server.SshServer;
 import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
 import org.apache.sshd.util.test.BaseTestSupport;
@@ -140,7 +142,7 @@ public class PortForwardingLoadTest extends BaseTestSupport {
             int forwardedPort = ss.getLocalPort();
             int sinkPort = session.setPortForwardingL(0, TEST_LOCALHOST, forwardedPort);
             final AtomicInteger conCount = new AtomicInteger(0);
-
+            final Semaphore iterationsSignal = new Semaphore(0);
             Thread tAcceptor = new Thread(getCurrentTestName() + "Acceptor") {
                 @SuppressWarnings("synthetic-access")
                 @Override
@@ -178,6 +180,8 @@ public class PortForwardingLoadTest extends BaseTestSupport {
                                     }
                                 }
                             }
+                            log.info("Finished iteration {}", Integer.valueOf(i));
+                            iterationsSignal.release();
                         }
                         log.info("Done");
                     } catch (Exception e) {
@@ -186,7 +190,7 @@ public class PortForwardingLoadTest extends BaseTestSupport {
                 }
             };
             tAcceptor.start();
-            Thread.sleep(50);
+            Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
 
             byte[] buf = new byte[8192];
             byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
@@ -195,7 +199,7 @@ public class PortForwardingLoadTest extends BaseTestSupport {
                 try (Socket s = new Socket(TEST_LOCALHOST, sinkPort);
                      OutputStream sockOut = s.getOutputStream()) {
 
-                    s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+                    s.setSoTimeout((int) FactoryManager.DEFAULT_NIO2_MIN_WRITE_TIMEOUT);
 
                     sockOut.write(bytes);
                     sockOut.flush();
@@ -215,10 +219,16 @@ public class PortForwardingLoadTest extends BaseTestSupport {
                     log.error("Error in iteration #" + i, e);
                 }
             }
-            session.delPortForwardingL(sinkPort);
+
+            try {
+                assertTrue("Failed to await pending iterations=" + numIterations,
+                           iterationsSignal.tryAcquire(numIterations, numIterations, TimeUnit.SECONDS));
+            } finally {
+                session.delPortForwardingL(sinkPort);
+            }
 
             ss.close();
-            tAcceptor.join(TimeUnit.SECONDS.toMillis(5L));
+            tAcceptor.join(TimeUnit.SECONDS.toMillis(11L));
         } finally {
             session.disconnect();
         }
@@ -266,7 +276,7 @@ public class PortForwardingLoadTest extends BaseTestSupport {
                 }
             };
             tWriter.start();
-            Thread.sleep(50);
+            Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
             assertTrue("Server not started", started[0]);
 
             final RuntimeException lenOK[] = new RuntimeException[numIterations];
@@ -313,14 +323,15 @@ public class PortForwardingLoadTest extends BaseTestSupport {
                 ok += (lenOK[i] == null) ? 1 : 0;
             }
             log.info("Successful iteration: " + ok + " out of " + numIterations);
-            Thread.sleep(55L);
+            Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
             for (int i = 0; i < numIterations; i++) {
                 assertNull("Bad length at iteration " + i, lenOK[i]);
                 assertNull("Bad data at iteration " + i, dataOK[i]);
             }
+            Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
             session.delPortForwardingR(forwardedPort);
             ss.close();
-            tWriter.join(TimeUnit.SECONDS.toMillis(5L));
+            tWriter.join(TimeUnit.SECONDS.toMillis(11L));
         } finally {
             session.disconnect();
         }
@@ -370,7 +381,7 @@ public class PortForwardingLoadTest extends BaseTestSupport {
 
             final CountDownLatch latch = new CountDownLatch(nbThread * nbDownloads * nbLoops);
             final Thread[] threads = new Thread[nbThread];
-            final List<Throwable> errors = new CopyOnWriteArrayList<Throwable>();
+            final List<Throwable> errors = new CopyOnWriteArrayList<>();
             for (int i = 0; i < threads.length; i++) {
                 threads[i] = new Thread(getCurrentTestName() + "[" + i + "]") {
                     @Override