You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2012/05/21 16:08:53 UTC
svn commit: r1341036 - in /mina/sshd/trunk/sshd-core/src:
main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
main/java/org/apache/sshd/server/session/TcpipForwardSupport.java
test/java/org/apache/sshd/PortForwardingTest.java
Author: gnodet
Date: Mon May 21 14:08:52 2012
New Revision: 1341036
URL: http://svn.apache.org/viewvc?rev=1341036&view=rev
Log:
[SSHD-123] TcpipForward race condition & deadlock
Modified:
mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/server/session/TcpipForwardSupport.java
mina/sshd/trunk/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
Modified: mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
URL: http://svn.apache.org/viewvc/mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java?rev=1341036&r1=1341035&r2=1341036&view=diff
==============================================================================
--- mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java (original)
+++ mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java Mon May 21 14:08:52 2012
@@ -92,7 +92,7 @@ public abstract class AbstractClientChan
} else {
openFuture.addListener(new SshFutureListener<OpenFuture>() {
public void operationComplete(OpenFuture future) {
- close(immediately);
+ close(true);
}
});
}
Modified: mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/server/session/TcpipForwardSupport.java
URL: http://svn.apache.org/viewvc/mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/server/session/TcpipForwardSupport.java?rev=1341036&r1=1341035&r2=1341036&view=diff
==============================================================================
--- mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/server/session/TcpipForwardSupport.java (original)
+++ mina/sshd/trunk/sshd-core/src/main/java/org/apache/sshd/server/session/TcpipForwardSupport.java Mon May 21 14:08:52 2012
@@ -31,12 +31,14 @@ import org.apache.mina.core.session.IoEv
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.sshd.ClientChannel;
import org.apache.sshd.client.channel.AbstractClientChannel;
import org.apache.sshd.client.future.DefaultOpenFuture;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.channel.ChannelOutputStream;
+import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.util.Buffer;
import org.apache.sshd.server.ForwardingFilter;
@@ -131,23 +133,25 @@ public class TcpipForwardSupport extends
}
@Override
- public void sessionCreated(IoSession session) throws Exception {
- ChannelForwardedTcpip channel = new ChannelForwardedTcpip(session);
+ public void sessionCreated(final IoSession session) throws Exception {
+ final ChannelForwardedTcpip channel = new ChannelForwardedTcpip(session);
session.setAttribute(ChannelForwardedTcpip.class, channel);
this.session.registerChannel(channel);
- OpenFuture future = channel.open().await();
- Throwable t = future.getException();
- if (t instanceof Exception) {
- throw (Exception) t;
- } else if (t != null) {
- throw new Exception(t);
- }
+ channel.open().addListener(new SshFutureListener<OpenFuture>() {
+ public void operationComplete(OpenFuture future) {
+ Throwable t = future.getException();
+ if (t != null) {
+ TcpipForwardSupport.this.session.unregisterChannel(channel);
+ channel.close(false);
+ }
+ }
+ });
}
@Override
public void sessionClosed(IoSession session) throws Exception {
ChannelForwardedTcpip channel = (ChannelForwardedTcpip) session.getAttribute(ChannelForwardedTcpip.class);
- if ( channel != null ){
+ if (channel != null) {
channel.close(false);
}
}
@@ -159,6 +163,7 @@ public class TcpipForwardSupport extends
int r = ioBuffer.remaining();
byte[] b = new byte[r];
ioBuffer.get(b, 0, r);
+ channel.waitFor(ClientChannel.OPENED | ClientChannel.CLOSED, Long.MAX_VALUE);
channel.getOut().write(b, 0, r);
channel.getOut().flush();
}
@@ -178,6 +183,11 @@ public class TcpipForwardSupport extends
this.serverSession = serverSession;
}
+
+ public OpenFuture getOpenFuture() {
+ return openFuture;
+ }
+
public synchronized OpenFuture open() throws Exception {
InetSocketAddress remote = (InetSocketAddress) serverSession.getRemoteAddress();
InetSocketAddress local = (InetSocketAddress) serverSession.getLocalAddress();
Modified: mina/sshd/trunk/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java
URL: http://svn.apache.org/viewvc/mina/sshd/trunk/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java?rev=1341036&r1=1341035&r2=1341036&view=diff
==============================================================================
--- mina/sshd/trunk/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java (original)
+++ mina/sshd/trunk/sshd-core/src/test/java/org/apache/sshd/PortForwardingTest.java Mon May 21 14:08:52 2012
@@ -19,8 +19,11 @@
package org.apache.sshd;
import java.io.*;
+import java.lang.reflect.Field;
import java.net.*;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -40,9 +43,11 @@ import org.apache.sshd.util.BogusPasswor
import org.apache.sshd.util.BogusForwardingFilter;
import org.apache.sshd.util.EchoShellFactory;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -52,6 +57,8 @@ import static org.junit.Assert.assertTru
*/
public class PortForwardingTest {
+ private final org.slf4j.Logger log = LoggerFactory.getLogger(getClass());
+
private SshServer sshd;
private int sshPort;
private int echoPort;
@@ -151,6 +158,121 @@ public class PortForwardingTest {
// Thread.sleep(1000000);
}
+ @Test(timeout = 20000)
+ public void testRemoteForwardingWithDisconnect() throws Exception {
+ Session session = createSession();
+
+ // 1. Create a Port Forward
+ int forwardedPort = getFreePort();
+ session.setPortForwardingR(forwardedPort, "localhost", echoPort);
+
+ // 2. Establish a connection through it
+ new Socket("localhost", forwardedPort);
+
+ // 3. Simulate the client going away
+ rudelyDisconnectJschSession(session);
+
+ // 4. Make sure the NIOprocessor is not stuck
+ {
+ Thread.sleep(1000);
+ // from here, we need to check all the threads running and find a
+ // "NioProcessor-"
+ // that is stuck on a PortForward.dispose
+ ThreadGroup root = Thread.currentThread().getThreadGroup().getParent();
+ while (root.getParent() != null) {
+ root = root.getParent();
+ }
+ boolean stuck;
+ do {
+ stuck = false;
+ for (Thread t : findThreads(root, "NioProcessor-")) {
+ stuck = true;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+
+ }
+ } while (stuck);
+ }
+
+ session.delPortForwardingR(forwardedPort);
+ }
+
+ /**
+ * Close the socket inside this JSCH session. Use reflection to find it and
+ * just close it.
+ *
+ * @param session
+ * the Session to violate
+ * @throws Exception
+ */
+ private void rudelyDisconnectJschSession(Session session) throws Exception {
+ Field fSocket = session.getClass().getDeclaredField("socket");
+ fSocket.setAccessible(true);
+ Socket socket = (Socket) fSocket.get(session);
+
+ Assert.assertTrue("socket is not connected", socket.isConnected());
+ Assert.assertFalse("socket should not be closed", socket.isClosed());
+ socket.close();
+ Assert.assertTrue("socket has not closed", socket.isClosed());
+ }
+
+ private Set<Thread> findThreads(ThreadGroup group, String name) {
+ HashSet<Thread> ret = new HashSet<Thread>();
+ int numThreads = group.activeCount();
+ Thread[] threads = new Thread[numThreads * 2];
+ numThreads = group.enumerate(threads, false);
+ // Enumerate each thread in `group'
+ for (int i = 0; i < numThreads; ++i) {
+ // Get thread
+ // log.debug("Thread name: " + threads[i].getName());
+ if (checkThreadForPortForward(threads[i], name)) {
+ ret.add(threads[i]);
+ }
+ }
+ // didn't find the thread to check the
+ int numGroups = group.activeGroupCount();
+ ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
+ numGroups = group.enumerate(groups, false);
+ for (int i = 0; i < numGroups; ++i) {
+ ret.addAll(findThreads(groups[i], name));
+ }
+ return ret;
+ }
+
+ private boolean checkThreadForPortForward(Thread thread, String name) {
+ if (thread == null)
+ return false;
+ // does it contain the name we're looking for?
+ if (thread.getName().contains(name)) {
+ // look at the stack
+ StackTraceElement[] stack = thread.getStackTrace();
+ if (stack.length == 0)
+ return false;
+ else {
+ // does it have
+ // 'org.apache.sshd.server.session.TcpipForwardSupport.close'?
+ for (int i = 0; i < stack.length; ++i) {
+ String clazzName = stack[i].getClassName();
+ String methodName = stack[i].getMethodName();
+ // log.debug("Class: " + clazzName);
+ // log.debug("Method: " + methodName);
+ if (clazzName
+ .equals("org.apache.sshd.server.session.TcpipForwardSupport")
+ && (methodName.equals("close") || methodName
+ .equals("sessionCreated"))) {
+ log.warn(thread.getName() + " stuck at " + clazzName
+ + "." + methodName + ": "
+ + stack[i].getLineNumber());
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
protected Session createSession() throws JSchException {
JSch sch = new JSch();
sch.setLogger(new Logger() {