You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2013/07/26 10:37:28 UTC
[3/3] git commit: [SSHD-246] Let commands finish stream consumption and cleanly exit
[SSHD-246] Let commands finish stream consumption and cleanly exit
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/f549a71b
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/f549a71b
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/f549a71b
Branch: refs/heads/master
Commit: f549a71bc1559df036b2149db0f2a405595b329b
Parents: 725bcd9
Author: Guillaume Nodet <gn...@apache.org>
Authored: Fri Jul 26 10:37:12 2013 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Fri Jul 26 10:37:12 2013 +0200
----------------------------------------------------------------------
.../sshd/agent/unix/AgentForwardedChannel.java | 4 +-
.../client/channel/AbstractClientChannel.java | 8 +-
.../sshd/client/channel/ChannelSession.java | 5 +-
.../sshd/common/channel/AbstractChannel.java | 77 ++++++++++++--------
.../common/channel/ChannelPipedInputStream.java | 4 +-
.../sshd/common/forward/TcpipClientChannel.java | 14 +++-
.../sshd/server/ServerFactoryManager.java | 7 ++
.../server/channel/AbstractServerChannel.java | 1 +
.../sshd/server/channel/ChannelSession.java | 45 +++++++++++-
.../sshd/server/x11/X11ForwardSupport.java | 14 +++-
10 files changed, 131 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
index 3207074..4f23584 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
@@ -66,9 +66,9 @@ public class AgentForwardedChannel extends AbstractClientChannel implements Runn
}
@Override
- protected synchronized void doClose() {
+ protected synchronized void postClose() {
Socket.close(socket);
- super.doClose();
+ super.postClose();
}
protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index b786c39..6a332d4 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -123,9 +123,9 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
}
@Override
- protected void doClose() {
- super.doClose();
+ protected void postClose() {
IoUtils.closeQuietly(invertedIn, invertedOut, invertedErr, in, out, err);
+ super.postClose();
}
public int waitFor(int mask, long timeout) {
@@ -207,7 +207,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
} catch (Exception e) {
this.openFuture.setException(e);
this.closeFuture.setClosed();
- this.doClose();
+ this.postClose();
} finally {
notifyStateChanged();
}
@@ -222,7 +222,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
this.openFailureMsg = msg;
this.openFuture.setException(new SshException(msg));
this.closeFuture.setClosed();
- this.doClose();
+ this.postClose();
notifyStateChanged();
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index d6c9124..e18378a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -26,6 +26,7 @@ import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.channel.ChannelOutputStream;
import org.apache.sshd.common.channel.ChannelPipedInputStream;
import org.apache.sshd.common.channel.ChannelPipedOutputStream;
+import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.util.Buffer;
/**
@@ -76,12 +77,12 @@ public class ChannelSession extends AbstractClientChannel {
}
@Override
- protected void doClose() {
- super.doClose();
+ protected void postClose() {
if (streamPumper != null) {
streamPumper.interrupt();
streamPumper = null;
}
+ super.postClose();
}
protected void pumpInputStream() {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 71d9fa4..a0caaf8 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -29,6 +29,9 @@ import org.apache.sshd.common.Session;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.DefaultCloseFuture;
+import org.apache.sshd.common.future.DefaultSshFuture;
+import org.apache.sshd.common.future.SshFuture;
+import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.util.Buffer;
import org.apache.sshd.common.util.BufferUtils;
import org.slf4j.Logger;
@@ -93,36 +96,43 @@ public abstract class AbstractChannel implements Channel {
}
public CloseFuture close(boolean immediately) {
- if (closeFuture.isClosed()) {
- return closeFuture;
- }
if (closing.compareAndSet(false, true)) {
- try {
- if (immediately) {
- log.debug("Closing channel {} immediately", id);
- doClose();
- closeFuture.setClosed();
- notifyStateChanged();
- session.unregisterChannel(this);
- } else {
- log.debug("Closing channel {} gracefully", id);
- doClose();
- log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", id);
- Buffer buffer = session.createBuffer(SshConstants.Message.SSH_MSG_CHANNEL_CLOSE, 0);
- buffer.putInt(recipient);
- session.writePacket(buffer).addListener(new IoFutureListener<WriteFuture>() {
- public void operationComplete(WriteFuture future) {
- if (closedByOtherSide) {
- log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", id);
- closeFuture.setClosed();
- notifyStateChanged();
- }
+ if (immediately) {
+ log.debug("Closing channel {} immediately", id);
+ preClose(immediately).addListener(new SshFutureListener<CloseFuture>() {
+ public void operationComplete(CloseFuture future) {
+ postClose();
+ closeFuture.setClosed();
+ notifyStateChanged();
+ session.unregisterChannel(AbstractChannel.this);
+ }
+ });
+ } else {
+ log.debug("Closing channel {} gracefully", id);
+ preClose(immediately).addListener(new SshFutureListener<CloseFuture>() {
+ public void operationComplete(CloseFuture future) {
+ log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", id);
+ Buffer buffer = session.createBuffer(SshConstants.Message.SSH_MSG_CHANNEL_CLOSE, 0);
+ buffer.putInt(recipient);
+ try {
+ session.writePacket(buffer).addListener(new IoFutureListener<WriteFuture>() {
+ public void operationComplete(WriteFuture future) {
+ if (closedByOtherSide) {
+ log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", id);
+ postClose();
+ closeFuture.setClosed();
+ notifyStateChanged();
+ }
+ }
+ });
+ } catch (IOException e) {
+ log.debug("Exception caught while writing SSH_MSG_CHANNEL_CLOSE packet on channel " + id, e);
+ postClose();
+ closeFuture.setClosed();
+ notifyStateChanged();
}
- });
- }
- } catch (IOException e) {
- session.exceptionCaught(e);
- closeFuture.setClosed();
+ }
+ });
}
}
return closeFuture;
@@ -134,12 +144,19 @@ public abstract class AbstractChannel implements Channel {
if (closedByOtherSide) {
close(false);
} else {
- close(false).setClosed();
+ postClose();
+ closeFuture.setClosed();
notifyStateChanged();
}
}
- protected void doClose() {
+ protected CloseFuture preClose(boolean immediately) {
+ CloseFuture future = new DefaultCloseFuture(lock);
+ future.setClosed();
+ return future;
+ }
+
+ protected void postClose() {
}
protected void writePacket(Buffer buffer) throws IOException {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
index 1d7b489..2257cd9 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
@@ -40,6 +40,7 @@ public class ChannelPipedInputStream extends InputStream {
private final Buffer buffer = new Buffer();
private final byte[] b = new byte[1];
private boolean closed;
+ private boolean eofSent;
private final Lock lock = new ReentrantLock();
private final Condition dataAvailable = lock.newCondition();
@@ -96,13 +97,14 @@ public class ChannelPipedInputStream extends InputStream {
lock.lock();
try {
for (;;) {
- if (closed && !writerClosed) {
+ if (closed && writerClosed && eofSent || closed && !writerClosed) {
throw new IOException("Pipe closed");
}
if (buffer.available() > 0) {
break;
}
if (writerClosed) {
+ eofSent = true;
return -1; // no more data to read
}
try {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
index d0c4de8..ccd1f77 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.session.IoSession;
import org.apache.sshd.client.channel.AbstractClientChannel;
import org.apache.sshd.client.future.DefaultOpenFuture;
@@ -30,6 +31,8 @@ import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.SshdSocketAddress;
import org.apache.sshd.common.channel.ChannelOutputStream;
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.DefaultCloseFuture;
import org.apache.sshd.common.util.Buffer;
/**
@@ -96,9 +99,14 @@ public class TcpipClientChannel extends AbstractClientChannel {
}
@Override
- protected synchronized void doClose() {
- serverSession.close(false);
- super.doClose();
+ protected synchronized CloseFuture preClose(boolean immediately) {
+ final CloseFuture future = new DefaultCloseFuture(null);
+ serverSession.close(immediately).addListener(new IoFutureListener<org.apache.mina.core.future.CloseFuture>() {
+ public void operationComplete(org.apache.mina.core.future.CloseFuture f) {
+ future.setClosed();
+ }
+ });
+ return future;
}
protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
index 81a437d..d47e10c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
@@ -83,6 +83,13 @@ public interface ServerFactoryManager extends FactoryManager {
public static final String AUTH_METHODS = "auth-methods";
/**
+ * Key used to configure how long to wait for the command to exit cleanly
+ * once a close request has been received on a channel and an EOF has been
+ * set on the command's input stream. The value is in milliseconds.
+ */
+ public static final String COMMAND_EXIT_TIMEOUT = "command-exit-timeout";
+
+ /**
* Retrieve the list of named factories for <code>UserAuth<code> objects.
*
* @return a list of named <code>UserAuth</code> factories, never <code>null</code>
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
index 4ab0412..e96f483 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
@@ -66,6 +66,7 @@ public abstract class AbstractServerChannel extends AbstractChannel {
buffer.putByte((byte) 0);
buffer.putInt(v);
writePacket(buffer);
+ notifyStateChanged();
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index 5388d8a..1fcf6d0 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -25,8 +25,10 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
import org.apache.sshd.agent.SshAgent;
import org.apache.sshd.agent.SshAgentFactory;
@@ -37,6 +39,7 @@ import org.apache.sshd.common.PtyMode;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.channel.ChannelOutputStream;
import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.DefaultCloseFuture;
import org.apache.sshd.common.future.SshFuture;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.util.Buffer;
@@ -48,6 +51,7 @@ import org.apache.sshd.server.Environment;
import org.apache.sshd.server.ExitCallback;
import org.apache.sshd.common.file.FileSystemAware;
import org.apache.sshd.common.file.FileSystemFactory;
+import org.apache.sshd.server.ServerFactoryManager;
import org.apache.sshd.server.SessionAware;
import org.apache.sshd.server.Signal;
import org.apache.sshd.server.SignalListener;
@@ -61,6 +65,8 @@ import org.apache.sshd.server.x11.X11ForwardSupport;
*/
public class ChannelSession extends AbstractServerChannel {
+ public static final long DEFAULT_COMMAND_EXIT_TIMEOUT = 5000;
+
public static class Factory implements NamedFactory<Channel> {
public String getName() {
@@ -171,19 +177,52 @@ public class ChannelSession extends AbstractServerChannel {
protected ChannelDataReceiver receiver;
protected StandardEnvironment env = new StandardEnvironment();
protected Buffer tempBuffer;
+ protected final CloseFuture commandExitFuture = new DefaultCloseFuture(lock);
public ChannelSession() {
}
@Override
- protected void doClose() {
+ protected CloseFuture preClose(boolean immediately) {
+ if (immediately) {
+ commandExitFuture.setClosed();
+ } else if (!commandExitFuture.isClosed()) {
+ log.debug("Wait 5s for shell to exit cleanly");
+ IoUtils.closeQuietly(receiver);
+ final TimerTask task = new TimerTask() {
+ @Override
+ public void run() {
+ commandExitFuture.setClosed();
+ }
+ };
+ long timeout = DEFAULT_COMMAND_EXIT_TIMEOUT;
+ String val = getSession().getFactoryManager().getProperties().get(ServerFactoryManager.COMMAND_EXIT_TIMEOUT);
+ if (val != null) {
+ try {
+ timeout = Long.parseLong(val);
+ } catch (NumberFormatException e) {
+ // Ignore
+ }
+ }
+ getSession().getFactoryManager().getScheduledExecutorService().schedule(task, timeout, TimeUnit.MILLISECONDS);
+ commandExitFuture.addListener(new SshFutureListener<CloseFuture>() {
+ public void operationComplete(CloseFuture future) {
+ task.cancel();
+ }
+ });
+ }
+ return commandExitFuture;
+ }
+
+ @Override
+ protected void postClose() {
if (command != null) {
command.destroy();
command = null;
}
remoteWindow.notifyClosed();
IoUtils.closeQuietly(out, err, receiver);
- super.doClose();
+ super.postClose();
}
@Override
@@ -568,9 +607,9 @@ public class ChannelSession extends AbstractServerChannel {
if (!closing.get()) {
sendEof();
sendExitStatus(exitValue);
- // TODO: We should wait for all streams to be consumed before closing the channel
close(false);
}
+ commandExitFuture.setClosed();
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
index 37ec331..84d8e9a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
import java.util.EnumSet;
import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoEventType;
@@ -36,6 +37,8 @@ import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.channel.ChannelOutputStream;
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.DefaultCloseFuture;
import org.apache.sshd.common.util.Buffer;
import org.apache.sshd.server.session.ServerSession;
import org.slf4j.Logger;
@@ -207,9 +210,14 @@ public class X11ForwardSupport extends IoHandlerAdapter {
}
@Override
- protected synchronized void doClose() {
- serverSession.close(false);
- super.doClose();
+ protected synchronized CloseFuture preClose(boolean immediately) {
+ final CloseFuture future = new DefaultCloseFuture(null);
+ serverSession.close(immediately).addListener(new IoFutureListener<org.apache.mina.core.future.CloseFuture>() {
+ public void operationComplete(org.apache.mina.core.future.CloseFuture f) {
+ future.setClosed();
+ }
+ });
+ return future;
}
protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException {