You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2014/02/07 17:18:19 UTC

[2/2] git commit: Fix additional synchronisation issues

Fix additional synchronisation issues

Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/e737ae3f
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/e737ae3f
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/e737ae3f

Branch: refs/heads/master
Commit: e737ae3fc1f72aaa2bf4959364f449424e006093
Parents: 18bcdf7
Author: Guillaume Nodet <gn...@apache.org>
Authored: Fri Feb 7 14:09:36 2014 +0100
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Fri Feb 7 17:18:09 2014 +0100

----------------------------------------------------------------------
 .../sshd/common/channel/AbstractChannel.java    | 14 ++--
 .../apache/sshd/common/io/nio2/Nio2Service.java |  2 +-
 .../apache/sshd/common/io/nio2/Nio2Session.java | 83 ++++++++++++--------
 .../sshd/common/session/AbstractSession.java    |  2 +-
 .../org/apache/sshd/AuthenticationTest.java     |  1 +
 .../test/java/org/apache/sshd/ClientTest.java   | 55 ++++++++++++-
 6 files changed, 116 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e737ae3f/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index e0585b5..e275f06 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -157,11 +157,15 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea
                     try {
                         session.writePacket(buffer).addListener(new SshFutureListener<IoWriteFuture>() {
                             public void operationComplete(IoWriteFuture future) {
-                                log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", AbstractChannel.this);
-                                if (gracefulState.compareAndSet(0, CLOSE_SENT)) {
-                                    // Waiting for CLOSE message to come back from the remote side
-                                } else if (gracefulState.compareAndSet(CLOSE_RECV, CLOSE_SENT | CLOSE_RECV)) {
-                                    gracefulFuture.setValue(true);
+                                if (future.isWritten()) {
+                                    log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", AbstractChannel.this);
+                                    if (gracefulState.compareAndSet(0, CLOSE_SENT)) {
+                                        // Waiting for CLOSE message to come back from the remote side
+                                    } else if (gracefulState.compareAndSet(CLOSE_RECV, CLOSE_SENT | CLOSE_RECV)) {
+                                        gracefulFuture.setValue(true);
+                                    }
+                                } else {
+                                    close(true);
                                 }
                             }
                         });

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e737ae3f/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
index 93cb1a5..31dc957 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java
@@ -82,7 +82,7 @@ public abstract class Nio2Service implements IoService {
         try {
             close(true).await();
         } catch (InterruptedException e) {
-            e.printStackTrace();
+            logger.debug("Exception caught while closing", e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e737ae3f/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index d28cb16..73ad803 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -28,9 +28,11 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.sshd.common.RuntimeSshException;
 import org.apache.sshd.common.future.DefaultSshFuture;
 import org.apache.sshd.common.future.SshFuture;
 import org.apache.sshd.common.io.IoHandler;
@@ -131,38 +133,6 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
         }
     }
 
-    private void startWriting() {
-        final DefaultIoWriteFuture future = writes.peek();
-        if (future != null) {
-            if (currentWrite.compareAndSet(null, future)) {
-                socket.write(future.buffer, null, new CompletionHandler<Integer, Object>() {
-                    public void completed(Integer result, Object attachment) {
-                        if (future.buffer.hasRemaining()) {
-                            socket.write(future.buffer, null, this);
-                        } else {
-                            log.debug("Finished writing");
-                            future.setWritten();
-                            finishWrite();
-                        }
-                    }
-                    public void failed(Throwable exc, Object attachment) {
-                        future.setException(exc);
-                        exceptionCaught(exc);
-                        finishWrite();
-                    }
-                    private void finishWrite() {
-                        synchronized (writes) {
-                            writes.remove(future);
-                            writes.notifyAll();
-                        }
-                        currentWrite.compareAndSet(future, null);
-                        startWriting();
-                    }
-                });
-            }
-        }
-    }
-
     @Override
     protected SshFuture doCloseGracefully() {
         synchronized (writes) {
@@ -218,7 +188,11 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
                             }
                         };
                         handler.messageReceived(Nio2Session.this, buf);
-                        startReading();
+                        if (!closeFuture.isClosed()) {
+                            startReading();
+                        } else {
+                            log.debug("IoSession has been closed, stop reading");
+                        }
                     } else {
                         log.debug("Socket has been disconnected, closing IoSession now");
                         Nio2Session.this.close(true);
@@ -233,6 +207,49 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
         });
     }
 
+    private void startWriting() {
+        final DefaultIoWriteFuture future = writes.peek();
+        if (future != null) {
+            if (currentWrite.compareAndSet(null, future)) {
+                try {
+                    socket.write(future.buffer, null, new CompletionHandler<Integer, Object>() {
+                        public void completed(Integer result, Object attachment) {
+                            if (future.buffer.hasRemaining()) {
+                                try {
+                                    socket.write(future.buffer, null, this);
+                                } catch (Throwable t) {
+                                    log.debug("Exception caught while writing", t);
+                                    future.setWritten();
+                                    finishWrite();
+                                }
+                            } else {
+                                log.debug("Finished writing");
+                                future.setWritten();
+                                finishWrite();
+                            }
+                        }
+                        public void failed(Throwable exc, Object attachment) {
+                            future.setException(exc);
+                            exceptionCaught(exc);
+                            finishWrite();
+                        }
+                        private void finishWrite() {
+                            synchronized (writes) {
+                                writes.remove(future);
+                                writes.notifyAll();
+                            }
+                            currentWrite.compareAndSet(future, null);
+                            startWriting();
+                        }
+                    });
+                } catch (RuntimeException e) {
+                    future.setWritten();
+                    throw e;
+                }
+            }
+        }
+    }
+
     static class DefaultIoWriteFuture extends DefaultSshFuture<IoWriteFuture> implements IoWriteFuture {
         private final ByteBuffer buffer;
         DefaultIoWriteFuture(Object lock, ByteBuffer buffer) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e737ae3f/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index b2b999e..6ffeb8c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -409,7 +409,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
     public void exceptionCaught(Throwable t) {
         // Ignore exceptions that happen while closing
         synchronized (lock) {
-            if (state.get() == OPENED) {
+            if (state.get() != OPENED) {
                 return;
             }
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e737ae3f/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java b/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
index 09d06b7..0d3c35b 100644
--- a/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/AuthenticationTest.java
@@ -85,6 +85,7 @@ public class AuthenticationTest extends BaseTest {
         assertFalse(s.authPassword("user2", "the-password").await().isSuccess());
 
         assertEquals(ClientSession.CLOSED, s.waitFor(ClientSession.CLOSED, 1000));
+        client.stop();
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e737ae3f/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
index bac4130..455a3dc 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
@@ -20,18 +20,26 @@ package org.apache.sshd;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.security.KeyPair;
+import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.sshd.client.channel.ChannelExec;
 import org.apache.sshd.client.future.AuthFuture;
 import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.Channel;
 import org.apache.sshd.common.KeyPairProvider;
+import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.common.RuntimeSshException;
+import org.apache.sshd.common.Service;
+import org.apache.sshd.common.Session;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.forward.TcpipServerChannel;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.io.IoWriteFuture;
@@ -43,7 +51,10 @@ import org.apache.sshd.common.util.BufferUtils;
 import org.apache.sshd.common.util.NoCloseOutputStream;
 import org.apache.sshd.server.Command;
 import org.apache.sshd.server.CommandFactory;
+import org.apache.sshd.server.channel.ChannelSession;
 import org.apache.sshd.server.command.UnknownCommand;
+import org.apache.sshd.server.session.ServerConnectionService;
+import org.apache.sshd.server.session.ServerUserAuthService;
 import org.apache.sshd.util.BaseTest;
 import org.apache.sshd.util.BogusPasswordAuthenticator;
 import org.apache.sshd.util.BogusPublickeyAuthenticator;
@@ -56,6 +67,7 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -68,10 +80,14 @@ public class ClientTest extends BaseTest {
 
     private SshServer sshd;
     private int port;
+    private CountDownLatch authLatch;
+    private CountDownLatch channelLatch;
 
     @Before
     public void setUp() throws Exception {
         port = Utils.getFreePort();
+        authLatch = new CountDownLatch(0);
+        channelLatch = new CountDownLatch(0);
 
         sshd = SshServer.setUpDefaultServer();
         sshd.setPort(port);
@@ -84,6 +100,39 @@ public class ClientTest extends BaseTest {
         });
         sshd.setPasswordAuthenticator(new BogusPasswordAuthenticator());
         sshd.setPublickeyAuthenticator(new BogusPublickeyAuthenticator());
+        sshd.setServiceFactories(Arrays.asList(
+                new ServerUserAuthService.Factory() {
+                    @Override
+                    public Service create(Session session) throws IOException {
+                        return new ServerUserAuthService(session) {
+                            @Override
+                            public void process(byte cmd, Buffer buffer) throws Exception {
+                                authLatch.await();
+                                super.process(cmd, buffer);
+                            }
+                        };
+                    }
+                },
+                new ServerConnectionService.Factory()
+        ));
+        sshd.setChannelFactories(Arrays.<NamedFactory<Channel>>asList(
+                new ChannelSession.Factory() {
+                    @Override
+                    public Channel create() {
+                        return new ChannelSession() {
+                            @Override
+                            public OpenFuture open(int recipient, int rwsize, int rmpsize, Buffer buffer) {
+                                try {
+                                    channelLatch.await();
+                                } catch (InterruptedException e) {
+                                    throw new RuntimeSshException(e);
+                                }
+                                return super.open(recipient, rwsize, rmpsize, buffer);
+                            }
+                        };
+                    }
+                },
+                new TcpipServerChannel.DirectTcpipFactory()));
         sshd.start();
     }
 
@@ -307,11 +356,13 @@ public class ClientTest extends BaseTest {
 
     @Test
     public void testCloseBeforeAuthSucceed() throws Exception {
+        authLatch = new CountDownLatch(1);
         SshClient client = SshClient.setUpDefaultClient();
         client.start();
         ClientSession session = client.connect("localhost", port).await().getSession();
         AuthFuture authFuture = session.authPassword("smx", "smx");
         CloseFuture closeFuture = session.close(false);
+        authLatch.countDown();
         authFuture.await();
         closeFuture.await();
         assertNotNull(authFuture.getException());
@@ -332,12 +383,13 @@ public class ClientTest extends BaseTest {
         CloseFuture closeFuture = session.close(false);
         openFuture.await();
         closeFuture.await();
-        assertNotNull(openFuture.isOpened());
+        assertTrue(openFuture.isOpened());
         assertTrue(closeFuture.isClosed());
     }
 
     @Test
     public void testCloseImmediateBeforeChannelOpened() throws Exception {
+        channelLatch = new CountDownLatch(1);
         SshClient client = SshClient.setUpDefaultClient();
         client.start();
         ClientSession session = client.connect("localhost", port).await().getSession();
@@ -348,6 +400,7 @@ public class ClientTest extends BaseTest {
         channel.setErr(new ByteArrayOutputStream());
         OpenFuture openFuture = channel.open();
         CloseFuture closeFuture = session.close(true);
+        channelLatch.countDown();
         openFuture.await();
         closeFuture.await();
         assertNotNull(openFuture.getException());