You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2013/07/26 10:37:28 UTC

[3/3] git commit: [SSHD-246] Let commands finish stream consumption and cleanly exit

[SSHD-246] Let commands finish stream consumption and cleanly exit

Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/f549a71b
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/f549a71b
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/f549a71b

Branch: refs/heads/master
Commit: f549a71bc1559df036b2149db0f2a405595b329b
Parents: 725bcd9
Author: Guillaume Nodet <gn...@apache.org>
Authored: Fri Jul 26 10:37:12 2013 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Fri Jul 26 10:37:12 2013 +0200

----------------------------------------------------------------------
 .../sshd/agent/unix/AgentForwardedChannel.java  |  4 +-
 .../client/channel/AbstractClientChannel.java   |  8 +-
 .../sshd/client/channel/ChannelSession.java     |  5 +-
 .../sshd/common/channel/AbstractChannel.java    | 77 ++++++++++++--------
 .../common/channel/ChannelPipedInputStream.java |  4 +-
 .../sshd/common/forward/TcpipClientChannel.java | 14 +++-
 .../sshd/server/ServerFactoryManager.java       |  7 ++
 .../server/channel/AbstractServerChannel.java   |  1 +
 .../sshd/server/channel/ChannelSession.java     | 45 +++++++++++-
 .../sshd/server/x11/X11ForwardSupport.java      | 14 +++-
 10 files changed, 131 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
index 3207074..4f23584 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
@@ -66,9 +66,9 @@ public class AgentForwardedChannel extends AbstractClientChannel implements Runn
     }
 
     @Override
-    protected synchronized void doClose() {
+    protected synchronized void postClose() {
         Socket.close(socket);
-        super.doClose();
+        super.postClose();
     }
 
     protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index b786c39..6a332d4 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -123,9 +123,9 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
     }
 
     @Override
-    protected void doClose() {
-        super.doClose();
+    protected void postClose() {
         IoUtils.closeQuietly(invertedIn, invertedOut, invertedErr, in, out, err);
+        super.postClose();
     }
 
     public int waitFor(int mask, long timeout) {
@@ -207,7 +207,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
         } catch (Exception e) {
             this.openFuture.setException(e);
             this.closeFuture.setClosed();
-            this.doClose();
+            this.postClose();
         } finally {
             notifyStateChanged();
         }
@@ -222,7 +222,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
         this.openFailureMsg = msg;
         this.openFuture.setException(new SshException(msg));
         this.closeFuture.setClosed();
-        this.doClose();
+        this.postClose();
         notifyStateChanged();
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index d6c9124..e18378a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -26,6 +26,7 @@ import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.channel.ChannelPipedInputStream;
 import org.apache.sshd.common.channel.ChannelPipedOutputStream;
+import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.util.Buffer;
 
 /**
@@ -76,12 +77,12 @@ public class ChannelSession extends AbstractClientChannel {
     }
 
     @Override
-    protected void doClose() {
-        super.doClose();
+    protected void postClose() {
         if (streamPumper != null) {
             streamPumper.interrupt();
             streamPumper = null;
         }
+        super.postClose();
     }
 
     protected void pumpInputStream() {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 71d9fa4..a0caaf8 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -29,6 +29,9 @@ import org.apache.sshd.common.Session;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.DefaultCloseFuture;
+import org.apache.sshd.common.future.DefaultSshFuture;
+import org.apache.sshd.common.future.SshFuture;
+import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.common.util.BufferUtils;
 import org.slf4j.Logger;
@@ -93,36 +96,43 @@ public abstract class AbstractChannel implements Channel {
     }
 
     public CloseFuture close(boolean immediately) {
-        if (closeFuture.isClosed()) {
-            return closeFuture;
-        }
         if (closing.compareAndSet(false, true)) {
-            try {
-                if (immediately) {
-                    log.debug("Closing channel {} immediately", id);
-                    doClose();
-                    closeFuture.setClosed();
-                    notifyStateChanged();
-                    session.unregisterChannel(this);
-                } else {
-                    log.debug("Closing channel {} gracefully", id);
-                    doClose();
-                    log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", id);
-                    Buffer buffer = session.createBuffer(SshConstants.Message.SSH_MSG_CHANNEL_CLOSE, 0);
-                    buffer.putInt(recipient);
-                    session.writePacket(buffer).addListener(new IoFutureListener<WriteFuture>() {
-                        public void operationComplete(WriteFuture future) {
-                            if (closedByOtherSide) {
-                                log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", id);
-                                closeFuture.setClosed();
-                                notifyStateChanged();
-                            }
+            if (immediately) {
+                log.debug("Closing channel {} immediately", id);
+                preClose(immediately).addListener(new SshFutureListener<CloseFuture>() {
+                    public void operationComplete(CloseFuture future) {
+                        postClose();
+                        closeFuture.setClosed();
+                        notifyStateChanged();
+                        session.unregisterChannel(AbstractChannel.this);
+                    }
+                });
+            } else {
+                log.debug("Closing channel {} gracefully", id);
+                preClose(immediately).addListener(new SshFutureListener<CloseFuture>() {
+                    public void operationComplete(CloseFuture future) {
+                        log.debug("Send SSH_MSG_CHANNEL_CLOSE on channel {}", id);
+                        Buffer buffer = session.createBuffer(SshConstants.Message.SSH_MSG_CHANNEL_CLOSE, 0);
+                        buffer.putInt(recipient);
+                        try {
+                            session.writePacket(buffer).addListener(new IoFutureListener<WriteFuture>() {
+                                public void operationComplete(WriteFuture future) {
+                                    if (closedByOtherSide) {
+                                        log.debug("Message SSH_MSG_CHANNEL_CLOSE written on channel {}", id);
+                                        postClose();
+                                        closeFuture.setClosed();
+                                        notifyStateChanged();
+                                    }
+                                }
+                            });
+                        } catch (IOException e) {
+                            log.debug("Exception caught while writing SSH_MSG_CHANNEL_CLOSE packet on channel " + id, e);
+                            postClose();
+                            closeFuture.setClosed();
+                            notifyStateChanged();
                         }
-                    });
-                }
-            } catch (IOException e) {
-                session.exceptionCaught(e);
-                closeFuture.setClosed();
+                    }
+                });
             }
         }
         return closeFuture;
@@ -134,12 +144,19 @@ public abstract class AbstractChannel implements Channel {
         if (closedByOtherSide) {
             close(false);
         } else {
-            close(false).setClosed();
+            postClose();
+            closeFuture.setClosed();
             notifyStateChanged();
         }
     }
 
-    protected void doClose() {
+    protected CloseFuture preClose(boolean immediately) {
+        CloseFuture future = new DefaultCloseFuture(lock);
+        future.setClosed();
+        return future;
+    }
+
+    protected void postClose() {
     }
 
     protected void writePacket(Buffer buffer) throws IOException {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
index 1d7b489..2257cd9 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
@@ -40,6 +40,7 @@ public class ChannelPipedInputStream extends InputStream {
     private final Buffer buffer = new Buffer();
     private final byte[] b = new byte[1];
     private boolean closed;
+    private boolean eofSent;
 
     private final Lock lock = new ReentrantLock();
     private final Condition dataAvailable = lock.newCondition();
@@ -96,13 +97,14 @@ public class ChannelPipedInputStream extends InputStream {
         lock.lock();
         try {
             for (;;) {
-                if (closed && !writerClosed) {
+                if (closed && writerClosed && eofSent || closed && !writerClosed) {
                     throw new IOException("Pipe closed");
                 }
                 if (buffer.available() > 0) {
                     break;
                 }
                 if (writerClosed) {
+                    eofSent = true;
                     return -1; // no more data to read
                 }
                 try {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
index d0c4de8..ccd1f77 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.future.IoFutureListener;
 import org.apache.mina.core.session.IoSession;
 import org.apache.sshd.client.channel.AbstractClientChannel;
 import org.apache.sshd.client.future.DefaultOpenFuture;
@@ -30,6 +31,8 @@ import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.SshdSocketAddress;
 import org.apache.sshd.common.channel.ChannelOutputStream;
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.DefaultCloseFuture;
 import org.apache.sshd.common.util.Buffer;
 
 /**
@@ -96,9 +99,14 @@ public class TcpipClientChannel extends AbstractClientChannel {
     }
 
     @Override
-    protected synchronized void doClose() {
-        serverSession.close(false);
-        super.doClose();
+    protected synchronized CloseFuture preClose(boolean immediately) {
+        final CloseFuture future = new DefaultCloseFuture(null);
+        serverSession.close(immediately).addListener(new IoFutureListener<org.apache.mina.core.future.CloseFuture>() {
+            public void operationComplete(org.apache.mina.core.future.CloseFuture f) {
+                future.setClosed();
+            }
+        });
+        return future;
     }
 
     protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
index 81a437d..d47e10c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
@@ -83,6 +83,13 @@ public interface ServerFactoryManager extends FactoryManager {
     public static final String AUTH_METHODS = "auth-methods";
 
     /**
+     * Key used to configure the timeout used when receiving a close request
+     * on a channel to wait until the command cleanly exits after setting
+     * an EOF on the input stream. In milliseconds.
+     */
+    public static final String COMMAND_EXIT_TIMEOUT = "command-exit-timeout";
+
+    /**
      * Retrieve the list of named factories for <code>UserAuth<code> objects.
      *
      * @return a list of named <code>UserAuth</code> factories, never <code>null</code>

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
index 4ab0412..e96f483 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
@@ -66,6 +66,7 @@ public abstract class AbstractServerChannel extends AbstractChannel {
             buffer.putByte((byte) 0);
             buffer.putInt(v);
             writePacket(buffer);
+            notifyStateChanged();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index 5388d8a..1fcf6d0 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -25,8 +25,10 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.sshd.agent.SshAgent;
 import org.apache.sshd.agent.SshAgentFactory;
@@ -37,6 +39,7 @@ import org.apache.sshd.common.PtyMode;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.DefaultCloseFuture;
 import org.apache.sshd.common.future.SshFuture;
 import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.util.Buffer;
@@ -48,6 +51,7 @@ import org.apache.sshd.server.Environment;
 import org.apache.sshd.server.ExitCallback;
 import org.apache.sshd.common.file.FileSystemAware;
 import org.apache.sshd.common.file.FileSystemFactory;
+import org.apache.sshd.server.ServerFactoryManager;
 import org.apache.sshd.server.SessionAware;
 import org.apache.sshd.server.Signal;
 import org.apache.sshd.server.SignalListener;
@@ -61,6 +65,8 @@ import org.apache.sshd.server.x11.X11ForwardSupport;
  */
 public class ChannelSession extends AbstractServerChannel {
 
+    public static final long DEFAULT_COMMAND_EXIT_TIMEOUT = 5000;
+
     public static class Factory implements NamedFactory<Channel> {
 
         public String getName() {
@@ -171,19 +177,52 @@ public class ChannelSession extends AbstractServerChannel {
     protected ChannelDataReceiver receiver;
     protected StandardEnvironment env = new StandardEnvironment();
     protected Buffer tempBuffer;
+    protected final CloseFuture commandExitFuture = new DefaultCloseFuture(lock);
 
     public ChannelSession() {
     }
 
     @Override
-    protected void doClose() {
+    protected CloseFuture preClose(boolean immediately) {
+        if (immediately) {
+            commandExitFuture.setClosed();
+        } else if (!commandExitFuture.isClosed()) {
+            log.debug("Wait 5s for shell to exit cleanly");
+            IoUtils.closeQuietly(receiver);
+            final TimerTask task = new TimerTask() {
+                @Override
+                public void run() {
+                    commandExitFuture.setClosed();
+                }
+            };
+            long timeout = DEFAULT_COMMAND_EXIT_TIMEOUT;
+            String val = getSession().getFactoryManager().getProperties().get(ServerFactoryManager.COMMAND_EXIT_TIMEOUT);
+            if (val != null) {
+                try {
+                   timeout = Long.parseLong(val);
+                } catch (NumberFormatException e) {
+                    // Ignore
+                }
+            }
+            getSession().getFactoryManager().getScheduledExecutorService().schedule(task, timeout, TimeUnit.MILLISECONDS);
+            commandExitFuture.addListener(new SshFutureListener<CloseFuture>() {
+                public void operationComplete(CloseFuture future) {
+                    task.cancel();
+                }
+            });
+        }
+        return commandExitFuture;
+    }
+
+    @Override
+    protected void postClose() {
         if (command != null) {
             command.destroy();
             command = null;
         }
         remoteWindow.notifyClosed();
         IoUtils.closeQuietly(out, err, receiver);
-        super.doClose();
+        super.postClose();
     }
 
     @Override
@@ -568,9 +607,9 @@ public class ChannelSession extends AbstractServerChannel {
         if (!closing.get()) {
             sendEof();
             sendExitStatus(exitValue);
-            // TODO: We should wait for all streams to be consumed before closing the channel
             close(false);
         }
+        commandExitFuture.setClosed();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f549a71b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
index 37ec331..84d8e9a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
 import java.util.EnumSet;
 
 import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.future.IoFutureListener;
 import org.apache.mina.core.service.IoAcceptor;
 import org.apache.mina.core.service.IoHandlerAdapter;
 import org.apache.mina.core.session.IoEventType;
@@ -36,6 +37,8 @@ import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.channel.ChannelOutputStream;
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.DefaultCloseFuture;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.server.session.ServerSession;
 import org.slf4j.Logger;
@@ -207,9 +210,14 @@ public class X11ForwardSupport extends IoHandlerAdapter {
         }
 
         @Override
-        protected synchronized void doClose() {
-            serverSession.close(false);
-            super.doClose();
+        protected synchronized CloseFuture preClose(boolean immediately) {
+            final CloseFuture future = new DefaultCloseFuture(null);
+            serverSession.close(immediately).addListener(new IoFutureListener<org.apache.mina.core.future.CloseFuture>() {
+                public void operationComplete(org.apache.mina.core.future.CloseFuture f) {
+                    future.setClosed();
+                }
+            });
+            return future;
         }
 
         protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException {