You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2015/05/17 10:41:03 UTC
mina-sshd git commit: [SSHD-461] Provide 'verify' timeout overloads
for AuthFuture, OpenFuture and IoReadFuture
Repository: mina-sshd
Updated Branches:
refs/heads/master 84c23ddab -> dafaa6228
[SSHD-461] Provide 'verify' timeout overloads for AuthFuture, OpenFuture and IoReadFuture
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/dafaa622
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/dafaa622
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/dafaa622
Branch: refs/heads/master
Commit: dafaa6228f9e4c3574c1ac241727478147c609e9
Parents: 84c23dd
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Sun May 17 11:40:49 2015 +0300
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Sun May 17 11:40:49 2015 +0300
----------------------------------------------------------------------
.../apache/sshd/client/channel/ChannelExec.java | 1 +
.../apache/sshd/client/future/AuthFuture.java | 16 +-
.../sshd/client/future/DefaultAuthFuture.java | 31 +-
.../sshd/client/future/DefaultOpenFuture.java | 26 +-
.../apache/sshd/client/future/OpenFuture.java | 19 +-
.../common/channel/ChannelAsyncInputStream.java | 30 +-
.../sshd/common/future/DefaultSshFuture.java | 35 +-
.../org/apache/sshd/common/io/IoReadFuture.java | 18 +-
.../test/java/org/apache/sshd/ClientTest.java | 917 +++++++++++--------
.../test/java/org/apache/sshd/ServerTest.java | 10 +-
10 files changed, 647 insertions(+), 456 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelExec.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelExec.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelExec.java
index 0d4dcfe..125cdbb 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelExec.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelExec.java
@@ -40,6 +40,7 @@ public class ChannelExec extends PtyCapableChannelSession {
this.command = command;
}
+ @Override
protected void doOpen() throws IOException {
doOpenPty();
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/client/future/AuthFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/future/AuthFuture.java b/sshd-core/src/main/java/org/apache/sshd/client/future/AuthFuture.java
index 081474a..99adeab 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/future/AuthFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/future/AuthFuture.java
@@ -18,6 +18,8 @@
*/
package org.apache.sshd.client.future;
+import java.util.concurrent.TimeUnit;
+
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.future.SshFuture;
@@ -27,15 +29,25 @@ import org.apache.sshd.common.future.SshFuture;
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public interface AuthFuture extends SshFuture<AuthFuture> {
-
/**
* Wait and verify that the authentication succeeded.
- *
* @throws SshException if the authentication failed for any reason
*/
void verify() throws SshException;
/**
+ * Wait and verify that the authentication succeeded within the specified timeout.
+ * @throws SshException if the authentication failed for any reason
+ */
+ void verify(long timeout, TimeUnit unit) throws SshException;
+
+ /**
+ * Wait and verify that the authentication succeeded within the specified timeout (in milliseconds).
+ * @throws SshException if the authentication failed for any reason
+ */
+ void verify(long timeoutMillis) throws SshException;
+
+ /**
* Returns the cause of the connection failure.
*
* @return <tt>null</tt> if the connect operation is not finished yet,
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultAuthFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultAuthFuture.java b/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultAuthFuture.java
index 176eadc..d79cbb1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultAuthFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultAuthFuture.java
@@ -18,6 +18,8 @@
*/
package org.apache.sshd.client.future;
+import java.util.concurrent.TimeUnit;
+
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.future.DefaultSshFuture;
@@ -28,23 +30,36 @@ import org.apache.sshd.common.future.DefaultSshFuture;
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public class DefaultAuthFuture extends DefaultSshFuture<AuthFuture> implements AuthFuture {
-
public DefaultAuthFuture( Object lock) {
super(lock);
}
+ @Override // TODO for JDK-8 make this a default method
public void verify() throws SshException {
+ verify(Long.MAX_VALUE);
+ }
+
+ @Override // TODO for JDK-8 make this a default method
+ public void verify(long timeout, TimeUnit unit) throws SshException {
+ verify(unit.toMillis(timeout));
+ }
+
+ @Override
+ public void verify(long timeoutMillis) throws SshException {
try {
- await();
- }
- catch (InterruptedException e) {
+ if (!await(timeoutMillis)) {
+ throw new SshException("Authentication timeout afer " + timeoutMillis);
+ }
+ } catch (InterruptedException e) {
throw new SshException("Authentication interrupted", e);
}
+
if (!isSuccess()) {
throw new SshException("Authentication failed", getException());
}
}
+ @Override
public Throwable getException() {
Object v = getValue();
if (v instanceof Throwable) {
@@ -54,20 +69,24 @@ public class DefaultAuthFuture extends DefaultSshFuture<AuthFuture> implements A
}
}
+ @Override
public boolean isSuccess() {
Object v = getValue();
- return v instanceof Boolean && (Boolean) v;
+ return (v instanceof Boolean) && ((Boolean) v).booleanValue();
}
+ @Override
public boolean isFailure() {
Object v = getValue();
- return v instanceof Boolean && !((Boolean) v);
+ return (v instanceof Boolean) && (!((Boolean) v).booleanValue());
}
+ @Override
public void setAuthed(boolean authed) {
setValue(Boolean.valueOf(authed));
}
+ @Override
public void setException(Throwable exception) {
if (exception == null) {
throw new NullPointerException("exception");
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultOpenFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultOpenFuture.java b/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultOpenFuture.java
index ca6cb07..9baab23 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultOpenFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultOpenFuture.java
@@ -18,6 +18,8 @@
*/
package org.apache.sshd.client.future;
+import java.util.concurrent.TimeUnit;
+
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.future.DefaultSshFuture;
@@ -27,23 +29,36 @@ import org.apache.sshd.common.future.DefaultSshFuture;
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public class DefaultOpenFuture extends DefaultSshFuture<OpenFuture> implements OpenFuture {
-
public DefaultOpenFuture(Object lock) {
super(lock);
}
+ @Override // TODO for JDK-8 make this a default method
public void verify() throws SshException {
+ verify(Long.MAX_VALUE);
+ }
+
+ @Override // TODO for JDK-8 make this a default method
+ public void verify(long timeout, TimeUnit unit) throws SshException {
+ verify(unit.toMillis(timeout));
+ }
+
+ @Override
+ public void verify(long timeoutMillis) throws SshException {
try {
- await();
- }
- catch (InterruptedException e) {
+ if (!await(timeoutMillis)) {
+ throw new SshException("Channel opening time out after " + timeoutMillis);
+ }
+ } catch (InterruptedException e) {
throw new SshException("Channel opening interrupted", e);
}
+
if (!isOpened()) {
throw new SshException("Channel opening failed", getException());
}
}
+ @Override
public Throwable getException() {
Object v = getValue();
if (v instanceof Throwable) {
@@ -53,14 +68,17 @@ public class DefaultOpenFuture extends DefaultSshFuture<OpenFuture> implements O
}
}
+ @Override
public boolean isOpened() {
return getValue() instanceof Boolean;
}
+ @Override
public void setOpened() {
setValue(Boolean.TRUE);
}
+ @Override
public void setException(Throwable exception) {
if (exception == null) {
throw new NullPointerException("exception");
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/client/future/OpenFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/future/OpenFuture.java b/sshd-core/src/main/java/org/apache/sshd/client/future/OpenFuture.java
index 3b833dc..fcb3ae9 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/future/OpenFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/future/OpenFuture.java
@@ -18,6 +18,8 @@
*/
package org.apache.sshd.client.future;
+import java.util.concurrent.TimeUnit;
+
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.future.SshFuture;
@@ -29,13 +31,24 @@ import org.apache.sshd.common.future.SshFuture;
public interface OpenFuture extends SshFuture<OpenFuture> {
/**
- * Wait and verify that the channel has been successfuly opened.
- *
- * @throws org.apache.sshd.common.SshException if the authentication failed for any reason
+ * Wait and verify that the channel has been successfully opened.
+ * @throws SshException if the action failed for any reason
*/
void verify() throws SshException;
/**
+ * Wait and verify that the channel has been successfully opened within the specified timeout.
+ * @throws SshException if the action failed for any reason
+ */
+ void verify(long timeout, TimeUnit unit) throws SshException;
+
+ /**
+ * Wait and verify that the channel has been successfully opened within the specified timeout (in milliseconds).
+ * @throws SshException if the action failed for any reason
+ */
+ void verify(long timeoutMillis) throws SshException;
+
+ /**
* Returns the cause of the connection failure.
*
* @return <tt>null</tt> if the connect operation is not finished yet,
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
index 5a41887..d21eb9c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
@@ -19,6 +19,7 @@
package org.apache.sshd.common.channel;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.apache.sshd.common.Channel;
import org.apache.sshd.common.RuntimeSshException;
@@ -85,6 +86,7 @@ public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable im
}
}
+ @SuppressWarnings("synthetic-access")
private void doRead(boolean resume) {
IoReadFutureImpl future = null;
int nbRead = 0;
@@ -111,7 +113,7 @@ public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable im
} catch (IOException e) {
channel.getSession().exceptionCaught(e);
}
- future.setValue(nbRead);
+ future.setValue(Integer.valueOf(nbRead));
}
}
@@ -121,8 +123,7 @@ public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable im
}
public static class IoReadFutureImpl extends DefaultSshFuture<IoReadFuture> implements IoReadFuture {
-
- final Buffer buffer;
+ private final Buffer buffer;
public IoReadFutureImpl(Buffer buffer) {
super(null);
@@ -134,12 +135,23 @@ public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable im
return buffer;
}
- @Override
+ @Override // TODO for JDK-8 make this a default method
public void verify() throws SshException {
+ verify(Long.MAX_VALUE);
+ }
+
+ @Override // TODO for JDK-8 make this a default method
+ public void verify(long timeout, TimeUnit unit) throws SshException {
+ verify(unit.toMillis(timeout));
+ }
+
+ @Override
+ public void verify(long timeoutMillis) throws SshException {
try {
- await();
- }
- catch (InterruptedException e) {
+ if (!await(timeoutMillis)) {
+ throw new SshException("Timed out after " + timeoutMillis);
+ }
+ } catch (InterruptedException e) {
throw new SshException("Interrupted", e);
}
if (getValue() instanceof Throwable) {
@@ -156,9 +168,9 @@ public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable im
} else if (v instanceof Throwable) {
throw (RuntimeSshException) new RuntimeSshException("Error reading from channel.").initCause((Throwable) v);
} else if (v instanceof Integer) {
- return (Integer) v;
+ return ((Integer) v).intValue();
} else {
- throw new IllegalStateException();
+ throw new IllegalStateException("Unknown read value type: " + ((v == null) ? "null" : v.getClass().getName()));
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
index 634eb58..2e9b8af 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
@@ -55,11 +55,10 @@ public class DefaultSshFuture<T extends SshFuture> implements SshFuture<T> {
*/
@Override
public T await() throws InterruptedException {
- synchronized (lock) {
- while (result == null) {
- lock.wait();
- }
+ if (await0(Long.MAX_VALUE, true) == null) {
+ throw new InternalError("No result while await completion");
}
+
return asT();
}
@@ -76,7 +75,7 @@ public class DefaultSshFuture<T extends SshFuture> implements SshFuture<T> {
*/
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
- return await0(timeoutMillis, true);
+ return await0(timeoutMillis, true) != null;
}
/**
@@ -107,32 +106,29 @@ public class DefaultSshFuture<T extends SshFuture> implements SshFuture<T> {
@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
try {
- return await0(timeoutMillis, false);
+ return await0(timeoutMillis, false) != null;
} catch (InterruptedException e) {
- throw new InternalError();
+ throw new InternalError("Unexpected interrupted exception wile awaitUninterruptibly " + timeoutMillis + " msec.: " + e.getMessage(), e);
}
}
/**
* Wait for the Future to be ready. If the requested delay is 0 or
- * negative, this method immediately returns the value of the
- * 'ready' flag.
- * Every 5 second, the wait will be suspended to be able to check if
- * there is a deadlock or not.
- *
+ * negative, this method immediately returns.
* @param timeoutMillis The delay we will wait for the Future to be ready
* @param interruptable Tells if the wait can be interrupted or not
- * @return <code>true</code> if the Future is ready
+ * @return The non-{@code null} result object if the Future is ready,
+ * {@code null} if the timeout expired and no result was received
* @throws InterruptedException If the thread has been interrupted
* when it's not allowed.
*/
- private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
+ protected Object await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
long curTime = System.currentTimeMillis();
- long endTime = Long.MAX_VALUE - timeoutMillis < curTime ? Long.MAX_VALUE : curTime + timeoutMillis;
+ long endTime = ((Long.MAX_VALUE - timeoutMillis) < curTime) ? Long.MAX_VALUE : (curTime + timeoutMillis);
synchronized (lock) {
- if (result != null || timeoutMillis <= 0) {
- return result != null;
+ if ((result != null) || (timeoutMillis <= 0)) {
+ return result;
}
for (;;) {
@@ -145,14 +141,13 @@ public class DefaultSshFuture<T extends SshFuture> implements SshFuture<T> {
}
curTime = System.currentTimeMillis();
- if (result != null || curTime >= endTime) {
- return result != null;
+ if ((result != null) || (curTime >= endTime)) {
+ return result;
}
}
}
}
-
/**
* {@inheritDoc}
*/
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java
index ebf9c20..3865ed4 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java
@@ -18,6 +18,8 @@
*/
package org.apache.sshd.common.io;
+import java.util.concurrent.TimeUnit;
+
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.future.SshFuture;
import org.apache.sshd.common.util.buffer.Buffer;
@@ -26,11 +28,23 @@ public interface IoReadFuture extends SshFuture<IoReadFuture> {
/**
* Wait and verify that the read succeeded.
- *
- * @throws org.apache.sshd.common.SshException if the write failed for any reason
+ * @throws SshException if the action failed for any reason
*/
void verify() throws SshException;
+
+ /**
+ * Wait and verify that the read succeeded within the specified timeout.
+ * @throws SshException if the action failed for any reason
+ */
+ void verify(long timeout, TimeUnit unit) throws SshException;
+
+ /**
+ * Wait and verify that the read succeeded within the specified timeout (in milliseconds).
+ * @throws SshException if the action failed for any reason
+ */
+ void verify(long timeoutMillis) throws SshException;
+
Buffer getBuffer();
int getRead();
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
index 4521016..5c75dc5 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
@@ -35,6 +36,7 @@ import java.security.KeyPair;
import java.security.PublicKey;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -51,6 +53,7 @@ import org.apache.sshd.client.future.AuthFuture;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.Channel;
import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.FactoryManagerUtils;
import org.apache.sshd.common.KeyPairProvider;
import org.apache.sshd.common.NamedFactory;
import org.apache.sshd.common.RuntimeSshException;
@@ -69,8 +72,8 @@ import org.apache.sshd.common.io.mina.MinaSession;
import org.apache.sshd.common.io.nio2.Nio2Session;
import org.apache.sshd.common.session.AbstractSession;
import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.buffer.Buffer;
-import org.apache.sshd.common.util.buffer.BufferUtils;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.io.NoCloseOutputStream;
import org.apache.sshd.server.Command;
@@ -127,6 +130,7 @@ public class ClientTest extends BaseTest {
@Override
public Service create(Session session) throws IOException {
return new ServerUserAuthService(session) {
+ @SuppressWarnings("synthetic-access")
@Override
public void process(byte cmd, Buffer buffer) throws Exception {
authLatch.await();
@@ -142,6 +146,7 @@ public class ClientTest extends BaseTest {
@Override
public Channel create() {
return new ChannelSession() {
+ @SuppressWarnings("synthetic-access")
@Override
public OpenFuture open(int recipient, int rwsize, int rmpsize, Buffer buffer) {
try {
@@ -183,248 +188,277 @@ public class ClientTest extends BaseTest {
client.getProperties().put(FactoryManager.WINDOW_SIZE, "1024");
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("smx");
- session.auth().verify();
- final ChannelShell channel = session.createShellChannel();
- channel.setStreaming(ClientChannel.Streaming.Async);
- channel.open().verify();
-
- final byte[] message = "0123456789\n".getBytes();
- final int nbMessages = 1000;
-
- final ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
- final ByteArrayOutputStream baosErr = new ByteArrayOutputStream();
- final AtomicInteger writes = new AtomicInteger(nbMessages);
-
- channel.getAsyncIn().write(new ByteArrayBuffer(message))
- .addListener(new SshFutureListener<IoWriteFuture>() {
- @Override
- public void operationComplete(IoWriteFuture future) {
- try {
- if (future.isWritten()) {
- if (writes.decrementAndGet() > 0) {
- channel.getAsyncIn().write(new ByteArrayBuffer(message)).addListener(this);
- } else {
- channel.getAsyncIn().close(false);
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+
+ try(final ChannelShell channel = session.createShellChannel()) {
+ channel.setStreaming(ClientChannel.Streaming.Async);
+ channel.open().verify(5L, TimeUnit.SECONDS);
+
+ final byte[] message = "0123456789\n".getBytes();
+ final int nbMessages = 1000;
+
+ try(final ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
+ final ByteArrayOutputStream baosErr = new ByteArrayOutputStream()) {
+ final AtomicInteger writes = new AtomicInteger(nbMessages);
+
+ channel.getAsyncIn().write(new ByteArrayBuffer(message))
+ .addListener(new SshFutureListener<IoWriteFuture>() {
+ @Override
+ public void operationComplete(IoWriteFuture future) {
+ try {
+ if (future.isWritten()) {
+ if (writes.decrementAndGet() > 0) {
+ channel.getAsyncIn().write(new ByteArrayBuffer(message)).addListener(this);
+ } else {
+ channel.getAsyncIn().close(false);
+ }
+ } else {
+ throw new SshException("Error writing", future.getException());
+ }
+ } catch (IOException e) {
+ if (!channel.isClosing()) {
+ e.printStackTrace();
+ channel.close(true);
+ }
+ }
}
- } else {
- throw new SshException("Error writing", future.getException());
- }
- } catch (IOException e) {
- if (!channel.isClosing()) {
- e.printStackTrace();
- channel.close(true);
- }
- }
- }
- });
- channel.getAsyncOut().read(new ByteArrayBuffer())
- .addListener(new SshFutureListener<IoReadFuture>() {
- @Override
- public void operationComplete(IoReadFuture future) {
- try {
- future.verify();
- Buffer buffer = future.getBuffer();
- baosOut.write(buffer.array(), buffer.rpos(), buffer.available());
- buffer.rpos(buffer.rpos() + buffer.available());
- buffer.compact();
- channel.getAsyncOut().read(buffer).addListener(this);
- } catch (IOException e) {
- if (!channel.isClosing()) {
- e.printStackTrace();
- channel.close(true);
- }
- }
- }
- });
- channel.getAsyncErr().read(new ByteArrayBuffer())
- .addListener(new SshFutureListener<IoReadFuture>() {
- @Override
- public void operationComplete(IoReadFuture future) {
- try {
- future.verify();
- Buffer buffer = future.getBuffer();
- baosErr.write(buffer.array(), buffer.rpos(), buffer.available());
- buffer.rpos(buffer.rpos() + buffer.available());
- buffer.compact();
- channel.getAsyncErr().read(buffer).addListener(this);
- } catch (IOException e) {
- if (!channel.isClosing()) {
- e.printStackTrace();
- channel.close(true);
- }
- }
- }
- });
-
- channel.waitFor(ClientChannel.CLOSED, 0);
-
- assertEquals(nbMessages * message.length, baosOut.size());
+ });
+ channel.getAsyncOut().read(new ByteArrayBuffer())
+ .addListener(new SshFutureListener<IoReadFuture>() {
+ @Override
+ public void operationComplete(IoReadFuture future) {
+ try {
+ future.verify(5L, TimeUnit.SECONDS);
+ Buffer buffer = future.getBuffer();
+ baosOut.write(buffer.array(), buffer.rpos(), buffer.available());
+ buffer.rpos(buffer.rpos() + buffer.available());
+ buffer.compact();
+ channel.getAsyncOut().read(buffer).addListener(this);
+ } catch (IOException e) {
+ if (!channel.isClosing()) {
+ e.printStackTrace();
+ channel.close(true);
+ }
+ }
+ }
+ });
+ channel.getAsyncErr().read(new ByteArrayBuffer())
+ .addListener(new SshFutureListener<IoReadFuture>() {
+ @Override
+ public void operationComplete(IoReadFuture future) {
+ try {
+ future.verify(5L, TimeUnit.SECONDS);
+ Buffer buffer = future.getBuffer();
+ baosErr.write(buffer.array(), buffer.rpos(), buffer.available());
+ buffer.rpos(buffer.rpos() + buffer.available());
+ buffer.compact();
+ channel.getAsyncErr().read(buffer).addListener(this);
+ } catch (IOException e) {
+ if (!channel.isClosing()) {
+ e.printStackTrace();
+ channel.close(true);
+ }
+ }
+ }
+ });
+
+ channel.waitFor(ClientChannel.CLOSED, 0);
+
+ assertEquals(nbMessages * message.length, baosOut.size());
+ }
+ }
- client.close(true);
+ client.close(true);
+ }
}
@Test
public void testCommandDeadlock() throws Exception {
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("smx");
- session.auth().verify();
- ChannelExec channel = session.createExecChannel("test");
- channel.setOut(new NoCloseOutputStream(System.out));
- channel.setErr(new NoCloseOutputStream(System.err));
- channel.open().await();
- Thread.sleep(100);
- try {
- for (int i = 0; i < 100; i++) {
- channel.getInvertedIn().write("a".getBytes());
- channel.getInvertedIn().flush();
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+
+ try(ChannelExec channel = session.createExecChannel("test");
+ OutputStream stdout = new NoCloseOutputStream(System.out);
+ OutputStream stderr = new NoCloseOutputStream(System.err)) {
+
+ channel.setOut(stdout);
+ channel.setErr(stderr);
+ channel.open().await();
+ Thread.sleep(100);
+ try {
+ for (int i = 0; i < 100; i++) {
+ channel.getInvertedIn().write("a".getBytes());
+ channel.getInvertedIn().flush();
+ }
+ } catch (SshException e) {
+ // That's ok, the channel is being closed by the other side
+ }
+ assertEquals(ClientChannel.CLOSED, channel.waitFor(ClientChannel.CLOSED, 0) & ClientChannel.CLOSED);
+ session.close(false).await();
}
- } catch (SshException e) {
- // That's ok, the channel is being closed by the other side
+ } finally {
+ client.stop();
}
- assertEquals(ClientChannel.CLOSED, channel.waitFor(ClientChannel.CLOSED, 0) & ClientChannel.CLOSED);
- session.close(false).await();
- client.stop();
}
@Test
public void testClient() throws Exception {
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("smx");
- session.auth().verify();
- ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
-
- ByteArrayOutputStream sent = new ByteArrayOutputStream();
- PipedOutputStream pipedIn = new PipedOutputStream();
- channel.setIn(new PipedInputStream(pipedIn));
- OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream();
- channel.setOut(out);
- channel.setErr(err);
- channel.open();
-
- teeOut.write("this is my command\n".getBytes());
- teeOut.flush();
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 1000; i++) {
- sb.append("0123456789");
- }
- sb.append("\n");
- teeOut.write(sb.toString().getBytes());
-
- teeOut.write("exit\n".getBytes());
- teeOut.flush();
-
- channel.waitFor(ClientChannel.CLOSED, 0);
- channel.close(false);
- client.stop();
-
- assertArrayEquals(sent.toByteArray(), out.toByteArray());
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+
+ try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
+ ByteArrayOutputStream sent = new ByteArrayOutputStream();
+ PipedOutputStream pipedIn = new PipedOutputStream();
+ PipedInputStream pipedOut = new PipedInputStream(pipedIn)) {
+
+ channel.setIn(pipedOut);
+
+ try(OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open();
+
+ teeOut.write("this is my command\n".getBytes());
+ teeOut.flush();
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 1000; i++) {
+ sb.append("0123456789");
+ }
+ sb.append("\n");
+ teeOut.write(sb.toString().getBytes());
+
+ teeOut.write("exit\n".getBytes());
+ teeOut.flush();
+
+ channel.waitFor(ClientChannel.CLOSED, 0);
+
+ channel.close(false);
+ client.stop();
+
+ assertArrayEquals(sent.toByteArray(), out.toByteArray());
+ }
+ }
+ }
}
@Test
public void testClientInverted() throws Exception {
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("smx");
- session.auth().verify();
- ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
-
- ByteArrayOutputStream sent = new ByteArrayOutputStream();
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream();
- channel.setOut(out);
- channel.setErr(err);
- channel.open().await();
-
- OutputStream pipedIn = new TeeOutputStream(sent, channel.getInvertedIn());
-
- pipedIn.write("this is my command\n".getBytes());
- pipedIn.flush();
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 1000; i++) {
- sb.append("0123456789");
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+
+ try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
+ ByteArrayOutputStream sent = new ByteArrayOutputStream();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open().await();
+
+ try(OutputStream pipedIn = new TeeOutputStream(sent, channel.getInvertedIn())) {
+ pipedIn.write("this is my command\n".getBytes());
+ pipedIn.flush();
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 1000; i++) {
+ sb.append("0123456789");
+ }
+ sb.append("\n");
+ pipedIn.write(sb.toString().getBytes());
+
+ pipedIn.write("exit\n".getBytes());
+ pipedIn.flush();
+ }
+
+ channel.waitFor(ClientChannel.CLOSED, 0);
+
+ channel.close(false);
+ client.stop();
+
+ assertArrayEquals(sent.toByteArray(), out.toByteArray());
+ }
}
- sb.append("\n");
- pipedIn.write(sb.toString().getBytes());
-
- pipedIn.write("exit\n".getBytes());
- pipedIn.flush();
-
- channel.waitFor(ClientChannel.CLOSED, 0);
-
- channel.close(false);
- client.stop();
-
- assertArrayEquals(sent.toByteArray(), out.toByteArray());
}
@Test
public void testClientWithCustomChannel() throws Exception {
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("smx");
- session.auth().verify();
-
- ChannelShell channel = new ChannelShell();
- session.getService(ConnectionService.class).registerChannel(channel);
-
- ByteArrayOutputStream sent = new ByteArrayOutputStream();
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream();
- channel.setOut(out);
- channel.setErr(err);
- channel.open().verify();
-
- channel.close(false).await();
- client.stop();
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+
+ try(ChannelShell channel = new ChannelShell();
+ ByteArrayOutputStream sent = new ByteArrayOutputStream();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+ session.getService(ConnectionService.class).registerChannel(channel);
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open().verify(5L, TimeUnit.SECONDS);
+ channel.close(false).await();
+ }
+ } finally {
+ client.stop();
+ }
}
@Test
public void testClientClosingStream() throws Exception {
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("smx");
- session.auth().verify();
- ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
-
-
- ByteArrayOutputStream sent = new ByteArrayOutputStream();
- PipedOutputStream pipedIn = new PipedOutputStream();
- OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
- channel.setIn(new PipedInputStream(pipedIn));
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream();
- channel.setOut(out);
- channel.setErr(err);
- channel.open();
-
- teeOut.write("this is my command\n".getBytes());
- teeOut.flush();
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 1000; i++) {
- sb.append("0123456789");
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+
+ try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
+ ByteArrayOutputStream sent = new ByteArrayOutputStream();
+ PipedOutputStream pipedIn = new PipedOutputStream();
+ InputStream inPipe = new PipedInputStream(pipedIn);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+ channel.setIn(inPipe);
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open();
+
+ try(OutputStream teeOut = new TeeOutputStream(sent, pipedIn)) {
+ teeOut.write("this is my command\n".getBytes());
+ teeOut.flush();
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 1000; i++) {
+ sb.append("0123456789");
+ }
+ sb.append("\n");
+ teeOut.write(sb.toString().getBytes());
+ }
+
+ channel.waitFor(ClientChannel.CLOSED, 0);
+
+ channel.close(false);
+ client.stop();
+
+ assertArrayEquals(sent.toByteArray(), out.toByteArray());
+ }
}
- sb.append("\n");
- teeOut.write(sb.toString().getBytes());
-
- teeOut.close();
-
- channel.waitFor(ClientChannel.CLOSED, 0);
-
- channel.close(false);
- client.stop();
-
- assertArrayEquals(sent.toByteArray(), out.toByteArray());
}
@Test
@@ -435,138 +469,175 @@ public class ClientTest extends BaseTest {
// sshd.getProperties().put(SshServer.WINDOW_SIZE, Integer.toString(0x20000));
// sshd.getProperties().put(SshServer.MAX_PACKET_SIZE, Integer.toString(0x1000));
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("smx");
- session.auth().verify();
- ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
- ByteArrayOutputStream sent = new ByteArrayOutputStream();
- PipedOutputStream pipedIn = new PipedOutputStream();
- OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
- channel.setIn(new PipedInputStream(pipedIn));
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream();
- channel.setOut(out);
- channel.setErr(err);
- channel.open().await();
-
- long t0 = System.currentTimeMillis();
-
- int bytes = 0;
- for (int i = 0; i < 10000; i++) {
- byte[] data = "01234567890123456789012345678901234567890123456789\n".getBytes();
- teeOut.write(data);
- teeOut.flush();
- bytes += data.length;
- if ((bytes & 0xFFF00000) != ((bytes - data.length) & 0xFFF00000)) {
- System.out.println("Bytes written: " + bytes);
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+
+ try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
+ ByteArrayOutputStream sent = new ByteArrayOutputStream();
+ PipedOutputStream pipedIn = new PipedOutputStream();
+ InputStream inPipe = new PipedInputStream(pipedIn);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+ channel.setIn(inPipe);
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open().await();
+
+
+ int bytes = 0;
+ byte[] data = "01234567890123456789012345678901234567890123456789\n".getBytes();
+ long t0 = System.currentTimeMillis();
+ try(OutputStream teeOut = new TeeOutputStream(sent, pipedIn)) {
+ for (int i = 0; i < 10000; i++) {
+ teeOut.write(data);
+ teeOut.flush();
+ bytes += data.length;
+ if ((bytes & 0xFFF00000) != ((bytes - data.length) & 0xFFF00000)) {
+ System.out.println("Bytes written: " + bytes);
+ }
+ }
+ teeOut.write("exit\n".getBytes());
+ teeOut.flush();
+ }
+ long t1 = System.currentTimeMillis();
+
+ System.out.println("Sent " + (bytes / 1024) + " Kb in " + (t1 - t0) + " ms");
+
+ System.out.println("Waiting for channel to be closed");
+
+ channel.waitFor(ClientChannel.CLOSED, 0);
+
+ channel.close(false);
+ client.stop();
+
+ assertArrayEquals(sent.toByteArray(), out.toByteArray());
+ //assertArrayEquals(sent.toByteArray(), out.toByteArray());
}
}
- teeOut.write("exit\n".getBytes());
- teeOut.flush();
-
- long t1 = System.currentTimeMillis();
-
- System.out.println("Sent " + (bytes / 1024) + " Kb in " + (t1 - t0) + " ms");
-
- System.out.println("Waiting for channel to be closed");
-
- channel.waitFor(ClientChannel.CLOSED, 0);
-
- channel.close(false);
- client.stop();
-
- assertTrue(BufferUtils.equals(sent.toByteArray(), out.toByteArray()));
- //assertArrayEquals(sent.toByteArray(), out.toByteArray());
}
@Test(expected = SshException.class)
public void testOpenChannelOnClosedSession() throws Exception {
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("smx");
- session.auth().verify();
- ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
- session.close(false);
-
- PipedOutputStream pipedIn = new PipedOutputStream();
- channel.setIn(new PipedInputStream(pipedIn));
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream();
- channel.setOut(out);
- channel.setErr(err);
- channel.open();
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+
+ try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL)) {
+ session.close(false);
+
+ try(PipedOutputStream pipedIn = new PipedOutputStream();
+ InputStream inPipe = new PipedInputStream(pipedIn);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+ channel.setIn(inPipe);
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open();
+ }
+ }
+ }
}
@Test
public void testCloseBeforeAuthSucceed() throws Exception {
authLatch = new CountDownLatch(1);
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("smx");
- AuthFuture authFuture = session.auth();
- CloseFuture closeFuture = session.close(false);
- authLatch.countDown();
- authFuture.await();
- closeFuture.await();
- assertNotNull(authFuture.getException());
- assertTrue(closeFuture.isClosed());
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("smx");
+
+ AuthFuture authFuture = session.auth();
+ CloseFuture closeFuture = session.close(false);
+ authLatch.countDown();
+ authFuture.await();
+ closeFuture.await();
+ assertNotNull("No authentication exception", authFuture.getException());
+ assertTrue("Future not closed", closeFuture.isClosed());
+ }
}
@Test
public void testCloseCleanBeforeChannelOpened() throws Exception {
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("smx");
- session.auth().verify();
- ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
- channel.setIn(new ByteArrayInputStream(new byte[0]));
- channel.setOut(new ByteArrayOutputStream());
- channel.setErr(new ByteArrayOutputStream());
- OpenFuture openFuture = channel.open();
- CloseFuture closeFuture = session.close(false);
- openFuture.await();
- closeFuture.await();
- assertTrue(openFuture.isOpened());
- assertTrue(closeFuture.isClosed());
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+
+ try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
+ InputStream inp = new ByteArrayInputStream(GenericUtils.EMPTY_BYTE_ARRAY);
+ OutputStream out = new ByteArrayOutputStream();
+ OutputStream err = new ByteArrayOutputStream()) {
+
+ channel.setIn(inp);
+ channel.setOut(out);
+ channel.setErr(err);
+
+ OpenFuture openFuture = channel.open();
+ CloseFuture closeFuture = session.close(false);
+ openFuture.await();
+ closeFuture.await();
+ assertTrue("Not open", openFuture.isOpened());
+ assertTrue("Not closed", closeFuture.isClosed());
+ }
+ }
}
@Test
public void testCloseImmediateBeforeChannelOpened() throws Exception {
channelLatch = new CountDownLatch(1);
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("smx");
- session.auth().verify();
- ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
- channel.setIn(new ByteArrayInputStream(new byte[0]));
- channel.setOut(new ByteArrayOutputStream());
- channel.setErr(new ByteArrayOutputStream());
- OpenFuture openFuture = channel.open();
- CloseFuture closeFuture = session.close(true);
- channelLatch.countDown();
- openFuture.await();
- closeFuture.await();
- assertNotNull(openFuture.getException());
- assertTrue(closeFuture.isClosed());
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+
+ try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
+ InputStream inp = new ByteArrayInputStream(GenericUtils.EMPTY_BYTE_ARRAY);
+ OutputStream out = new ByteArrayOutputStream();
+ OutputStream err = new ByteArrayOutputStream()) {
+
+ channel.setIn(inp);
+ channel.setOut(out);
+ channel.setErr(err);
+
+ OpenFuture openFuture = channel.open();
+ CloseFuture closeFuture = session.close(true);
+ channelLatch.countDown();
+ openFuture.await();
+ closeFuture.await();
+ assertNotNull("No open exception", openFuture.getException());
+ assertTrue("Not closed", closeFuture.isClosed());
+ }
+ }
}
@Test
public void testPublicKeyAuth() throws Exception {
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-
- KeyPair pair = Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA);
- session.addPublicKeyIdentity(pair);
- session.auth().verify();
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ KeyPair pair = Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA);
+ session.addPublicKeyIdentity(pair);
+ session.auth().verify(5L, TimeUnit.SECONDS);
+ }
}
@Test
public void testPublicKeyAuthNew() throws Exception {
client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(UserAuthPublicKey.UserAuthPublicKeyFactory.INSTANCE));
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPublicKeyIdentity(Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA));
- session.auth().verify();
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPublicKeyIdentity(Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA));
+ session.auth().verify(5L, TimeUnit.SECONDS);
+ }
}
@Test
@@ -580,58 +651,71 @@ public class ClientTest extends BaseTest {
});
client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(UserAuthPublicKey.UserAuthPublicKeyFactory.INSTANCE));
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPublicKeyIdentity(new SimpleGeneratorHostKeyProvider(null, "RSA").loadKey(KeyPairProvider.SSH_RSA));
- session.addPublicKeyIdentity(pair);
- session.auth().verify();
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPublicKeyIdentity(new SimpleGeneratorHostKeyProvider(null, "RSA").loadKey(KeyPairProvider.SSH_RSA));
+ session.addPublicKeyIdentity(pair);
+ session.auth().verify(5L, TimeUnit.SECONDS);
+ }
}
@Test
public void testPasswordAuthNew() throws Exception {
client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(new UserAuthPassword.UserAuthPasswordFactory()));
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("smx");
- session.auth().verify();
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+ }
}
@Test
public void testPasswordAuthNewWithFailureOnFirstIdentity() throws Exception {
client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(new UserAuthPassword.UserAuthPasswordFactory()));
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("bad");
- session.addPasswordIdentity("smx");
- session.auth().verify();
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("bad");
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+ }
}
@Test
public void testKeyboardInteractiveAuthNew() throws Exception {
client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(UserAuthKeyboardInteractive.UserAuthKeyboardInteractiveFactory.INSTANCE));
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("smx");
- session.auth().verify();
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+ }
}
@Test
public void testKeyboardInteractiveAuthNewWithFailureOnFirstIdentity() throws Exception {
client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(UserAuthKeyboardInteractive.UserAuthKeyboardInteractiveFactory.INSTANCE));
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("bad");
- session.addPasswordIdentity("smx");
- session.auth().verify();
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("bad");
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+ }
}
@Test
public void testKeyboardInteractiveWithFailures() throws Exception {
final AtomicInteger count = new AtomicInteger();
- client.getProperties().put(ClientFactoryManager.PASSWORD_PROMPTS, "3");
+ final int MAX_PROMPTS = 3;
+ FactoryManagerUtils.updateProperty(client, ClientFactoryManager.PASSWORD_PROMPTS, MAX_PROMPTS);
+
client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(new UserAuthKeyboardInteractive.UserAuthKeyboardInteractiveFactory()));
client.setUserInteraction(new UserInteraction() {
@Override
public void welcome(String banner) {
+ // ignored
}
@Override
public String[] interactive(String destination, String name, String instruction, String[] prompt, boolean[] echo) {
@@ -640,96 +724,111 @@ public class ClientTest extends BaseTest {
}
});
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- AuthFuture future = session.auth();
- future.await();
- assertTrue(future.isFailure());
- assertEquals(3, count.get());
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ AuthFuture future = session.auth();
+ future.await();
+ assertTrue("Unexpected authentication success", future.isFailure());
+ assertEquals("Mismatched authentication retry count", MAX_PROMPTS, count.get());
+ }
}
-
@Test
public void testKeyboardInteractiveInSessionUserInteractive() throws Exception {
final AtomicInteger count = new AtomicInteger();
- client.getProperties().put(ClientFactoryManager.PASSWORD_PROMPTS, "3");
+ final int MAX_PROMPTS = 3;
+ FactoryManagerUtils.updateProperty(client, ClientFactoryManager.PASSWORD_PROMPTS, MAX_PROMPTS);
+
client.setUserAuthFactories(Arrays
.<NamedFactory<UserAuth>> asList(UserAuthKeyboardInteractive.UserAuthKeyboardInteractiveFactory.INSTANCE));
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.setUserInteraction(new UserInteraction() {
- @Override
- public void welcome(String banner) {
- }
- @Override
- public String[] interactive(String destination, String name, String instruction,
- String[] prompt, boolean[] echo) {
- count.incrementAndGet();
- return new String[] { "smx" };
- }
- });
- AuthFuture future = session.auth();
- future.await();
- assertTrue(future.isSuccess());
- assertFalse(future.isFailure());
- assertEquals(1, count.get());
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.setUserInteraction(new UserInteraction() {
+ @Override
+ public void welcome(String banner) {
+ // ignored
+ }
+
+ @Override
+ public String[] interactive(String destination, String name, String instruction,
+ String[] prompt, boolean[] echo) {
+ count.incrementAndGet();
+ return new String[] { "smx" };
+ }
+ });
+ AuthFuture future = session.auth();
+ future.await();
+ assertTrue("Authentication not marked as success", future.isSuccess());
+ assertFalse("Authentication marked as failure", future.isFailure());
+ assertEquals("Mismatched authentication attempts count", 1, count.get());
+ }
}
@Test
public void testKeyboardInteractiveInSessionUserInteractiveFailure() throws Exception {
final AtomicInteger count = new AtomicInteger();
- client.getProperties().put(ClientFactoryManager.PASSWORD_PROMPTS, "3");
+ final int MAX_PROMPTS = 3;
+ FactoryManagerUtils.updateProperty(client, ClientFactoryManager.PASSWORD_PROMPTS, MAX_PROMPTS);
client.setUserAuthFactories(Arrays
.<NamedFactory<UserAuth>> asList(new UserAuthKeyboardInteractive.UserAuthKeyboardInteractiveFactory()));
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.setUserInteraction(new UserInteraction() {
- @Override
- public void welcome(String banner) {
- }
-
- @Override
- public String[] interactive(String destination, String name, String instruction,
- String[] prompt, boolean[] echo) {
- count.incrementAndGet();
- return new String[] { "bad" };
- }
- });
- AuthFuture future = session.auth();
- future.await();
- assertTrue(future.isFailure());
- assertEquals(3, count.get());
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.setUserInteraction(new UserInteraction() {
+ @Override
+ public void welcome(String banner) {
+ // ignored
+ }
+
+ @Override
+ public String[] interactive(String destination, String name, String instruction,
+ String[] prompt, boolean[] echo) {
+ count.incrementAndGet();
+ return new String[] { "bad" };
+ }
+ });
+ AuthFuture future = session.auth();
+ future.await();
+ assertTrue("Authentication not, marked as failure", future.isFailure());
+ assertEquals("Mismatched authentication retry count", MAX_PROMPTS, count.get());
+ }
}
@Test
public void testClientDisconnect() throws Exception {
TestEchoShellFactory.TestEchoShell.latch = new CountDownLatch(1);
- try
- {
+ try {
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("smx");
- session.auth().verify();
- ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
- PipedOutputStream pipedIn = new PipedOutputStream();
- channel.setIn(new PipedInputStream(pipedIn));
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream();
- channel.setOut(out);
- channel.setErr(err);
- channel.open().await();
-
-// ((AbstractSession) session).disconnect(SshConstants.SSH2_DISCONNECT_BY_APPLICATION, "Cancel");
- AbstractSession cs = (AbstractSession) session;
- Buffer buffer = cs.createBuffer(SshConstants.SSH_MSG_DISCONNECT);
- buffer.putInt(SshConstants.SSH2_DISCONNECT_BY_APPLICATION);
- buffer.putString("Cancel");
- buffer.putString("");
- IoWriteFuture f = cs.writePacket(buffer);
- f.await();
- suspend(cs.getIoSession());
-
- TestEchoShellFactory.TestEchoShell.latch.await();
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+
+ try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
+ PipedOutputStream pipedIn = new PipedOutputStream();
+ InputStream inPipe = new PipedInputStream(pipedIn);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+ channel.setIn(inPipe);
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open().await();
+
+ // ((AbstractSession) session).disconnect(SshConstants.SSH2_DISCONNECT_BY_APPLICATION, "Cancel");
+ AbstractSession cs = (AbstractSession) session;
+ Buffer buffer = cs.createBuffer(SshConstants.SSH_MSG_DISCONNECT);
+ buffer.putInt(SshConstants.SSH2_DISCONNECT_BY_APPLICATION);
+ buffer.putString("Cancel");
+ buffer.putString("");
+ IoWriteFuture f = cs.writePacket(buffer);
+ f.await();
+ suspend(cs.getIoSession());
+
+ TestEchoShellFactory.TestEchoShell.latch.await();
+ }
+ }
} finally {
TestEchoShellFactory.TestEchoShell.latch = null;
}
@@ -753,10 +852,13 @@ public class ClientTest extends BaseTest {
}
);
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.waitFor(ClientSession.WAIT_AUTH, 10000);
- assertTrue(ok.get());
- client.stop();
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.waitFor(ClientSession.WAIT_AUTH, 10000);
+ assertTrue(ok.get());
+ } finally {
+ client.stop();
+ }
}
@Test
@@ -764,15 +866,18 @@ public class ClientTest extends BaseTest {
sshd.getCipherFactories().add(BuiltinCiphers.none);
client.getCipherFactories().add(BuiltinCiphers.none);
client.start();
- ClientSession session = client.connect("smx", "localhost", port).await().getSession();
- session.addPasswordIdentity("smx");
- session.auth().verify();
- session.switchToNoneCipher().await();
-
- ClientChannel channel = session.createSubsystemChannel("sftp");
- channel.open().verify();
-
- client.stop();
+
+ try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+ session.addPasswordIdentity("smx");
+ session.auth().verify(5L, TimeUnit.SECONDS);
+ session.switchToNoneCipher().await();
+
+ try(ClientChannel channel = session.createSubsystemChannel("sftp")) {
+ channel.open().verify(5L, TimeUnit.SECONDS);
+ }
+ } finally {
+ client.stop();
+ }
}
private void suspend(IoSession ioSession) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ServerTest.java b/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
index 9b52296..9190800 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
@@ -208,7 +208,8 @@ public class ServerTest extends BaseTest {
client.start();
try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
s.addPasswordIdentity("test");
- s.auth().verify();
+ s.auth().verify(5L, TimeUnit.SECONDS);
+
try(ChannelShell shell = s.createShellChannel();
ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayOutputStream err = new ByteArrayOutputStream()) {
@@ -260,7 +261,8 @@ public class ServerTest extends BaseTest {
try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
s.addPasswordIdentity("test");
- s.auth().verify();
+ s.auth().verify(5L, TimeUnit.SECONDS);
+
try(ChannelExec shell = s.createExecChannel("normal");
// Create a pipe that will block reading when the buffer is full
PipedInputStream pis = new PipedInputStream();
@@ -355,7 +357,7 @@ public class ServerTest extends BaseTest {
try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
s.addPasswordIdentity("test");
- s.auth().verify();
+ s.auth().verify(5L, TimeUnit.SECONDS);
Assert.assertEquals("Mismatched client events count", 1, clientEventCount.get());
Assert.assertEquals("Mismatched server events count", 1, serverEventCount.get());
s.close(false);
@@ -411,7 +413,7 @@ public class ServerTest extends BaseTest {
try {
try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
s.addPasswordIdentity("test");
- s.auth().verify();
+ s.auth().verify(5L, TimeUnit.SECONDS);
}
synchronized(eventsMap) {