You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2016/07/26 16:49:52 UTC
mina-sshd git commit: [SSHD-651] Add an option to specify read/write Nio2 socket timeouts
Repository: mina-sshd
Updated Branches:
refs/heads/master 59cad4512 -> 61feea8b3
[SSHD-651] Add an option to specify read/write Nio2 socket timeouts
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/61feea8b
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/61feea8b
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/61feea8b
Branch: refs/heads/master
Commit: 61feea8b37d45c330392a1be904ff31f35bea675
Parents: 59cad45
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Tue Jul 26 19:51:16 2016 +0300
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Tue Jul 26 19:51:16 2016 +0300
----------------------------------------------------------------------
.../org/apache/sshd/common/FactoryManager.java | 24 ++++++++++++++++
.../apache/sshd/common/io/nio2/Nio2Session.java | 9 ++++--
.../common/forward/PortForwardingLoadTest.java | 29 ++++++++++++++------
3 files changed, 50 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/61feea8b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
index 14c093d..516da0b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
@@ -125,6 +125,30 @@ public interface FactoryManager
long DEFAULT_IDLE_TIMEOUT = TimeUnit.MINUTES.toMillis(10L);
/**
+ * Key used to retrieve the value of the socket read timeout
+ * for NIO2 session implementation - in milliseconds.
+ * @see #DEFAULT_NIO2_READ_TIMEOUT
+ */
+ String NIO2_READ_TIMEOUT = "nio2-read-timeout";
+
+ /**
+ * Default value for {@link #NIO2_READ_TIMEOUT} if none set
+ */
+ long DEFAULT_NIO2_READ_TIMEOUT = DEFAULT_IDLE_TIMEOUT + TimeUnit.SECONDS.toMillis(15L);
+
+ /**
+ * Key used to retrieve the minimum NIO2 write wait timeout
+ * for a single outgoing packet - in milliseconds
+ * @see #DEFAULT_NIO2_MIN_WRITE_TIMEOUT
+ */
+ String NIO2_MIN_WRITE_TIMEOUT = "nio2-min-write-timeout";
+
+ /**
+ * Default value for {@link #NIO2_MIN_WRITE_TIMEOUT} if none set
+ */
+ long DEFAULT_NIO2_MIN_WRITE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L);
+
+ /**
* Key used to retrieve the value of the disconnect timeout which
* is used when a disconnection is attempted. If the disconnect
* message has not been sent before the timeout, the underlying socket
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/61feea8b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index ba68609..df40722 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -55,7 +56,7 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
private final Nio2Service service;
private final IoHandler ioHandler;
private final AsynchronousSocketChannel socketChannel;
- private final Map<Object, Object> attributes = new HashMap<Object, Object>();
+ private final Map<Object, Object> attributes = new HashMap<>();
private final SocketAddress localAddress;
private final SocketAddress remoteAddress;
private final FactoryManager manager;
@@ -309,7 +310,8 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
protected void doReadCycle(ByteBuffer buffer, Nio2CompletionHandler<Integer, Object> completion) {
AsynchronousSocketChannel socket = getSocket();
- socket.read(buffer, null, completion);
+ long readTimeout = PropertyResolverUtils.getLongProperty(manager, FactoryManager.NIO2_READ_TIMEOUT, FactoryManager.DEFAULT_NIO2_READ_TIMEOUT);
+ socket.read(buffer, readTimeout, TimeUnit.MILLISECONDS, null, completion);
}
protected void startWriting() {
@@ -337,7 +339,8 @@ public class Nio2Session extends AbstractCloseable implements IoSession {
protected void doWriteCycle(ByteBuffer buffer, Nio2CompletionHandler<Integer, Object> completion) {
AsynchronousSocketChannel socket = getSocket();
- socket.write(buffer, null, completion);
+ long writeTimeout = PropertyResolverUtils.getLongProperty(manager, FactoryManager.NIO2_MIN_WRITE_TIMEOUT, FactoryManager.DEFAULT_NIO2_MIN_WRITE_TIMEOUT);
+ socket.write(buffer, writeTimeout, TimeUnit.MILLISECONDS, null, completion);
}
protected Nio2CompletionHandler<Integer, Object> createWriteCycleCompletionHandler(
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/61feea8b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
index 1c6e615..a2282c7 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
@@ -32,6 +32,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -49,6 +50,7 @@ import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.server.SshServer;
import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
import org.apache.sshd.util.test.BaseTestSupport;
@@ -140,7 +142,7 @@ public class PortForwardingLoadTest extends BaseTestSupport {
int forwardedPort = ss.getLocalPort();
int sinkPort = session.setPortForwardingL(0, TEST_LOCALHOST, forwardedPort);
final AtomicInteger conCount = new AtomicInteger(0);
-
+ final Semaphore iterationsSignal = new Semaphore(0);
Thread tAcceptor = new Thread(getCurrentTestName() + "Acceptor") {
@SuppressWarnings("synthetic-access")
@Override
@@ -178,6 +180,8 @@ public class PortForwardingLoadTest extends BaseTestSupport {
}
}
}
+ log.info("Finished iteration {}", Integer.valueOf(i));
+ iterationsSignal.release();
}
log.info("Done");
} catch (Exception e) {
@@ -186,7 +190,7 @@ public class PortForwardingLoadTest extends BaseTestSupport {
}
};
tAcceptor.start();
- Thread.sleep(50);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
byte[] buf = new byte[8192];
byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
@@ -195,7 +199,7 @@ public class PortForwardingLoadTest extends BaseTestSupport {
try (Socket s = new Socket(TEST_LOCALHOST, sinkPort);
OutputStream sockOut = s.getOutputStream()) {
- s.setSoTimeout((int) TimeUnit.SECONDS.toMillis(10L));
+ s.setSoTimeout((int) FactoryManager.DEFAULT_NIO2_MIN_WRITE_TIMEOUT);
sockOut.write(bytes);
sockOut.flush();
@@ -215,10 +219,16 @@ public class PortForwardingLoadTest extends BaseTestSupport {
log.error("Error in iteration #" + i, e);
}
}
- session.delPortForwardingL(sinkPort);
+
+ try {
+ assertTrue("Failed to await pending iterations=" + numIterations,
+ iterationsSignal.tryAcquire(numIterations, numIterations, TimeUnit.SECONDS));
+ } finally {
+ session.delPortForwardingL(sinkPort);
+ }
ss.close();
- tAcceptor.join(TimeUnit.SECONDS.toMillis(5L));
+ tAcceptor.join(TimeUnit.SECONDS.toMillis(11L));
} finally {
session.disconnect();
}
@@ -266,7 +276,7 @@ public class PortForwardingLoadTest extends BaseTestSupport {
}
};
tWriter.start();
- Thread.sleep(50);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
assertTrue("Server not started", started[0]);
final RuntimeException lenOK[] = new RuntimeException[numIterations];
@@ -313,14 +323,15 @@ public class PortForwardingLoadTest extends BaseTestSupport {
ok += (lenOK[i] == null) ? 1 : 0;
}
log.info("Successful iteration: " + ok + " out of " + numIterations);
- Thread.sleep(55L);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
for (int i = 0; i < numIterations; i++) {
assertNull("Bad length at iteration " + i, lenOK[i]);
assertNull("Bad data at iteration " + i, dataOK[i]);
}
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
session.delPortForwardingR(forwardedPort);
ss.close();
- tWriter.join(TimeUnit.SECONDS.toMillis(5L));
+ tWriter.join(TimeUnit.SECONDS.toMillis(11L));
} finally {
session.disconnect();
}
@@ -370,7 +381,7 @@ public class PortForwardingLoadTest extends BaseTestSupport {
final CountDownLatch latch = new CountDownLatch(nbThread * nbDownloads * nbLoops);
final Thread[] threads = new Thread[nbThread];
- final List<Throwable> errors = new CopyOnWriteArrayList<Throwable>();
+ final List<Throwable> errors = new CopyOnWriteArrayList<>();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(getCurrentTestName() + "[" + i + "]") {
@Override