You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2013/07/23 12:09:42 UTC
[6/6] git commit: [SSHD-218] Possible deadlock in client when getting SSH_MSG_DISCONNECT on invalid channel
[SSHD-218] Possible deadlock in client when getting SSH_MSG_DISCONNECT on invalid channel
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/8bfdf38c
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/8bfdf38c
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/8bfdf38c
Branch: refs/heads/master
Commit: 8bfdf38cd74ff669fcdb8ca78bd34e77decbc98e
Parents: 4e75e8f
Author: Guillaume Nodet <gn...@apache.org>
Authored: Tue Jul 23 12:09:26 2013 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Tue Jul 23 12:09:26 2013 +0200
----------------------------------------------------------------------
.../client/channel/AbstractClientChannel.java | 96 +++++++++-----------
.../sshd/client/channel/ChannelSession.java | 2 +-
.../sshd/common/channel/AbstractChannel.java | 86 +++++++++---------
.../sshd/common/forward/TcpipServerChannel.java | 20 ++--
.../sshd/server/channel/ChannelSession.java | 38 ++++----
.../sshd/server/command/UnknownCommand.java | 24 +++--
.../test/java/org/apache/sshd/ClientTest.java | 35 +++++++
7 files changed, 167 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index eb2ea17..8d04dba 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -40,7 +40,7 @@ import org.apache.sshd.common.util.IoUtils;
*/
public abstract class AbstractClientChannel extends AbstractChannel implements ClientChannel {
- protected boolean opened;
+ protected volatile boolean opened;
protected final String type;
protected InputStream in;
protected OutputStream invertedIn;
@@ -96,29 +96,27 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
@Override
public CloseFuture close(final boolean immediately) {
- synchronized (lock) {
- if (!closeFuture.isDone()) {
- if (opened) {
+ if (!closeFuture.isDone()) {
+ if (opened) {
+ super.close(immediately);
+ } else if (openFuture != null) {
+ if (immediately) {
+ openFuture.setException(new SshException("Channel closed"));
super.close(immediately);
- } else if (openFuture != null) {
- if (immediately) {
- openFuture.setException(new SshException("Channel closed"));
- super.close(immediately);
- } else {
- openFuture.addListener(new SshFutureListener<OpenFuture>() {
- public void operationComplete(OpenFuture future) {
- if (future.isOpened()) {
- close(immediately);
- } else {
- close(true);
- }
- }
- });
- }
} else {
- closeFuture.setClosed();
- lock.notifyAll();
+ openFuture.addListener(new SshFutureListener<OpenFuture>() {
+ public void operationComplete(OpenFuture future) {
+ if (future.isOpened()) {
+ close(immediately);
+ } else {
+ close(true);
+ }
+ }
+ });
}
+ } else {
+ closeFuture.setClosed();
+ notifyStateChanged();
}
}
return closeFuture;
@@ -200,20 +198,18 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
}
public void handleOpenSuccess(int recipient, int rwsize, int rmpsize, Buffer buffer) {
- synchronized (lock) {
- this.recipient = recipient;
- this.remoteWindow.init(rwsize, rmpsize);
- try {
- doOpen();
- this.opened = true;
- this.openFuture.setOpened();
- } catch (Exception e) {
- this.openFuture.setException(e);
- this.closeFuture.setClosed();
- this.doClose();
- } finally {
- lock.notifyAll();
- }
+ this.recipient = recipient;
+ this.remoteWindow.init(rwsize, rmpsize);
+ try {
+ doOpen();
+ this.opened = true;
+ this.openFuture.setOpened();
+ } catch (Exception e) {
+ this.openFuture.setException(e);
+ this.closeFuture.setClosed();
+ this.doClose();
+ } finally {
+ notifyStateChanged();
}
}
@@ -222,20 +218,20 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
public void handleOpenFailure(Buffer buffer) {
int reason = buffer.getInt();
String msg = buffer.getString();
- synchronized (lock) {
- this.openFailureReason = reason;
- this.openFailureMsg = msg;
- this.openFuture.setException(new SshException(msg));
- this.closeFuture.setClosed();
- this.doClose();
- lock.notifyAll();
- }
+ this.openFailureReason = reason;
+ this.openFailureMsg = msg;
+ this.openFuture.setException(new SshException(msg));
+ this.closeFuture.setClosed();
+ this.doClose();
+ notifyStateChanged();
}
protected void doWriteData(byte[] data, int off, int len) throws IOException {
if (out != null) {
out.write(data, off, len);
out.flush();
+ } else {
+ throw new IllegalStateException("No output stream for channel");
}
localWindow.consumeAndCheck(len);
}
@@ -244,6 +240,8 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
if (err != null) {
err.write(data, off, len);
err.flush();
+ } else {
+ throw new IllegalStateException("No error stream for channel");
}
localWindow.consumeAndCheck(len);
}
@@ -253,16 +251,12 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
String req = buffer.getString();
if ("exit-status".equals(req)) {
buffer.getBoolean();
- synchronized (lock) {
- exitStatus = buffer.getInt();
- lock.notifyAll();
- }
+ exitStatus = buffer.getInt();
+ notifyStateChanged();
} else if ("exit-signal".equals(req)) {
buffer.getBoolean();
- synchronized (lock) {
- exitSignal = buffer.getString();
- lock.notifyAll();
- }
+ exitSignal = buffer.getString();
+ notifyStateChanged();
}
// TODO: handle other channel requests
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index ee48e0d..3166666 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -98,7 +98,7 @@ public class ChannelSession extends AbstractClientChannel {
}
}
} catch (Exception e) {
- if (!closing) {
+ if (!closing.get()) {
log.info("Caught exception", e);
close(false);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index ff2a306..ae2a9e1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -19,7 +19,7 @@
package org.apache.sshd.common.channel;
import java.io.IOException;
-import java.io.InterruptedIOException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.future.WriteFuture;
@@ -51,9 +51,9 @@ public abstract class AbstractChannel implements Channel {
protected Session session;
protected int id;
protected int recipient;
- protected boolean eof;
protected final CloseFuture closeFuture = new DefaultCloseFuture(lock);
- protected boolean closing;
+ protected volatile boolean eof;
+ protected final AtomicBoolean closing = new AtomicBoolean();
protected boolean closedByOtherSide;
public int getId() {
@@ -86,56 +86,56 @@ public abstract class AbstractChannel implements Channel {
configureWindow();
}
+ protected void notifyStateChanged() {
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ }
+
public CloseFuture close(boolean immediately) {
if (closeFuture.isClosed()) {
return closeFuture;
}
- try {
- synchronized (lock) {
+ if (closing.compareAndSet(false, true)) {
+ try {
if (immediately) {
log.debug("Closing channel {} immediately", id);
+ doClose();
closeFuture.setClosed();
- lock.notifyAll();
+ notifyStateChanged();
session.unregisterChannel(this);
} else {
- if (!closing) {
- closing = true;
- log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", id);
- Buffer buffer = session.createBuffer(SshConstants.Message.SSH_MSG_CHANNEL_CLOSE, 0);
- buffer.putInt(recipient);
- session.writePacket(buffer).addListener(new IoFutureListener<WriteFuture>() {
- public void operationComplete(WriteFuture future) {
- synchronized (lock) {
- if (closedByOtherSide) {
- log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", id);
- closeFuture.setClosed();
- doClose();
- lock.notifyAll();
- }
- }
+ log.debug("Closing channel {} gracefully", id);
+ doClose();
+ log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", id);
+ Buffer buffer = session.createBuffer(SshConstants.Message.SSH_MSG_CHANNEL_CLOSE, 0);
+ buffer.putInt(recipient);
+ session.writePacket(buffer).addListener(new IoFutureListener<WriteFuture>() {
+ public void operationComplete(WriteFuture future) {
+ if (closedByOtherSide) {
+ log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", id);
+ closeFuture.setClosed();
+ notifyStateChanged();
}
- });
- }
+ }
+ });
}
+ } catch (IOException e) {
+ session.exceptionCaught(e);
+ closeFuture.setClosed();
}
- } catch (IOException e) {
- session.exceptionCaught(e);
- closeFuture.setClosed();
}
return closeFuture;
}
public void handleClose() throws IOException {
log.debug("Received SSH_MSG_CHANNEL_CLOSE on channel {}", id);
- synchronized (lock) {
- closedByOtherSide = !closing;
- if (closedByOtherSide) {
- close(false);
- } else {
- close(false).setClosed();
- doClose();
- lock.notifyAll();
- }
+ closedByOtherSide = !closing.get();
+ if (closedByOtherSide) {
+ close(false);
+ } else {
+ close(false).setClosed();
+ notifyStateChanged();
}
}
@@ -143,12 +143,10 @@ public abstract class AbstractChannel implements Channel {
}
protected void writePacket(Buffer buffer) throws IOException {
- synchronized (lock) {
- if (!closing) {
- session.writePacket(buffer);
- } else {
- log.debug("Discarding output packet because channel is being closed");
- }
+ if (!closing.get()) {
+ session.writePacket(buffer);
+ } else {
+ log.debug("Discarding output packet because channel is being closed");
}
}
@@ -187,10 +185,8 @@ public abstract class AbstractChannel implements Channel {
public void handleEof() throws IOException {
log.debug("Received SSH_MSG_CHANNEL_EOF on channel {}", id);
- synchronized (lock) {
- eof = true;
- lock.notifyAll();
- }
+ eof = true;
+ notifyStateChanged();
}
public void handleWindowAdjust(Buffer buffer) throws IOException {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
index 4c9fa28..5bd314b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
@@ -117,17 +117,15 @@ public class TcpipServerChannel extends AbstractServerChannel {
IoHandler handler = new IoHandlerAdapter() {
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
- synchronized (lock) {
- if (closing) {
- log.debug("Ignoring write to channel {} in CLOSING state", id);
- } else {
- IoBuffer ioBuffer = (IoBuffer) message;
- int r = ioBuffer.remaining();
- byte[] b = new byte[r];
- ioBuffer.get(b, 0, r);
- out.write(b, 0, r);
- out.flush();
- }
+ if (closing.get()) {
+ log.debug("Ignoring write to channel {} in CLOSING state", id);
+ } else {
+ IoBuffer ioBuffer = (IoBuffer) message;
+ int r = ioBuffer.remaining();
+ byte[] b = new byte[r];
+ ioBuffer.get(b, 0, r);
+ out.write(b, 0, r);
+ out.flush();
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index a357305..5388d8a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -175,23 +175,21 @@ public class ChannelSession extends AbstractServerChannel {
public ChannelSession() {
}
- public CloseFuture close(boolean immediately) {
- return super.close(immediately).addListener(new SshFutureListener() {
- public void operationComplete(SshFuture sshFuture) {
- if (command != null) {
- command.destroy();
- command = null;
- }
- remoteWindow.notifyClosed();
- IoUtils.closeQuietly(out, err, receiver);
- }
- });
+ @Override
+ protected void doClose() {
+ if (command != null) {
+ command.destroy();
+ command = null;
+ }
+ remoteWindow.notifyClosed();
+ IoUtils.closeQuietly(out, err, receiver);
+ super.doClose();
}
@Override
public void handleEof() throws IOException {
super.handleEof();
- receiver.close();
+ IoUtils.closeQuietly(receiver);
}
public void handleRequest(Buffer buffer) throws IOException {
@@ -206,6 +204,10 @@ public class ChannelSession extends AbstractServerChannel {
}
protected void doWriteData(byte[] data, int off, int len) throws IOException {
+ // If we're already closing, ignore incoming data
+ if (closing.get()) {
+ return;
+ }
if (receiver != null) {
int r = receiver.data(this, data, off, len);
if (r > 0) {
@@ -563,13 +565,11 @@ public class ChannelSession extends AbstractServerChannel {
}
protected void closeShell(int exitValue) throws IOException {
- synchronized (lock) {
- if (!closing) {
- sendEof();
- sendExitStatus(exitValue);
- // TODO: We should wait for all streams to be consumed before closing the channel
- close(false);
- }
+ if (!closing.get()) {
+ sendEof();
+ sendExitStatus(exitValue);
+ // TODO: We should wait for all streams to be consumed before closing the channel
+ close(false);
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/main/java/org/apache/sshd/server/command/UnknownCommand.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/command/UnknownCommand.java b/sshd-core/src/main/java/org/apache/sshd/server/command/UnknownCommand.java
index 7bcc1c3..95c0fda 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/command/UnknownCommand.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/command/UnknownCommand.java
@@ -35,31 +35,41 @@ import org.apache.sshd.server.ExitCallback;
*/
public class UnknownCommand implements Command {
+ private String command;
+ private InputStream in;
+ private OutputStream out;
+ private OutputStream err;
+ private ExitCallback callback;
+
public UnknownCommand(String command) {
+ this.command = command;
}
public void setInputStream(InputStream in) {
- //To change body of implemented methods use File | Settings | File Templates.
+ this.in = in;
}
public void setOutputStream(OutputStream out) {
- //To change body of implemented methods use File | Settings | File Templates.
+ this.out = out;
}
public void setErrorStream(OutputStream err) {
- //To change body of implemented methods use File | Settings | File Templates.
+ this.err = err;
}
public void setExitCallback(ExitCallback callback) {
- //To change body of implemented methods use File | Settings | File Templates.
+ this.callback = callback;
}
public void start(Environment env) throws IOException {
- //To change body of implemented methods use File | Settings | File Templates.
- // TODO: send back an error ?
+ err.write(("Unknown command: " + command + "\n").getBytes());
+ err.flush();
+ if (callback != null) {
+ callback.onExit(1, "Unknown command: " + command);
+ }
}
public void destroy() {
- //To change body of implemented methods use File | Settings | File Templates.
}
+
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8bfdf38c/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
index 8bf93b4..5db1059 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
@@ -28,6 +28,7 @@ import java.security.KeyPair;
import java.util.concurrent.CountDownLatch;
import org.apache.mina.core.future.WriteFuture;
+import org.apache.sshd.client.channel.ChannelExec;
import org.apache.sshd.client.future.AuthFuture;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.KeyPairProvider;
@@ -37,14 +38,19 @@ import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.session.AbstractSession;
import org.apache.sshd.common.util.Buffer;
import org.apache.sshd.common.util.BufferUtils;
+import org.apache.sshd.common.util.NoCloseOutputStream;
import org.apache.sshd.server.Command;
+import org.apache.sshd.server.CommandFactory;
+import org.apache.sshd.server.command.UnknownCommand;
import org.apache.sshd.util.*;
import org.junit.After;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
/**
@@ -67,6 +73,11 @@ public class ClientTest {
sshd.setPort(port);
sshd.setKeyPairProvider(Utils.createTestHostKeyProvider());
sshd.setShellFactory(new TestEchoShellFactory());
+ sshd.setCommandFactory(new CommandFactory() {
+ public Command createCommand(String command) {
+ return new UnknownCommand(command);
+ }
+ });
sshd.setPasswordAuthenticator(new BogusPasswordAuthenticator());
sshd.setPublickeyAuthenticator(new BogusPublickeyAuthenticator());
sshd.start();
@@ -81,6 +92,30 @@ public class ClientTest {
}
@Test
+ public void testCommandDeadlock() throws Exception {
+ SshClient client = SshClient.setUpDefaultClient();
+ client.start();
+ ClientSession session = client.connect("localhost", port).await().getSession();
+ session.authPassword("smx", "smx").await().isSuccess();
+ ChannelExec channel = session.createExecChannel("test");
+ channel.setOut(new NoCloseOutputStream(System.out));
+ channel.setErr(new NoCloseOutputStream(System.err));
+ channel.open().await();
+ Thread.sleep(100);
+ try {
+ for (int i = 0; i < 100; i++) {
+ channel.getInvertedIn().write("a".getBytes());
+ channel.getInvertedIn().flush();
+ }
+ } catch (SshException e) {
+ // That's ok, the channel is being closed by the other side
+ }
+ assertEquals(ChannelExec.CLOSED, channel.waitFor(ChannelExec.CLOSED, 0) & ChannelExec.CLOSED);
+ session.close(false).await();
+ client.stop();
+ }
+
+ @Test
public void testClient() throws Exception {
SshClient client = SshClient.setUpDefaultClient();
client.start();