You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2012/05/21 16:08:53 UTC

svn commit: r1341036 - in /mina/sshd/trunk/sshd-core/src: main/java/org/apache/sshd/client/channel/AbstractClientChannel.java main/java/org/apache/sshd/server/session/TcpipForwardSupport.java test/java/org/apache/sshd/PortForwardingTest.java

Author: gnodet
Date: Mon May 21 14:08:52 2012
New Revision: 1341036

URL: http://svn.apache.org/viewvc?rev=1341036&view=rev
Log:
[SSHD-123] TcpipForward race condition & deadlock

Modified:
    mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
    mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/server/session/TcpipForwardSupport.java
    mina/sshd/trunk/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java

Modified: mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
URL: http://svn.apache.org/viewvc/mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java?rev=1341036&r1=1341035&r2=1341036&view=diff
==============================================================================
--- mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java (original)
+++ mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java Mon May 21 14:08:52 2012
@@ -92,7 +92,7 @@ public abstract class AbstractClientChan
                     } else {
                         openFuture.addListener(new SshFutureListener<OpenFuture>() {
                             public void operationComplete(OpenFuture future) {
-                                close(immediately);
+                                close(true);
                             }
                         });
                     }

Modified: mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/server/session/TcpipForwardSupport.java
URL: http://svn.apache.org/viewvc/mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/server/session/TcpipForwardSupport.java?rev=1341036&r1=1341035&r2=1341036&view=diff
==============================================================================
--- mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/server/session/TcpipForwardSupport.java (original)
+++ mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/server/session/TcpipForwardSupport.java Mon May 21 14:08:52 2012
@@ -31,12 +31,14 @@ import org.apache.mina.core.session.IoEv
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.filter.executor.ExecutorFilter;
 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.sshd.ClientChannel;
 import org.apache.sshd.client.channel.AbstractClientChannel;
 import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.channel.ChannelOutputStream;
+import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.util.Buffer;
 import org.apache.sshd.server.ForwardingFilter;
 
@@ -131,23 +133,25 @@ public class TcpipForwardSupport extends
     }
 
     @Override
-    public void sessionCreated(IoSession session) throws Exception {
-        ChannelForwardedTcpip channel = new ChannelForwardedTcpip(session);
+    public void sessionCreated(final IoSession session) throws Exception {
+        final ChannelForwardedTcpip channel = new ChannelForwardedTcpip(session);
         session.setAttribute(ChannelForwardedTcpip.class, channel);
         this.session.registerChannel(channel);
-        OpenFuture future = channel.open().await();
-        Throwable t = future.getException();
-        if (t instanceof Exception) {
-            throw (Exception) t;
-        } else if (t != null) {
-            throw new Exception(t);
-        }
+        channel.open().addListener(new SshFutureListener<OpenFuture>() {
+            public void operationComplete(OpenFuture future) {
+                Throwable t = future.getException();
+                if (t != null) {
+                    TcpipForwardSupport.this.session.unregisterChannel(channel);
+                    channel.close(false);
+                }
+            }
+        });
     }
 
     @Override
     public void sessionClosed(IoSession session) throws Exception {
         ChannelForwardedTcpip channel = (ChannelForwardedTcpip) session.getAttribute(ChannelForwardedTcpip.class);
-        if ( channel != null ){
+        if (channel != null) {
         	channel.close(false);
         }
     }
@@ -159,6 +163,7 @@ public class TcpipForwardSupport extends
         int r = ioBuffer.remaining();
         byte[] b = new byte[r];
         ioBuffer.get(b, 0, r);
+        channel.waitFor(ClientChannel.OPENED | ClientChannel.CLOSED, Long.MAX_VALUE);
         channel.getOut().write(b, 0, r);
         channel.getOut().flush();
     }
@@ -178,6 +183,11 @@ public class TcpipForwardSupport extends
             this.serverSession = serverSession;
         }
 
+        /** Returns this channel's {@code openFuture}. */
+        public OpenFuture getOpenFuture() {
+            return openFuture;
+        }
+
         public synchronized OpenFuture open() throws Exception {
             InetSocketAddress remote = (InetSocketAddress) serverSession.getRemoteAddress();
             InetSocketAddress local = (InetSocketAddress) serverSession.getLocalAddress();

Modified: mina/sshd/trunk/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
URL: http://svn.apache.org/viewvc/mina/sshd/trunk/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java?rev=1341036&r1=1341035&r2=1341036&view=diff
==============================================================================
--- mina/sshd/trunk/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java (original)
+++ mina/sshd/trunk/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java Mon May 21 14:08:52 2012
@@ -19,8 +19,11 @@
 package org.apache.sshd;
 
 import java.io.*;
+import java.lang.reflect.Field;
 import java.net.*;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 
@@ -40,9 +43,11 @@ import org.apache.sshd.util.BogusPasswor
 import org.apache.sshd.util.BogusForwardingFilter;
 import org.apache.sshd.util.EchoShellFactory;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -52,6 +57,8 @@ import static org.junit.Assert.assertTru
  */
 public class PortForwardingTest {
 
+    private final org.slf4j.Logger log = LoggerFactory.getLogger(getClass());
+
     private SshServer sshd;
     private int sshPort;
     private int echoPort;
@@ -151,6 +158,121 @@ public class PortForwardingTest {
 //        Thread.sleep(1000000);
     }
 
+    @Test(timeout = 20000)
+    public void testRemoteForwardingWithDisconnect() throws Exception {
+        Session session = createSession();
+
+        // 1. Create a remote port forward: forwardedPort -> localhost:echoPort
+        int forwardedPort = getFreePort();
+        session.setPortForwardingR(forwardedPort, "localhost", echoPort);
+
+        // 2. Open a connection to the forwarded port (the socket is never closed)
+        new Socket("localhost", forwardedPort);
+
+        // 3. Simulate the client going away by closing the session's socket
+        rudelyDisconnectJschSession(session);
+
+        // 4. Wait until no NioProcessor thread is stuck; the @Test timeout bounds the wait
+        {
+            Thread.sleep(1000);
+            // Walk up to the root thread group, then poll every thread whose
+            // name contains "NioProcessor-" until none of them has a
+            // TcpipForwardSupport.close/sessionCreated frame on its stack.
+            ThreadGroup root = Thread.currentThread().getThreadGroup().getParent();
+            while (root.getParent() != null) {
+                root = root.getParent();
+            }
+            boolean stuck;
+            do {
+                stuck = false;
+                for (Thread t : findThreads(root, "NioProcessor-")) {
+                    stuck = true;
+                }
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // NOTE(review): interrupt is swallowed; the loop just polls again
+                }
+            } while (stuck);
+        }
+
+        session.delPortForwardingR(forwardedPort);
+    }
+
+    /**
+     * Simulates an abrupt client disconnect by closing the socket held in the
+     * session's declared {@code socket} field, which is reached via reflection.
+     *
+     * @param session
+     *            the session whose underlying socket is closed
+     * @throws Exception if the field cannot be read or an assertion/close fails
+     */
+    private void rudelyDisconnectJschSession(Session session) throws Exception {
+        Field fSocket = session.getClass().getDeclaredField("socket");
+        fSocket.setAccessible(true);
+        Socket socket = (Socket) fSocket.get(session);
+
+        Assert.assertTrue("socket is not connected", socket.isConnected());
+        Assert.assertFalse("socket should not be closed", socket.isClosed());
+        socket.close();
+        Assert.assertTrue("socket has not closed", socket.isClosed());
+    }
+
+    private Set<Thread> findThreads(ThreadGroup group, String name) {
+        HashSet<Thread> ret = new HashSet<Thread>();
+        int numThreads = group.activeCount();
+        Thread[] threads = new Thread[numThreads * 2];
+        numThreads = group.enumerate(threads, false);
+        // Keep the threads of this group (non-recursive) that pass the check
+        for (int i = 0; i < numThreads; ++i) {
+            // a thread passes when checkThreadForPortForward accepts it, i.e.
+            // its name contains 'name' and its stack shows a matching frame
+            if (checkThreadForPortForward(threads[i], name)) {
+                ret.add(threads[i]);
+            }
+        }
+        // then recurse into the direct child groups and merge their matches
+        int numGroups = group.activeGroupCount();
+        ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
+        numGroups = group.enumerate(groups, false);
+        for (int i = 0; i < numGroups; ++i) {
+            ret.addAll(findThreads(groups[i], name));
+        }
+        return ret;
+    }
+
+    private boolean checkThreadForPortForward(Thread thread, String name) {
+        if (thread == null)
+            return false;
+        // only threads whose name contains the given fragment are inspected
+        if (thread.getName().contains(name)) {
+            // take a snapshot of the thread's current stack
+            StackTraceElement[] stack = thread.getStackTrace();
+            if (stack.length == 0)
+                return false;
+            else {
+                // report the thread if any frame is TcpipForwardSupport.close
+                // or TcpipForwardSupport.sessionCreated
+                for (int i = 0; i < stack.length; ++i) {
+                    String clazzName = stack[i].getClassName();
+                    String methodName = stack[i].getMethodName();
+                    // the class name must match exactly, so frames of nested or
+                    // anonymous classes (names with a '$' suffix) do not count
+                    if (clazzName
+                            .equals("org.apache.sshd.server.session.TcpipForwardSupport")
+                            && (methodName.equals("close") || methodName
+                            .equals("sessionCreated"))) {
+                        log.warn(thread.getName() + " stuck at " + clazzName
+                                + "." + methodName + ": "
+                                + stack[i].getLineNumber());
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
     protected Session createSession() throws JSchException {
         JSch sch = new JSch();
         sch.setLogger(new Logger() {