You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2013/07/23 12:09:42 UTC

[6/6] git commit: [SSHD-218] Possible deadlock in client when getting SSH_MSG_DISCONNECT on invalid channel

[SSHD-218] Possible deadlock in client when getting SSH_MSG_DISCONNECT on invalid channel

Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/8bfdf38c
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/8bfdf38c
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/8bfdf38c

Branch: refs/heads/master
Commit: 8bfdf38cd74ff669fcdb8ca78bd34e77decbc98e
Parents: 4e75e8f
Author: Guillaume Nodet <gn...@apache.org>
Authored: Tue Jul 23 12:09:26 2013 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Tue Jul 23 12:09:26 2013 +0200

----------------------------------------------------------------------
 .../client/channel/AbstractClientChannel.java   | 96 +++++++++-----------
 .../sshd/client/channel/ChannelSession.java     |  2 +-
 .../sshd/common/channel/AbstractChannel.java    | 86 +++++++++---------
 .../sshd/common/forward/TcpipServerChannel.java | 20 ++--
 .../sshd/server/channel/ChannelSession.java     | 38 ++++----
 .../sshd/server/command/UnknownCommand.java     | 24 +++--
 .../test/java/org/apache/sshd/ClientTest.java   | 35 +++++++
 7 files changed, 167 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index eb2ea17..8d04dba 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -40,7 +40,7 @@ import org.apache.sshd.common.util.IoUtils;
  */
 public abstract class AbstractClientChannel extends AbstractChannel implements ClientChannel {
 
-    protected boolean opened;
+    protected volatile boolean opened;
     protected final String type;
     protected InputStream in;
     protected OutputStream invertedIn;
@@ -96,29 +96,27 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
 
     @Override
     public CloseFuture close(final boolean immediately) {
-        synchronized (lock) {
-            if (!closeFuture.isDone()) {
-                if (opened) {
+        if (!closeFuture.isDone()) {
+            if (opened) {
+                super.close(immediately);
+            } else if (openFuture != null) {
+                if (immediately) {
+                    openFuture.setException(new SshException("Channel closed"));
                     super.close(immediately);
-                } else if (openFuture != null) {
-                    if (immediately) {
-                        openFuture.setException(new SshException("Channel closed"));
-                        super.close(immediately);
-                    } else {
-                        openFuture.addListener(new SshFutureListener<OpenFuture>() {
-                            public void operationComplete(OpenFuture future) {
-                                if (future.isOpened()) {
-                                    close(immediately);
-                                } else {
-                                    close(true);
-                                }
-                            }
-                        });
-                    }
                 } else {
-                    closeFuture.setClosed();
-                    lock.notifyAll();
+                    openFuture.addListener(new SshFutureListener<OpenFuture>() {
+                        public void operationComplete(OpenFuture future) {
+                            if (future.isOpened()) {
+                                close(immediately);
+                            } else {
+                                close(true);
+                            }
+                        }
+                    });
                 }
+            } else {
+                closeFuture.setClosed();
+                notifyStateChanged();
             }
         }
         return closeFuture;
@@ -200,20 +198,18 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
     }
 
     public void handleOpenSuccess(int recipient, int rwsize, int rmpsize, Buffer buffer) {
-        synchronized (lock) {
-            this.recipient = recipient;
-            this.remoteWindow.init(rwsize, rmpsize);
-            try {
-                doOpen();
-                this.opened = true;
-                this.openFuture.setOpened();
-            } catch (Exception e) {
-                this.openFuture.setException(e);
-                this.closeFuture.setClosed();
-                this.doClose();
-            } finally {
-                lock.notifyAll();
-            }
+        this.recipient = recipient;
+        this.remoteWindow.init(rwsize, rmpsize);
+        try {
+            doOpen();
+            this.opened = true;
+            this.openFuture.setOpened();
+        } catch (Exception e) {
+            this.openFuture.setException(e);
+            this.closeFuture.setClosed();
+            this.doClose();
+        } finally {
+            notifyStateChanged();
         }
     }
 
@@ -222,20 +218,20 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
     public void handleOpenFailure(Buffer buffer) {
         int reason = buffer.getInt();
         String msg = buffer.getString();
-        synchronized (lock) {
-            this.openFailureReason = reason;
-            this.openFailureMsg = msg;
-            this.openFuture.setException(new SshException(msg));
-            this.closeFuture.setClosed();
-            this.doClose();
-            lock.notifyAll();
-        }
+        this.openFailureReason = reason;
+        this.openFailureMsg = msg;
+        this.openFuture.setException(new SshException(msg));
+        this.closeFuture.setClosed();
+        this.doClose();
+        notifyStateChanged();
     }
 
     protected void doWriteData(byte[] data, int off, int len) throws IOException {
         if (out != null) {
             out.write(data, off, len);
             out.flush();
+        } else {
+            throw new IllegalStateException("No output stream for channel");
         }
         localWindow.consumeAndCheck(len);
     }
@@ -244,6 +240,8 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
         if (err != null) {
             err.write(data, off, len);
             err.flush();
+        } else {
+            throw new IllegalStateException("No error stream for channel");
         }
         localWindow.consumeAndCheck(len);
     }
@@ -253,16 +251,12 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
         String req = buffer.getString();
         if ("exit-status".equals(req)) {
             buffer.getBoolean();
-            synchronized (lock) {
-                exitStatus = buffer.getInt();
-                lock.notifyAll();
-            }
+            exitStatus = buffer.getInt();
+            notifyStateChanged();
         } else if ("exit-signal".equals(req)) {
             buffer.getBoolean();
-            synchronized (lock) {
-                exitSignal = buffer.getString();
-                lock.notifyAll();
-            }
+            exitSignal = buffer.getString();
+            notifyStateChanged();
         }
         // TODO: handle other channel requests
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index ee48e0d..3166666 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -98,7 +98,7 @@ public class ChannelSession extends AbstractClientChannel {
                 }
             }
         } catch (Exception e) {
-            if (!closing) {
+            if (!closing.get()) {
                 log.info("Caught exception", e);
                 close(false);
             }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index ff2a306..ae2a9e1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -19,7 +19,7 @@
 package org.apache.sshd.common.channel;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.mina.core.future.IoFutureListener;
 import org.apache.mina.core.future.WriteFuture;
@@ -51,9 +51,9 @@ public abstract class AbstractChannel implements Channel {
     protected Session session;
     protected int id;
     protected int recipient;
-    protected boolean eof;
     protected final CloseFuture closeFuture = new DefaultCloseFuture(lock);
-    protected boolean closing;
+    protected volatile boolean eof;
+    protected final AtomicBoolean closing = new AtomicBoolean();
     protected boolean closedByOtherSide;
 
     public int getId() {
@@ -86,56 +86,56 @@ public abstract class AbstractChannel implements Channel {
         configureWindow();
     }
 
+    protected void notifyStateChanged() {
+        synchronized (lock) {
+            lock.notifyAll();
+        }
+    }
+
     public CloseFuture close(boolean immediately) {
         if (closeFuture.isClosed()) {
             return closeFuture;
         }
-        try {
-            synchronized (lock) {
+        if (closing.compareAndSet(false, true)) {
+            try {
                 if (immediately) {
                     log.debug("Closing channel {} immediately", id);
+                    doClose();
                     closeFuture.setClosed();
-                    lock.notifyAll();
+                    notifyStateChanged();
                     session.unregisterChannel(this);
                 } else {
-                    if (!closing) {
-                        closing = true;
-                        log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", id);
-                        Buffer buffer = session.createBuffer(SshConstants.Message.SSH_MSG_CHANNEL_CLOSE, 0);
-                        buffer.putInt(recipient);
-                        session.writePacket(buffer).addListener(new IoFutureListener<WriteFuture>() {
-                            public void operationComplete(WriteFuture future) {
-                                synchronized (lock) {
-                                    if (closedByOtherSide) {
-                                        log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", id);
-                                        closeFuture.setClosed();
-                                        doClose();
-                                        lock.notifyAll();
-                                    }
-                                }
+                    log.debug("Closing channel {} gracefully", id);
+                    doClose();
+                    log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", id);
+                    Buffer buffer = session.createBuffer(SshConstants.Message.SSH_MSG_CHANNEL_CLOSE, 0);
+                    buffer.putInt(recipient);
+                    session.writePacket(buffer).addListener(new IoFutureListener<WriteFuture>() {
+                        public void operationComplete(WriteFuture future) {
+                            if (closedByOtherSide) {
+                                log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", id);
+                                closeFuture.setClosed();
+                                notifyStateChanged();
                             }
-                        });
-                    }
+                        }
+                    });
                 }
+            } catch (IOException e) {
+                session.exceptionCaught(e);
+                closeFuture.setClosed();
             }
-        } catch (IOException e) {
-            session.exceptionCaught(e);
-            closeFuture.setClosed();
         }
         return closeFuture;
     }
 
     public void handleClose() throws IOException {
         log.debug("Received SSH_MSG_CHANNEL_CLOSE on channel {}", id);
-        synchronized (lock) {
-            closedByOtherSide = !closing;
-            if (closedByOtherSide) {
-                close(false);
-            } else {
-                close(false).setClosed();
-                doClose();
-                lock.notifyAll();
-            }
+        closedByOtherSide = !closing.get();
+        if (closedByOtherSide) {
+            close(false);
+        } else {
+            close(false).setClosed();
+            notifyStateChanged();
         }
     }
 
@@ -143,12 +143,10 @@ public abstract class AbstractChannel implements Channel {
     }
 
     protected void writePacket(Buffer buffer) throws IOException {
-        synchronized (lock) {
-            if (!closing) {
-                session.writePacket(buffer);
-            } else {
-                log.debug("Discarding output packet because channel is being closed");
-            }
+        if (!closing.get()) {
+            session.writePacket(buffer);
+        } else {
+            log.debug("Discarding output packet because channel is being closed");
         }
     }
 
@@ -187,10 +185,8 @@ public abstract class AbstractChannel implements Channel {
 
     public void handleEof() throws IOException {
         log.debug("Received SSH_MSG_CHANNEL_EOF on channel {}", id);
-        synchronized (lock) {
-            eof = true;
-            lock.notifyAll();
-        }
+        eof = true;
+        notifyStateChanged();
     }
 
     public void handleWindowAdjust(Buffer buffer) throws IOException {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
index 4c9fa28..5bd314b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
@@ -117,17 +117,15 @@ public class TcpipServerChannel extends AbstractServerChannel {
         IoHandler handler = new IoHandlerAdapter() {
             @Override
             public void messageReceived(IoSession session, Object message) throws Exception {
-                synchronized (lock) {
-                    if (closing) {
-                        log.debug("Ignoring write to channel {} in CLOSING state", id);
-                    } else {
-                        IoBuffer ioBuffer = (IoBuffer) message;
-                        int r = ioBuffer.remaining();
-                        byte[] b = new byte[r];
-                        ioBuffer.get(b, 0, r);
-                        out.write(b, 0, r);
-                        out.flush();
-                    }
+                if (closing.get()) {
+                    log.debug("Ignoring write to channel {} in CLOSING state", id);
+                } else {
+                    IoBuffer ioBuffer = (IoBuffer) message;
+                    int r = ioBuffer.remaining();
+                    byte[] b = new byte[r];
+                    ioBuffer.get(b, 0, r);
+                    out.write(b, 0, r);
+                    out.flush();
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index a357305..5388d8a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -175,23 +175,21 @@ public class ChannelSession extends AbstractServerChannel {
     public ChannelSession() {
     }
 
-    public CloseFuture close(boolean immediately) {
-        return super.close(immediately).addListener(new SshFutureListener() {
-            public void operationComplete(SshFuture sshFuture) {
-                if (command != null) {
-                    command.destroy();
-                    command = null;
-                }
-                remoteWindow.notifyClosed();
-                IoUtils.closeQuietly(out, err, receiver);
-            }
-        });
+    @Override
+    protected void doClose() {
+        if (command != null) {
+            command.destroy();
+            command = null;
+        }
+        remoteWindow.notifyClosed();
+        IoUtils.closeQuietly(out, err, receiver);
+        super.doClose();
     }
 
     @Override
     public void handleEof() throws IOException {
         super.handleEof();
-        receiver.close();
+        IoUtils.closeQuietly(receiver);
     }
 
     public void handleRequest(Buffer buffer) throws IOException {
@@ -206,6 +204,10 @@ public class ChannelSession extends AbstractServerChannel {
     }
 
     protected void doWriteData(byte[] data, int off, int len) throws IOException {
+        // If we're already closing, ignore incoming data
+        if (closing.get()) {
+            return;
+        }
         if (receiver != null) {
             int r = receiver.data(this, data, off, len);
             if (r > 0) {
@@ -563,13 +565,11 @@ public class ChannelSession extends AbstractServerChannel {
     }
 
     protected void closeShell(int exitValue) throws IOException {
-        synchronized (lock) {
-            if (!closing) {
-                sendEof();
-                sendExitStatus(exitValue);
-                // TODO: We should wait for all streams to be consumed before closing the channel
-                close(false);
-            }
+        if (!closing.get()) {
+            sendEof();
+            sendExitStatus(exitValue);
+            // TODO: We should wait for all streams to be consumed before closing the channel
+            close(false);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/server/command/UnknownCommand.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/command/UnknownCommand.java b/sshd-core/src/main/java/org/apache/sshd/server/command/UnknownCommand.java
index 7bcc1c3..95c0fda 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/command/UnknownCommand.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/command/UnknownCommand.java
@@ -35,31 +35,41 @@ import org.apache.sshd.server.ExitCallback;
  */
 public class UnknownCommand implements Command {
 
+    private String command;
+    private InputStream in;
+    private OutputStream out;
+    private OutputStream err;
+    private ExitCallback callback;
+
     public UnknownCommand(String command) {
+        this.command = command;
     }
 
     public void setInputStream(InputStream in) {
-        //To change body of implemented methods use File | Settings | File Templates.
+        this.in = in;
     }
 
     public void setOutputStream(OutputStream out) {
-        //To change body of implemented methods use File | Settings | File Templates.
+        this.out = out;
     }
 
     public void setErrorStream(OutputStream err) {
-        //To change body of implemented methods use File | Settings | File Templates.
+        this.err = err;
     }
 
     public void setExitCallback(ExitCallback callback) {
-        //To change body of implemented methods use File | Settings | File Templates.
+        this.callback = callback;
     }
 
     public void start(Environment env) throws IOException {
-        //To change body of implemented methods use File | Settings | File Templates.
-        // TODO: send back an error ?
+        err.write(("Unknown command: " + command + "\n").getBytes());
+        err.flush();
+        if (callback != null) {
+            callback.onExit(1, "Unknown command: " + command);
+        }
     }
 
     public void destroy() {
-        //To change body of implemented methods use File | Settings | File Templates.
     }
+
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
index 8bf93b4..5db1059 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
@@ -28,6 +28,7 @@ import java.security.KeyPair;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.mina.core.future.WriteFuture;
+import org.apache.sshd.client.channel.ChannelExec;
 import org.apache.sshd.client.future.AuthFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.KeyPairProvider;
@@ -37,14 +38,19 @@ import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.session.AbstractSession;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.common.util.BufferUtils;
+import org.apache.sshd.common.util.NoCloseOutputStream;
 import org.apache.sshd.server.Command;
+import org.apache.sshd.server.CommandFactory;
+import org.apache.sshd.server.command.UnknownCommand;
 import org.apache.sshd.util.*;
 import org.junit.After;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -67,6 +73,11 @@ public class ClientTest {
         sshd.setPort(port);
         sshd.setKeyPairProvider(Utils.createTestHostKeyProvider());
         sshd.setShellFactory(new TestEchoShellFactory());
+        sshd.setCommandFactory(new CommandFactory() {
+            public Command createCommand(String command) {
+                return new UnknownCommand(command);
+            }
+        });
         sshd.setPasswordAuthenticator(new BogusPasswordAuthenticator());
         sshd.setPublickeyAuthenticator(new BogusPublickeyAuthenticator());
         sshd.start();
@@ -81,6 +92,30 @@ public class ClientTest {
     }
 
     @Test
+    public void testCommandDeadlock() throws Exception {
+        SshClient client = SshClient.setUpDefaultClient();
+        client.start();
+        ClientSession session = client.connect("localhost", port).await().getSession();
+        session.authPassword("smx", "smx").await().isSuccess();
+        ChannelExec channel = session.createExecChannel("test");
+        channel.setOut(new NoCloseOutputStream(System.out));
+        channel.setErr(new NoCloseOutputStream(System.err));
+        channel.open().await();
+        Thread.sleep(100);
+        try {
+            for (int i = 0; i < 100; i++) {
+                channel.getInvertedIn().write("a".getBytes());
+                channel.getInvertedIn().flush();
+            }
+        } catch (SshException e) {
+            // That's ok, the channel is being closed by the other side
+        }
+        assertEquals(ChannelExec.CLOSED, channel.waitFor(ChannelExec.CLOSED, 0) & ChannelExec.CLOSED);
+        session.close(false).await();
+        client.stop();
+    }
+
+    @Test
     public void testClient() throws Exception {
         SshClient client = SshClient.setUpDefaultClient();
         client.start();