You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2015/05/17 10:41:03 UTC

mina-sshd git commit: [SSHD-461] Provide 'verify' timeout overloads for AuthFuture, OpenFuture and IoReadFuture

Repository: mina-sshd
Updated Branches:
  refs/heads/master 84c23ddab -> dafaa6228


[SSHD-461] Provide 'verify' timeout overloads for AuthFuture, OpenFuture and IoReadFuture


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/dafaa622
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/dafaa622
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/dafaa622

Branch: refs/heads/master
Commit: dafaa6228f9e4c3574c1ac241727478147c609e9
Parents: 84c23dd
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Sun May 17 11:40:49 2015 +0300
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Sun May 17 11:40:49 2015 +0300

----------------------------------------------------------------------
 .../apache/sshd/client/channel/ChannelExec.java |   1 +
 .../apache/sshd/client/future/AuthFuture.java   |  16 +-
 .../sshd/client/future/DefaultAuthFuture.java   |  31 +-
 .../sshd/client/future/DefaultOpenFuture.java   |  26 +-
 .../apache/sshd/client/future/OpenFuture.java   |  19 +-
 .../common/channel/ChannelAsyncInputStream.java |  30 +-
 .../sshd/common/future/DefaultSshFuture.java    |  35 +-
 .../org/apache/sshd/common/io/IoReadFuture.java |  18 +-
 .../test/java/org/apache/sshd/ClientTest.java   | 917 +++++++++++--------
 .../test/java/org/apache/sshd/ServerTest.java   |  10 +-
 10 files changed, 647 insertions(+), 456 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelExec.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelExec.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelExec.java
index 0d4dcfe..125cdbb 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelExec.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelExec.java
@@ -40,6 +40,7 @@ public class ChannelExec extends PtyCapableChannelSession {
         this.command = command;
     }
 
+    @Override
     protected void doOpen() throws IOException {
         doOpenPty();
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/client/future/AuthFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/future/AuthFuture.java b/sshd-core/src/main/java/org/apache/sshd/client/future/AuthFuture.java
index 081474a..99adeab 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/future/AuthFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/future/AuthFuture.java
@@ -18,6 +18,8 @@
  */
 package org.apache.sshd.client.future;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.future.SshFuture;
 
@@ -27,15 +29,25 @@ import org.apache.sshd.common.future.SshFuture;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public interface AuthFuture extends SshFuture<AuthFuture> {
-
     /**
      * Wait and verify that the authentication succeeded.
-     *
      * @throws SshException if the authentication failed for any reason
      */
     void verify() throws SshException;
 
     /**
+     * Wait and verify that the authentication succeeded within the specified timeout.
+     * @throws SshException if the authentication failed for any reason
+     */
+    void verify(long timeout, TimeUnit unit) throws SshException;
+
+    /**
+     * Wait and verify that the authentication succeeded within the specified timeout.
+     * @throws SshException if the authentication failed for any reason
+     */
+    void verify(long timeoutMillis) throws SshException;
+
+    /**
      * Returns the cause of the connection failure.
      *
      * @return <tt>null</tt> if the connect operation is not finished yet,

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultAuthFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultAuthFuture.java b/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultAuthFuture.java
index 176eadc..d79cbb1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultAuthFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultAuthFuture.java
@@ -18,6 +18,8 @@
  */
 package org.apache.sshd.client.future;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.future.DefaultSshFuture;
 
@@ -28,23 +30,36 @@ import org.apache.sshd.common.future.DefaultSshFuture;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public class DefaultAuthFuture extends DefaultSshFuture<AuthFuture> implements AuthFuture {
-
     public DefaultAuthFuture( Object lock) {
         super(lock);
     }
 
+    @Override   // TODO for JDK-8 make this a default method
     public void verify() throws SshException {
+        verify(Long.MAX_VALUE);
+    }
+
+    @Override   // TODO for JDK-8 make this a default method
+    public void verify(long timeout, TimeUnit unit) throws SshException {
+        verify(unit.toMillis(timeout));        
+    }
+
+    @Override
+    public void verify(long timeoutMillis) throws SshException {
         try {
-            await();
-        }
-        catch (InterruptedException e) {
+            if (!await(timeoutMillis)) {
+                throw new SshException("Authentication timeout afer " + timeoutMillis);
+            }
+        } catch (InterruptedException e) {
             throw new SshException("Authentication interrupted", e);
         }
+
         if (!isSuccess()) {
             throw new SshException("Authentication failed", getException());
         }
     }
 
+    @Override
     public Throwable getException() {
         Object v = getValue();
         if (v instanceof Throwable) {
@@ -54,20 +69,24 @@ public class DefaultAuthFuture extends DefaultSshFuture<AuthFuture> implements A
         }
     }
 
+    @Override
     public boolean isSuccess() {
         Object v = getValue();
-        return v instanceof Boolean && (Boolean) v;
+        return (v instanceof Boolean) && ((Boolean) v).booleanValue();
     }
 
+    @Override
     public boolean isFailure() {
         Object v = getValue();
-        return v instanceof Boolean && !((Boolean) v);
+        return (v instanceof Boolean) && (!((Boolean) v).booleanValue());
     }
 
+    @Override
     public void setAuthed(boolean authed) {
         setValue(Boolean.valueOf(authed));
     }
 
+    @Override
     public void setException(Throwable exception) {
         if (exception == null) {
             throw new NullPointerException("exception");

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultOpenFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultOpenFuture.java b/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultOpenFuture.java
index ca6cb07..9baab23 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultOpenFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/future/DefaultOpenFuture.java
@@ -18,6 +18,8 @@
  */
 package org.apache.sshd.client.future;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.future.DefaultSshFuture;
 
@@ -27,23 +29,36 @@ import org.apache.sshd.common.future.DefaultSshFuture;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public class DefaultOpenFuture extends DefaultSshFuture<OpenFuture> implements OpenFuture {
-
     public DefaultOpenFuture(Object lock) {
         super(lock);
     }
 
+    @Override   // TODO for JDK-8 make this a default method
     public void verify() throws SshException {
+        verify(Long.MAX_VALUE);
+    }
+
+    @Override   // TODO for JDK-8 make this a default method
+    public void verify(long timeout, TimeUnit unit) throws SshException {
+        verify(unit.toMillis(timeout));        
+    }
+
+    @Override
+    public void verify(long timeoutMillis) throws SshException {
         try {
-            await();
-        }
-        catch (InterruptedException e) {
+            if (!await(timeoutMillis)) {
+                throw new SshException("Channel opening time out after " + timeoutMillis);
+            }
+        } catch (InterruptedException e) {
             throw new SshException("Channel opening interrupted", e);
         }
+
         if (!isOpened()) {
             throw new SshException("Channel opening failed", getException());
         }
     }
 
+    @Override
     public Throwable getException() {
         Object v = getValue();
         if (v instanceof Throwable) {
@@ -53,14 +68,17 @@ public class DefaultOpenFuture extends DefaultSshFuture<OpenFuture> implements O
         }
     }
 
+    @Override
     public boolean isOpened() {
         return getValue() instanceof Boolean;
     }
 
+    @Override
     public void setOpened() {
         setValue(Boolean.TRUE);
     }
 
+    @Override
     public void setException(Throwable exception) {
         if (exception == null) {
             throw new NullPointerException("exception");

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/client/future/OpenFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/future/OpenFuture.java b/sshd-core/src/main/java/org/apache/sshd/client/future/OpenFuture.java
index 3b833dc..fcb3ae9 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/future/OpenFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/future/OpenFuture.java
@@ -18,6 +18,8 @@
  */
 package org.apache.sshd.client.future;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.future.SshFuture;
 
@@ -29,13 +31,24 @@ import org.apache.sshd.common.future.SshFuture;
 public interface OpenFuture extends SshFuture<OpenFuture> {
 
     /**
-     * Wait and verify that the channel has been successfuly opened.
-     *
-     * @throws org.apache.sshd.common.SshException if the authentication failed for any reason
+     * Wait and verify that the channel has been successfully opened.
+     * @throws SshException if the action failed for any reason
      */
     void verify() throws SshException;
 
     /**
+     * Wait and verify that the channel has been successfully opened within the specified timeout.
+     * @throws SshException if the action failed for any reason
+     */
+    void verify(long timeout, TimeUnit unit) throws SshException;
+
+    /**
+     * Wait and verify that the authentication succeeded within the specified timeout.
+     * @throws SshException if the action failed for any reason
+     */
+    void verify(long timeoutMillis) throws SshException;
+
+    /**
      * Returns the cause of the connection failure.
      *
      * @return <tt>null</tt> if the connect operation is not finished yet,

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
index 5a41887..d21eb9c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
@@ -19,6 +19,7 @@
 package org.apache.sshd.common.channel;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.sshd.common.Channel;
 import org.apache.sshd.common.RuntimeSshException;
@@ -85,6 +86,7 @@ public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable im
         }
     }
 
+    @SuppressWarnings("synthetic-access")
     private void doRead(boolean resume) {
         IoReadFutureImpl future = null;
         int nbRead = 0;
@@ -111,7 +113,7 @@ public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable im
             } catch (IOException e) {
                 channel.getSession().exceptionCaught(e);
             }
-            future.setValue(nbRead);
+            future.setValue(Integer.valueOf(nbRead));
         }
     }
 
@@ -121,8 +123,7 @@ public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable im
     }
 
     public static class IoReadFutureImpl extends DefaultSshFuture<IoReadFuture> implements IoReadFuture {
-
-        final Buffer buffer;
+        private final Buffer buffer;
 
         public IoReadFutureImpl(Buffer buffer) {
             super(null);
@@ -134,12 +135,23 @@ public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable im
             return buffer;
         }
 
-        @Override
+        @Override   // TODO for JDK-8 make this a default method
         public void verify() throws SshException {
+            verify(Long.MAX_VALUE);
+        }
+
+        @Override   // TODO for JDK-8 make this a default method
+        public void verify(long timeout, TimeUnit unit) throws SshException {
+            verify(unit.toMillis(timeout));        
+        }
+
+        @Override
+        public void verify(long timeoutMillis) throws SshException {
             try {
-                await();
-            }
-            catch (InterruptedException e) {
+                if (!await(timeoutMillis)) {
+                    throw new SshException("Timed out after " + timeoutMillis);
+                }
+            } catch (InterruptedException e) {
                 throw new SshException("Interrupted", e);
             }
             if (getValue() instanceof Throwable) {
@@ -156,9 +168,9 @@ public class ChannelAsyncInputStream extends CloseableUtils.AbstractCloseable im
             } else if (v instanceof Throwable) {
                 throw (RuntimeSshException) new RuntimeSshException("Error reading from channel.").initCause((Throwable) v);
             } else if (v instanceof Integer) {
-                return (Integer) v;
+                return ((Integer) v).intValue();
             } else {
-                throw new IllegalStateException();
+                throw new IllegalStateException("Unknown read value type: " + ((v == null) ? "null" : v.getClass().getName()));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
index 634eb58..2e9b8af 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/future/DefaultSshFuture.java
@@ -55,11 +55,10 @@ public class DefaultSshFuture<T extends SshFuture> implements SshFuture<T> {
      */
     @Override
     public T await() throws InterruptedException {
-        synchronized (lock) {
-            while (result == null) {
-                lock.wait();
-            }
+        if (await0(Long.MAX_VALUE, true) == null) {
+            throw new InternalError("No result while await completion");
         }
+
         return asT();
     }
 
@@ -76,7 +75,7 @@ public class DefaultSshFuture<T extends SshFuture> implements SshFuture<T> {
      */
     @Override
     public boolean await(long timeoutMillis) throws InterruptedException {
-        return await0(timeoutMillis, true);
+        return await0(timeoutMillis, true) != null;
     }
 
     /**
@@ -107,32 +106,29 @@ public class DefaultSshFuture<T extends SshFuture> implements SshFuture<T> {
     @Override
     public boolean awaitUninterruptibly(long timeoutMillis) {
         try {
-            return await0(timeoutMillis, false);
+            return await0(timeoutMillis, false) != null;
         } catch (InterruptedException e) {
-            throw new InternalError();
+            throw new InternalError("Unexpected interrupted exception wile awaitUninterruptibly " + timeoutMillis + " msec.: " + e.getMessage(), e);
         }
     }
 
     /**
      * Wait for the Future to be ready. If the requested delay is 0 or
-     * negative, this method immediately returns the value of the
-     * 'ready' flag.
-     * Every 5 second, the wait will be suspended to be able to check if
-     * there is a deadlock or not.
-     *
+     * negative, this method immediately returns.
      * @param timeoutMillis The delay we will wait for the Future to be ready
      * @param interruptable Tells if the wait can be interrupted or not
-     * @return <code>true</code> if the Future is ready
+     * @return The non-{@code null} result object if the Future is ready,
+     * {@code null} if the timeout expired and no result was received
      * @throws InterruptedException If the thread has been interrupted
      * when it's not allowed.
      */
-    private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
+    protected Object await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
         long curTime = System.currentTimeMillis();
-        long endTime = Long.MAX_VALUE - timeoutMillis < curTime ? Long.MAX_VALUE : curTime + timeoutMillis;
+        long endTime = ((Long.MAX_VALUE - timeoutMillis) < curTime) ? Long.MAX_VALUE : (curTime + timeoutMillis);
 
         synchronized (lock) {
-            if (result != null || timeoutMillis <= 0) {
-                return result != null;
+            if ((result != null) || (timeoutMillis <= 0)) {
+                return result;
             }
 
             for (;;) {
@@ -145,14 +141,13 @@ public class DefaultSshFuture<T extends SshFuture> implements SshFuture<T> {
                 }
 
                 curTime = System.currentTimeMillis();
-                if (result != null || curTime >= endTime) {
-                    return result != null;
+                if ((result != null) || (curTime >= endTime)) {
+                    return result;
                 }
             }
         }
     }
 
-
     /**
      * {@inheritDoc}
      */

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java
index ebf9c20..3865ed4 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java
@@ -18,6 +18,8 @@
  */
 package org.apache.sshd.common.io;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.future.SshFuture;
 import org.apache.sshd.common.util.buffer.Buffer;
@@ -26,11 +28,23 @@ public interface IoReadFuture extends SshFuture<IoReadFuture> {
 
     /**
      * Wait and verify that the read succeeded.
-     *
-     * @throws org.apache.sshd.common.SshException if the write failed for any reason
+     * @throws SshException if the action failed for any reason
      */
     void verify() throws SshException;
 
+
+    /**
+     * Wait and verify that the read succeeded within the specified timeout.
+     * @throws SshException if the action failed for any reason
+     */
+    void verify(long timeout, TimeUnit unit) throws SshException;
+
+    /**
+     * Wait and verify that the authentication succeeded within the specified timeout.
+     * @throws SshException if the action failed for any reason
+     */
+    void verify(long timeoutMillis) throws SshException;
+
     Buffer getBuffer();
 
     int getRead();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
index 4521016..5c75dc5 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
@@ -35,6 +36,7 @@ import java.security.KeyPair;
 import java.security.PublicKey;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -51,6 +53,7 @@ import org.apache.sshd.client.future.AuthFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.Channel;
 import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.FactoryManagerUtils;
 import org.apache.sshd.common.KeyPairProvider;
 import org.apache.sshd.common.NamedFactory;
 import org.apache.sshd.common.RuntimeSshException;
@@ -69,8 +72,8 @@ import org.apache.sshd.common.io.mina.MinaSession;
 import org.apache.sshd.common.io.nio2.Nio2Session;
 import org.apache.sshd.common.session.AbstractSession;
 import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
-import org.apache.sshd.common.util.buffer.BufferUtils;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.io.NoCloseOutputStream;
 import org.apache.sshd.server.Command;
@@ -127,6 +130,7 @@ public class ClientTest extends BaseTest {
                     @Override
                     public Service create(Session session) throws IOException {
                         return new ServerUserAuthService(session) {
+                            @SuppressWarnings("synthetic-access")
                             @Override
                             public void process(byte cmd, Buffer buffer) throws Exception {
                                 authLatch.await();
@@ -142,6 +146,7 @@ public class ClientTest extends BaseTest {
                     @Override
                     public Channel create() {
                         return new ChannelSession() {
+                            @SuppressWarnings("synthetic-access")
                             @Override
                             public OpenFuture open(int recipient, int rwsize, int rmpsize, Buffer buffer) {
                                 try {
@@ -183,248 +188,277 @@ public class ClientTest extends BaseTest {
 
         client.getProperties().put(FactoryManager.WINDOW_SIZE, "1024");
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("smx");
-        session.auth().verify();
-        final ChannelShell channel = session.createShellChannel();
-        channel.setStreaming(ClientChannel.Streaming.Async);
-        channel.open().verify();
 
-
-        final byte[] message = "0123456789\n".getBytes();
-        final int nbMessages = 1000;
-
-        final ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
-        final ByteArrayOutputStream baosErr = new ByteArrayOutputStream();
-        final AtomicInteger writes = new AtomicInteger(nbMessages);
-
-        channel.getAsyncIn().write(new ByteArrayBuffer(message))
-                .addListener(new SshFutureListener<IoWriteFuture>() {
-                    @Override
-                    public void operationComplete(IoWriteFuture future) {
-                        try {
-                            if (future.isWritten()) {
-                                if (writes.decrementAndGet() > 0) {
-                                    channel.getAsyncIn().write(new ByteArrayBuffer(message)).addListener(this);
-                                } else {
-                                    channel.getAsyncIn().close(false);
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("smx");
+            session.auth().verify(5L, TimeUnit.SECONDS);
+
+            try(final ChannelShell channel = session.createShellChannel()) {
+                channel.setStreaming(ClientChannel.Streaming.Async);
+                channel.open().verify(5L, TimeUnit.SECONDS);
+        
+                final byte[] message = "0123456789\n".getBytes();
+                final int nbMessages = 1000;
+        
+                try(final ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
+                    final ByteArrayOutputStream baosErr = new ByteArrayOutputStream()) {
+                    final AtomicInteger writes = new AtomicInteger(nbMessages);
+            
+                    channel.getAsyncIn().write(new ByteArrayBuffer(message))
+                            .addListener(new SshFutureListener<IoWriteFuture>() {
+                                @Override
+                                public void operationComplete(IoWriteFuture future) {
+                                    try {
+                                        if (future.isWritten()) {
+                                            if (writes.decrementAndGet() > 0) {
+                                                channel.getAsyncIn().write(new ByteArrayBuffer(message)).addListener(this);
+                                            } else {
+                                                channel.getAsyncIn().close(false);
+                                            }
+                                        } else {
+                                            throw new SshException("Error writing", future.getException());
+                                        }
+                                    } catch (IOException e) {
+                                        if (!channel.isClosing()) {
+                                            e.printStackTrace();
+                                            channel.close(true);
+                                        }
+                                    }
                                 }
-                            } else {
-                                throw new SshException("Error writing", future.getException());
-                            }
-                        } catch (IOException e) {
-                            if (!channel.isClosing()) {
-                                e.printStackTrace();
-                                channel.close(true);
-                            }
-                        }
-                    }
-                });
-        channel.getAsyncOut().read(new ByteArrayBuffer())
-                .addListener(new SshFutureListener<IoReadFuture>() {
-                    @Override
-                    public void operationComplete(IoReadFuture future) {
-                        try {
-                            future.verify();
-                            Buffer buffer = future.getBuffer();
-                            baosOut.write(buffer.array(), buffer.rpos(), buffer.available());
-                            buffer.rpos(buffer.rpos() + buffer.available());
-                            buffer.compact();
-                            channel.getAsyncOut().read(buffer).addListener(this);
-                        } catch (IOException e) {
-                            if (!channel.isClosing()) {
-                                e.printStackTrace();
-                                channel.close(true);
-                            }
-                        }
-                    }
-                });
-        channel.getAsyncErr().read(new ByteArrayBuffer())
-                .addListener(new SshFutureListener<IoReadFuture>() {
-                    @Override
-                    public void operationComplete(IoReadFuture future) {
-                        try {
-                            future.verify();
-                            Buffer buffer = future.getBuffer();
-                            baosErr.write(buffer.array(), buffer.rpos(), buffer.available());
-                            buffer.rpos(buffer.rpos() + buffer.available());
-                            buffer.compact();
-                            channel.getAsyncErr().read(buffer).addListener(this);
-                        } catch (IOException e) {
-                            if (!channel.isClosing()) {
-                                e.printStackTrace();
-                                channel.close(true);
-                            }
-                        }
-                    }
-                });
-
-        channel.waitFor(ClientChannel.CLOSED, 0);
-
-        assertEquals(nbMessages * message.length, baosOut.size());
+                            });
+                    channel.getAsyncOut().read(new ByteArrayBuffer())
+                            .addListener(new SshFutureListener<IoReadFuture>() {
+                                @Override
+                                public void operationComplete(IoReadFuture future) {
+                                    try {
+                                        future.verify(5L, TimeUnit.SECONDS);
+                                        Buffer buffer = future.getBuffer();
+                                        baosOut.write(buffer.array(), buffer.rpos(), buffer.available());
+                                        buffer.rpos(buffer.rpos() + buffer.available());
+                                        buffer.compact();
+                                        channel.getAsyncOut().read(buffer).addListener(this);
+                                    } catch (IOException e) {
+                                        if (!channel.isClosing()) {
+                                            e.printStackTrace();
+                                            channel.close(true);
+                                        }
+                                    }
+                                }
+                            });
+                    channel.getAsyncErr().read(new ByteArrayBuffer())
+                            .addListener(new SshFutureListener<IoReadFuture>() {
+                                @Override
+                                public void operationComplete(IoReadFuture future) {
+                                    try {
+                                        future.verify(5L, TimeUnit.SECONDS);
+                                        Buffer buffer = future.getBuffer();
+                                        baosErr.write(buffer.array(), buffer.rpos(), buffer.available());
+                                        buffer.rpos(buffer.rpos() + buffer.available());
+                                        buffer.compact();
+                                        channel.getAsyncErr().read(buffer).addListener(this);
+                                    } catch (IOException e) {
+                                        if (!channel.isClosing()) {
+                                            e.printStackTrace();
+                                            channel.close(true);
+                                        }
+                                    }
+                                }
+                            });
+        
+                    channel.waitFor(ClientChannel.CLOSED, 0);
+        
+                    assertEquals(nbMessages * message.length, baosOut.size());
+                }
+            }    
 
-        client.close(true);
+            client.close(true);
+        }
     }
 
     @Test
     public void testCommandDeadlock() throws Exception {
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("smx");
-        session.auth().verify();
-        ChannelExec channel = session.createExecChannel("test");
-        channel.setOut(new NoCloseOutputStream(System.out));
-        channel.setErr(new NoCloseOutputStream(System.err));
-        channel.open().await();
-        Thread.sleep(100);
-        try {
-            for (int i = 0; i < 100; i++) {
-                channel.getInvertedIn().write("a".getBytes());
-                channel.getInvertedIn().flush();
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("smx");
+            session.auth().verify(5L, TimeUnit.SECONDS);
+            
+            try(ChannelExec channel = session.createExecChannel("test");
+                OutputStream stdout = new NoCloseOutputStream(System.out);
+                OutputStream stderr = new NoCloseOutputStream(System.err)) {
+
+                channel.setOut(stdout);
+                channel.setErr(stderr);
+                channel.open().await();
+                Thread.sleep(100);
+                try {
+                    for (int i = 0; i < 100; i++) {
+                        channel.getInvertedIn().write("a".getBytes());
+                        channel.getInvertedIn().flush();
+                    }
+                } catch (SshException e) {
+                    // That's ok, the channel is being closed by the other side
+                }
+                assertEquals(ClientChannel.CLOSED, channel.waitFor(ClientChannel.CLOSED, 0) & ClientChannel.CLOSED);
+                session.close(false).await();
             }
-        } catch (SshException e) {
-            // That's ok, the channel is being closed by the other side
+        } finally {
+            client.stop();
         }
-        assertEquals(ClientChannel.CLOSED, channel.waitFor(ClientChannel.CLOSED, 0) & ClientChannel.CLOSED);
-        session.close(false).await();
-        client.stop();
     }
 
     @Test
     public void testClient() throws Exception {
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("smx");
-        session.auth().verify();
-        ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
-
-        ByteArrayOutputStream sent = new ByteArrayOutputStream();
-        PipedOutputStream pipedIn = new PipedOutputStream();
-        channel.setIn(new PipedInputStream(pipedIn));
-        OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        ByteArrayOutputStream err = new ByteArrayOutputStream();
-        channel.setOut(out);
-        channel.setErr(err);
-        channel.open();
-
-        teeOut.write("this is my command\n".getBytes());
-        teeOut.flush();
-
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < 1000; i++) {
-            sb.append("0123456789");
-        }
-        sb.append("\n");
-        teeOut.write(sb.toString().getBytes());
-
-        teeOut.write("exit\n".getBytes());
-        teeOut.flush();
-
-        channel.waitFor(ClientChannel.CLOSED, 0);
 
-        channel.close(false);
-        client.stop();
-
-        assertArrayEquals(sent.toByteArray(), out.toByteArray());
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("smx");
+            session.auth().verify(5L, TimeUnit.SECONDS);
+            
+            try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
+                ByteArrayOutputStream sent = new ByteArrayOutputStream();
+                PipedOutputStream pipedIn = new PipedOutputStream();
+                PipedInputStream pipedOut = new PipedInputStream(pipedIn)) {
+
+                channel.setIn(pipedOut);
+
+                try(OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
+                    ByteArrayOutputStream out = new ByteArrayOutputStream();
+                    ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+                    channel.setOut(out);
+                    channel.setErr(err);
+                    channel.open();
+            
+                    teeOut.write("this is my command\n".getBytes());
+                    teeOut.flush();
+            
+                    StringBuilder sb = new StringBuilder();
+                    for (int i = 0; i < 1000; i++) {
+                        sb.append("0123456789");
+                    }
+                    sb.append("\n");
+                    teeOut.write(sb.toString().getBytes());
+            
+                    teeOut.write("exit\n".getBytes());
+                    teeOut.flush();
+            
+                    channel.waitFor(ClientChannel.CLOSED, 0);
+            
+                    channel.close(false);
+                    client.stop();
+            
+                    assertArrayEquals(sent.toByteArray(), out.toByteArray());
+                }
+            }
+        }
     }
 
     @Test
     public void testClientInverted() throws Exception {
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("smx");
-        session.auth().verify();
-        ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
-
-        ByteArrayOutputStream sent = new ByteArrayOutputStream();
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        ByteArrayOutputStream err = new ByteArrayOutputStream();
-        channel.setOut(out);
-        channel.setErr(err);
-        channel.open().await();
-
-        OutputStream pipedIn = new TeeOutputStream(sent, channel.getInvertedIn());
-
-        pipedIn.write("this is my command\n".getBytes());
-        pipedIn.flush();
-
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < 1000; i++) {
-            sb.append("0123456789");
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("smx");
+            session.auth().verify(5L, TimeUnit.SECONDS);
+            
+            try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
+                ByteArrayOutputStream sent = new ByteArrayOutputStream();
+                ByteArrayOutputStream out = new ByteArrayOutputStream();
+                ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+                channel.setOut(out);
+                channel.setErr(err);
+                channel.open().await();
+        
+                try(OutputStream pipedIn = new TeeOutputStream(sent, channel.getInvertedIn())) {
+                    pipedIn.write("this is my command\n".getBytes());
+                    pipedIn.flush();
+            
+                    StringBuilder sb = new StringBuilder();
+                    for (int i = 0; i < 1000; i++) {
+                        sb.append("0123456789");
+                    }
+                    sb.append("\n");
+                    pipedIn.write(sb.toString().getBytes());
+            
+                    pipedIn.write("exit\n".getBytes());
+                    pipedIn.flush();
+                }
+        
+                channel.waitFor(ClientChannel.CLOSED, 0);
+        
+                channel.close(false);
+                client.stop();
+        
+                assertArrayEquals(sent.toByteArray(), out.toByteArray());
+            }
         }
-        sb.append("\n");
-        pipedIn.write(sb.toString().getBytes());
-
-        pipedIn.write("exit\n".getBytes());
-        pipedIn.flush();
-
-        channel.waitFor(ClientChannel.CLOSED, 0);
-
-        channel.close(false);
-        client.stop();
-
-        assertArrayEquals(sent.toByteArray(), out.toByteArray());
     }
 
     @Test
     public void testClientWithCustomChannel() throws Exception {
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("smx");
-        session.auth().verify();
-
-        ChannelShell channel = new ChannelShell();
-        session.getService(ConnectionService.class).registerChannel(channel);
-
-        ByteArrayOutputStream sent = new ByteArrayOutputStream();
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        ByteArrayOutputStream err = new ByteArrayOutputStream();
-        channel.setOut(out);
-        channel.setErr(err);
-        channel.open().verify();
-
-        channel.close(false).await();
-        client.stop();
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("smx");
+            session.auth().verify(5L, TimeUnit.SECONDS);
+    
+            try(ChannelShell channel = new ChannelShell();
+                ByteArrayOutputStream sent = new ByteArrayOutputStream();
+                ByteArrayOutputStream out = new ByteArrayOutputStream();
+                ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+                session.getService(ConnectionService.class).registerChannel(channel);
+                channel.setOut(out);
+                channel.setErr(err);
+                channel.open().verify(5L, TimeUnit.SECONDS);
+                channel.close(false).await();
+            }
+        } finally {
+            client.stop();
+        }
     }
 
     @Test
     public void testClientClosingStream() throws Exception {
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("smx");
-        session.auth().verify();
-        ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
-
-
-        ByteArrayOutputStream sent = new ByteArrayOutputStream();
-        PipedOutputStream pipedIn = new PipedOutputStream();
-        OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
-        channel.setIn(new PipedInputStream(pipedIn));
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        ByteArrayOutputStream err = new ByteArrayOutputStream();
-        channel.setOut(out);
-        channel.setErr(err);
-        channel.open();
-
-        teeOut.write("this is my command\n".getBytes());
-        teeOut.flush();
-
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < 1000; i++) {
-            sb.append("0123456789");
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("smx");
+            session.auth().verify(5L, TimeUnit.SECONDS);
+    
+            try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
+                ByteArrayOutputStream sent = new ByteArrayOutputStream();
+                PipedOutputStream pipedIn = new PipedOutputStream();
+                InputStream inPipe = new PipedInputStream(pipedIn);
+                ByteArrayOutputStream out = new ByteArrayOutputStream();
+                ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+                channel.setIn(inPipe);
+                channel.setOut(out);
+                channel.setErr(err);
+                channel.open();
+
+                try(OutputStream teeOut = new TeeOutputStream(sent, pipedIn)) {
+                    teeOut.write("this is my command\n".getBytes());
+                    teeOut.flush();
+        
+                    StringBuilder sb = new StringBuilder();
+                    for (int i = 0; i < 1000; i++) {
+                        sb.append("0123456789");
+                    }
+                    sb.append("\n");
+                    teeOut.write(sb.toString().getBytes());
+                }    
+    
+                channel.waitFor(ClientChannel.CLOSED, 0);
+        
+                channel.close(false);
+                client.stop();
+        
+                assertArrayEquals(sent.toByteArray(), out.toByteArray());
+            }
         }
-        sb.append("\n");
-        teeOut.write(sb.toString().getBytes());
-
-        teeOut.close();
-
-        channel.waitFor(ClientChannel.CLOSED, 0);
-
-        channel.close(false);
-        client.stop();
-
-        assertArrayEquals(sent.toByteArray(), out.toByteArray());
     }
 
     @Test
@@ -435,138 +469,175 @@ public class ClientTest extends BaseTest {
 //        sshd.getProperties().put(SshServer.WINDOW_SIZE, Integer.toString(0x20000));
 //        sshd.getProperties().put(SshServer.MAX_PACKET_SIZE, Integer.toString(0x1000));
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("smx");
-        session.auth().verify();
-        ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
-        ByteArrayOutputStream sent = new ByteArrayOutputStream();
-        PipedOutputStream pipedIn = new PipedOutputStream();
-        OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
-        channel.setIn(new PipedInputStream(pipedIn));
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        ByteArrayOutputStream err = new ByteArrayOutputStream();
-        channel.setOut(out);
-        channel.setErr(err);
-        channel.open().await();
-
-        long t0 = System.currentTimeMillis();
-
-        int bytes = 0;
-        for (int i = 0; i < 10000; i++) {
-            byte[] data = "01234567890123456789012345678901234567890123456789\n".getBytes();
-            teeOut.write(data);
-            teeOut.flush();
-            bytes += data.length;
-            if ((bytes & 0xFFF00000) != ((bytes - data.length) & 0xFFF00000)) {
-                System.out.println("Bytes written: " + bytes);
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("smx");
+            session.auth().verify(5L, TimeUnit.SECONDS);
+
+            try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
+                ByteArrayOutputStream sent = new ByteArrayOutputStream();
+                PipedOutputStream pipedIn = new PipedOutputStream();
+                InputStream inPipe = new PipedInputStream(pipedIn); 
+                ByteArrayOutputStream out = new ByteArrayOutputStream();
+                ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+                channel.setIn(inPipe);
+                channel.setOut(out);
+                channel.setErr(err);
+                channel.open().await();
+        
+        
+                int bytes = 0;
+                byte[] data = "01234567890123456789012345678901234567890123456789\n".getBytes();
+                long t0 = System.currentTimeMillis();
+                try(OutputStream teeOut = new TeeOutputStream(sent, pipedIn)) {
+                    for (int i = 0; i < 10000; i++) {
+                        teeOut.write(data);
+                        teeOut.flush();
+                        bytes += data.length;
+                        if ((bytes & 0xFFF00000) != ((bytes - data.length) & 0xFFF00000)) {
+                            System.out.println("Bytes written: " + bytes);
+                        }
+                    }
+                    teeOut.write("exit\n".getBytes());
+                    teeOut.flush();
+                }        
+                long t1 = System.currentTimeMillis();
+        
+                System.out.println("Sent " + (bytes / 1024) + " Kb in " + (t1 - t0) + " ms");
+        
+                System.out.println("Waiting for channel to be closed");
+        
+                channel.waitFor(ClientChannel.CLOSED, 0);
+        
+                channel.close(false);
+                client.stop();
+        
+                assertArrayEquals(sent.toByteArray(), out.toByteArray());
+                //assertArrayEquals(sent.toByteArray(), out.toByteArray());
             }
         }
-        teeOut.write("exit\n".getBytes());
-        teeOut.flush();
-
-        long t1 = System.currentTimeMillis();
-
-        System.out.println("Sent " + (bytes / 1024) + " Kb in " + (t1 - t0) + " ms");
-
-        System.out.println("Waiting for channel to be closed");
-
-        channel.waitFor(ClientChannel.CLOSED, 0);
-
-        channel.close(false);
-        client.stop();
-
-        assertTrue(BufferUtils.equals(sent.toByteArray(), out.toByteArray()));
-        //assertArrayEquals(sent.toByteArray(), out.toByteArray());
     }
 
     @Test(expected = SshException.class)
     public void testOpenChannelOnClosedSession() throws Exception {
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("smx");
-        session.auth().verify();
-        ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
-        session.close(false);
-
-        PipedOutputStream pipedIn = new PipedOutputStream();
-        channel.setIn(new PipedInputStream(pipedIn));
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        ByteArrayOutputStream err = new ByteArrayOutputStream();
-        channel.setOut(out);
-        channel.setErr(err);
-        channel.open();
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("smx");
+            session.auth().verify(5L, TimeUnit.SECONDS);
+            
+            try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL)) {
+                session.close(false);
+        
+                try(PipedOutputStream pipedIn = new PipedOutputStream();
+                    InputStream inPipe = new PipedInputStream(pipedIn);
+                    ByteArrayOutputStream out = new ByteArrayOutputStream();
+                    ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+                    channel.setIn(inPipe);
+                    channel.setOut(out);
+                    channel.setErr(err);
+                    channel.open();
+                }
+            }
+        }
     }
 
     @Test
     public void testCloseBeforeAuthSucceed() throws Exception {
         authLatch = new CountDownLatch(1);
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("smx");
-        AuthFuture authFuture = session.auth();
-        CloseFuture closeFuture = session.close(false);
-        authLatch.countDown();
-        authFuture.await();
-        closeFuture.await();
-        assertNotNull(authFuture.getException());
-        assertTrue(closeFuture.isClosed());
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("smx");
+
+            AuthFuture authFuture = session.auth();
+            CloseFuture closeFuture = session.close(false);
+            authLatch.countDown();
+            authFuture.await();
+            closeFuture.await();
+            assertNotNull("No authentication exception", authFuture.getException());
+            assertTrue("Future not closed", closeFuture.isClosed());
+        }
     }
 
     @Test
     public void testCloseCleanBeforeChannelOpened() throws Exception {
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("smx");
-        session.auth().verify();
-        ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
-        channel.setIn(new ByteArrayInputStream(new byte[0]));
-        channel.setOut(new ByteArrayOutputStream());
-        channel.setErr(new ByteArrayOutputStream());
-        OpenFuture openFuture = channel.open();
-        CloseFuture closeFuture = session.close(false);
-        openFuture.await();
-        closeFuture.await();
-        assertTrue(openFuture.isOpened());
-        assertTrue(closeFuture.isClosed());
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("smx");
+            session.auth().verify(5L, TimeUnit.SECONDS);
+
+            try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
+                InputStream inp = new ByteArrayInputStream(GenericUtils.EMPTY_BYTE_ARRAY);
+                OutputStream out = new ByteArrayOutputStream();
+                OutputStream err = new ByteArrayOutputStream()) { 
+
+                channel.setIn(inp);
+                channel.setOut(out);
+                channel.setErr(err);
+
+                OpenFuture openFuture = channel.open();
+                CloseFuture closeFuture = session.close(false);
+                openFuture.await();
+                closeFuture.await();
+                assertTrue("Not open", openFuture.isOpened());
+                assertTrue("Not closed", closeFuture.isClosed());
+            }
+        }
     }
 
     @Test
     public void testCloseImmediateBeforeChannelOpened() throws Exception {
         channelLatch = new CountDownLatch(1);
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("smx");
-        session.auth().verify();
-        ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
-        channel.setIn(new ByteArrayInputStream(new byte[0]));
-        channel.setOut(new ByteArrayOutputStream());
-        channel.setErr(new ByteArrayOutputStream());
-        OpenFuture openFuture = channel.open();
-        CloseFuture closeFuture = session.close(true);
-        channelLatch.countDown();
-        openFuture.await();
-        closeFuture.await();
-        assertNotNull(openFuture.getException());
-        assertTrue(closeFuture.isClosed());
+
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("smx");
+            session.auth().verify(5L, TimeUnit.SECONDS);
+
+            try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
+                InputStream inp = new ByteArrayInputStream(GenericUtils.EMPTY_BYTE_ARRAY);
+                OutputStream out = new ByteArrayOutputStream();
+                OutputStream err = new ByteArrayOutputStream()) { 
+
+                channel.setIn(inp);
+                channel.setOut(out);
+                channel.setErr(err);
+
+                OpenFuture openFuture = channel.open();
+                CloseFuture closeFuture = session.close(true);
+                channelLatch.countDown();
+                openFuture.await();
+                closeFuture.await();
+                assertNotNull("No open exception", openFuture.getException());
+                assertTrue("Not closed", closeFuture.isClosed());
+            }
+        }
     }
 
     @Test
     public void testPublicKeyAuth() throws Exception {
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-
-        KeyPair pair = Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA);
-        session.addPublicKeyIdentity(pair);
-        session.auth().verify();
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            KeyPair pair = Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA);
+            session.addPublicKeyIdentity(pair);
+            session.auth().verify(5L, TimeUnit.SECONDS);
+        }
     }
 
     @Test
     public void testPublicKeyAuthNew() throws Exception {
         client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(UserAuthPublicKey.UserAuthPublicKeyFactory.INSTANCE));
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPublicKeyIdentity(Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA));
-        session.auth().verify();
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPublicKeyIdentity(Utils.createTestHostKeyProvider().loadKey(KeyPairProvider.SSH_RSA));
+            session.auth().verify(5L, TimeUnit.SECONDS);
+        }
     }
 
     @Test
@@ -580,58 +651,71 @@ public class ClientTest extends BaseTest {
         });
         client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(UserAuthPublicKey.UserAuthPublicKeyFactory.INSTANCE));
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPublicKeyIdentity(new SimpleGeneratorHostKeyProvider(null, "RSA").loadKey(KeyPairProvider.SSH_RSA));
-        session.addPublicKeyIdentity(pair);
-        session.auth().verify();
+
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPublicKeyIdentity(new SimpleGeneratorHostKeyProvider(null, "RSA").loadKey(KeyPairProvider.SSH_RSA));
+            session.addPublicKeyIdentity(pair);
+            session.auth().verify(5L, TimeUnit.SECONDS);
+        }
     }
 
     @Test
     public void testPasswordAuthNew() throws Exception {
         client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(new UserAuthPassword.UserAuthPasswordFactory()));
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("smx");
-        session.auth().verify();
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("smx");
+            session.auth().verify(5L, TimeUnit.SECONDS);
+        }
     }
 
     @Test
     public void testPasswordAuthNewWithFailureOnFirstIdentity() throws Exception {
         client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(new UserAuthPassword.UserAuthPasswordFactory()));
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("bad");
-        session.addPasswordIdentity("smx");
-        session.auth().verify();
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("bad");
+            session.addPasswordIdentity("smx");
+            session.auth().verify(5L, TimeUnit.SECONDS);
+        }
     }
 
     @Test
     public void testKeyboardInteractiveAuthNew() throws Exception {
         client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(UserAuthKeyboardInteractive.UserAuthKeyboardInteractiveFactory.INSTANCE));
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("smx");
-        session.auth().verify();
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("smx");
+            session.auth().verify(5L, TimeUnit.SECONDS);
+        }
     }
 
     @Test
     public void testKeyboardInteractiveAuthNewWithFailureOnFirstIdentity() throws Exception {
         client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(UserAuthKeyboardInteractive.UserAuthKeyboardInteractiveFactory.INSTANCE));
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("bad");
-        session.addPasswordIdentity("smx");
-        session.auth().verify();
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("bad");
+            session.addPasswordIdentity("smx");
+            session.auth().verify(5L, TimeUnit.SECONDS);
+        }
     }
 
     @Test
     public void testKeyboardInteractiveWithFailures() throws Exception {
         final AtomicInteger count = new AtomicInteger();
-        client.getProperties().put(ClientFactoryManager.PASSWORD_PROMPTS, "3");
+        final int MAX_PROMPTS = 3;
+        FactoryManagerUtils.updateProperty(client, ClientFactoryManager.PASSWORD_PROMPTS, MAX_PROMPTS);
+
         client.setUserAuthFactories(Arrays.<NamedFactory<UserAuth>>asList(new UserAuthKeyboardInteractive.UserAuthKeyboardInteractiveFactory()));
         client.setUserInteraction(new UserInteraction() {
             @Override
             public void welcome(String banner) {
+                // ignored
             }
             @Override
             public String[] interactive(String destination, String name, String instruction, String[] prompt, boolean[] echo) {
@@ -640,96 +724,111 @@ public class ClientTest extends BaseTest {
             }
         });
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        AuthFuture future = session.auth();
-        future.await();
-        assertTrue(future.isFailure());
-        assertEquals(3, count.get());
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            AuthFuture future = session.auth();
+            future.await();
+            assertTrue("Unexpected authentication success", future.isFailure());
+            assertEquals("Mismatched authentication retry count", MAX_PROMPTS, count.get());
+        }
     }
 
-
     @Test
     public void testKeyboardInteractiveInSessionUserInteractive() throws Exception {
         final AtomicInteger count = new AtomicInteger();
-        client.getProperties().put(ClientFactoryManager.PASSWORD_PROMPTS, "3");
+        final int MAX_PROMPTS = 3;
+        FactoryManagerUtils.updateProperty(client, ClientFactoryManager.PASSWORD_PROMPTS, MAX_PROMPTS);
+
         client.setUserAuthFactories(Arrays
                         .<NamedFactory<UserAuth>> asList(UserAuthKeyboardInteractive.UserAuthKeyboardInteractiveFactory.INSTANCE));
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.setUserInteraction(new UserInteraction() {
-            @Override
-            public void welcome(String banner) {
-            }
 
-            @Override
-            public String[] interactive(String destination, String name, String instruction,
-                                        String[] prompt, boolean[] echo) {
-                count.incrementAndGet();
-                return new String[] { "smx" };
-            }
-        });
-        AuthFuture future = session.auth();
-        future.await();
-        assertTrue(future.isSuccess());
-        assertFalse(future.isFailure());
-        assertEquals(1, count.get());
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.setUserInteraction(new UserInteraction() {
+                    @Override
+                    public void welcome(String banner) {
+                        // ignored
+                    }
+        
+                    @Override
+                    public String[] interactive(String destination, String name, String instruction,
+                                                String[] prompt, boolean[] echo) {
+                        count.incrementAndGet();
+                        return new String[] { "smx" };
+                    }
+                });
+            AuthFuture future = session.auth();
+            future.await();
+            assertTrue("Authentication not marked as success", future.isSuccess());
+            assertFalse("Authentication marked as failure", future.isFailure());
+            assertEquals("Mismatched authentication attempts count", 1, count.get());
+        }
     }
 
     @Test
     public void testKeyboardInteractiveInSessionUserInteractiveFailure() throws Exception {
         final AtomicInteger count = new AtomicInteger();
-        client.getProperties().put(ClientFactoryManager.PASSWORD_PROMPTS, "3");
+        final int MAX_PROMPTS = 3;
+        FactoryManagerUtils.updateProperty(client, ClientFactoryManager.PASSWORD_PROMPTS, MAX_PROMPTS);
         client.setUserAuthFactories(Arrays
                         .<NamedFactory<UserAuth>> asList(new UserAuthKeyboardInteractive.UserAuthKeyboardInteractiveFactory()));
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.setUserInteraction(new UserInteraction() {
-            @Override
-            public void welcome(String banner) {
-            }
-
-            @Override
-            public String[] interactive(String destination, String name, String instruction,
-                                        String[] prompt, boolean[] echo) {
-                count.incrementAndGet();
-                return new String[] { "bad" };
-            }
-        });
-        AuthFuture future = session.auth();
-        future.await();
-        assertTrue(future.isFailure());
-        assertEquals(3, count.get());
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.setUserInteraction(new UserInteraction() {
+                @Override
+                public void welcome(String banner) {
+                    // ignored
+                }
+    
+                @Override
+                public String[] interactive(String destination, String name, String instruction,
+                                            String[] prompt, boolean[] echo) {
+                    count.incrementAndGet();
+                    return new String[] { "bad" };
+                }
+            });
+            AuthFuture future = session.auth();
+            future.await();
+            assertTrue("Authentication not, marked as failure", future.isFailure());
+            assertEquals("Mismatched authentication retry count", MAX_PROMPTS, count.get());
+        }
     }
 
     @Test
     public void testClientDisconnect() throws Exception {
         TestEchoShellFactory.TestEchoShell.latch = new CountDownLatch(1);
-        try
-        {
+        try {
             client.start();
-            ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-            session.addPasswordIdentity("smx");
-            session.auth().verify();
-            ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
-            PipedOutputStream pipedIn = new PipedOutputStream();
-            channel.setIn(new PipedInputStream(pipedIn));
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            ByteArrayOutputStream err = new ByteArrayOutputStream();
-            channel.setOut(out);
-            channel.setErr(err);
-            channel.open().await();
-
-//            ((AbstractSession) session).disconnect(SshConstants.SSH2_DISCONNECT_BY_APPLICATION, "Cancel");
-            AbstractSession cs = (AbstractSession) session;
-            Buffer buffer = cs.createBuffer(SshConstants.SSH_MSG_DISCONNECT);
-            buffer.putInt(SshConstants.SSH2_DISCONNECT_BY_APPLICATION);
-            buffer.putString("Cancel");
-            buffer.putString("");
-            IoWriteFuture f = cs.writePacket(buffer);
-            f.await();
-            suspend(cs.getIoSession());
-
-            TestEchoShellFactory.TestEchoShell.latch.await();
+            
+            try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+                session.addPasswordIdentity("smx");
+                session.auth().verify(5L, TimeUnit.SECONDS);
+                
+                try(ClientChannel channel = session.createChannel(ClientChannel.CHANNEL_SHELL);
+                    PipedOutputStream pipedIn = new PipedOutputStream();
+                    InputStream inPipe = new PipedInputStream(pipedIn); 
+                    ByteArrayOutputStream out = new ByteArrayOutputStream();
+                    ByteArrayOutputStream err = new ByteArrayOutputStream()) {
+
+                    channel.setIn(inPipe);
+                    channel.setOut(out);
+                    channel.setErr(err);
+                    channel.open().await();
+        
+        //            ((AbstractSession) session).disconnect(SshConstants.SSH2_DISCONNECT_BY_APPLICATION, "Cancel");
+                    AbstractSession cs = (AbstractSession) session;
+                    Buffer buffer = cs.createBuffer(SshConstants.SSH_MSG_DISCONNECT);
+                    buffer.putInt(SshConstants.SSH2_DISCONNECT_BY_APPLICATION);
+                    buffer.putString("Cancel");
+                    buffer.putString("");
+                    IoWriteFuture f = cs.writePacket(buffer);
+                    f.await();
+                    suspend(cs.getIoSession());
+    
+                    TestEchoShellFactory.TestEchoShell.latch.await();
+                }
+            }
         } finally {
             TestEchoShellFactory.TestEchoShell.latch = null;
         }
@@ -753,10 +852,13 @@ public class ClientTest extends BaseTest {
                 }
         );
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.waitFor(ClientSession.WAIT_AUTH, 10000);
-        assertTrue(ok.get());
-        client.stop();
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.waitFor(ClientSession.WAIT_AUTH, 10000);
+            assertTrue(ok.get());
+        } finally {
+            client.stop();
+        }
     }
 
     @Test
@@ -764,15 +866,18 @@ public class ClientTest extends BaseTest {
         sshd.getCipherFactories().add(BuiltinCiphers.none);
         client.getCipherFactories().add(BuiltinCiphers.none);
         client.start();
-        ClientSession session = client.connect("smx", "localhost", port).await().getSession();
-        session.addPasswordIdentity("smx");
-        session.auth().verify();
-        session.switchToNoneCipher().await();
-
-        ClientChannel channel = session.createSubsystemChannel("sftp");
-        channel.open().verify();
-
-        client.stop();
+        
+        try(ClientSession session = client.connect("smx", "localhost", port).await().getSession()) {
+            session.addPasswordIdentity("smx");
+            session.auth().verify(5L, TimeUnit.SECONDS);
+            session.switchToNoneCipher().await();
+    
+            try(ClientChannel channel = session.createSubsystemChannel("sftp")) {
+                channel.open().verify(5L, TimeUnit.SECONDS);
+            }
+        } finally {
+            client.stop();
+        }
     }
 
     private void suspend(IoSession ioSession) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/dafaa622/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ServerTest.java b/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
index 9b52296..9190800 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ServerTest.java
@@ -208,7 +208,8 @@ public class ServerTest extends BaseTest {
         client.start();
         try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
             s.addPasswordIdentity("test");
-            s.auth().verify();
+            s.auth().verify(5L, TimeUnit.SECONDS);
+
             try(ChannelShell shell = s.createShellChannel();
                 ByteArrayOutputStream out = new ByteArrayOutputStream();
                 ByteArrayOutputStream err = new ByteArrayOutputStream()) {
@@ -260,7 +261,8 @@ public class ServerTest extends BaseTest {
 
         try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
             s.addPasswordIdentity("test");
-            s.auth().verify();
+            s.auth().verify(5L, TimeUnit.SECONDS);
+
             try(ChannelExec shell = s.createExecChannel("normal");
                 // Create a pipe that will block reading when the buffer is full
                 PipedInputStream pis = new PipedInputStream();
@@ -355,7 +357,7 @@ public class ServerTest extends BaseTest {
 
         try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
             s.addPasswordIdentity("test");
-            s.auth().verify();
+            s.auth().verify(5L, TimeUnit.SECONDS);
             Assert.assertEquals("Mismatched client events count", 1, clientEventCount.get());
             Assert.assertEquals("Mismatched server events count", 1, serverEventCount.get());
             s.close(false);
@@ -411,7 +413,7 @@ public class ServerTest extends BaseTest {
             try {
                 try(ClientSession s = client.connect("test", "localhost", port).await().getSession()) {
                     s.addPasswordIdentity("test");
-                    s.auth().verify();
+                    s.auth().verify(5L, TimeUnit.SECONDS);
                 }
                 
                 synchronized(eventsMap) {