You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/01/14 11:45:24 UTC
svn commit: r1432867 - in /tomcat/trunk/java/org/apache:
coyote/AbstractProtocol.java
coyote/http11/upgrade/AbstractServletOutputStream.java
tomcat/websocket/WsRemoteEndpoint.java
Author: markt
Date: Mon Jan 14 10:45:24 2013
New Revision: 1432867
URL: http://svn.apache.org/viewvc?rev=1432867&view=rev
Log:
Fix various issues highlighted when running the Autobahn test suite on Linux.
- Don't register the socket for a read when a write event completes (may lead to thread starvation)
- Use a dedicated lock rather than the ServletOutputStream to protect the writing of a WebSocket message
- Add a lock to protect the write buffer from concurrent calls to writeInternal()
- Add a lock to ensure a consistent view of buffer and fireListenerLock
Modified:
tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java
Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1432867&r1=1432866&r2=1432867&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Mon Jan 14 10:45:24 2013
@@ -663,7 +663,14 @@ public abstract class AbstractProtocol i
} else if (state == SocketState.UPGRADED) {
// Need to keep the connection associated with the processor
connections.put(socket, processor);
- longPoll(wrapper, processor);
+ // Don't add sockets back to the poller if this was a
+ // non-blocking write otherwise the poller may trigger
+ // multiple read events which may lead to thread starvation
+ // in the connector. The write() method will add this this
+ // socket to the poller if necessary.
+ if (status != SocketStatus.OPEN_WRITE) {
+ longPoll(wrapper, processor);
+ }
} else {
// Connection closed. OK to recycle the processor. Upgrade
// processors are not recycled.
Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java?rev=1432867&r1=1432866&r2=1432867&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java Mon Jan 14 10:45:24 2013
@@ -28,6 +28,8 @@ public abstract class AbstractServletOut
protected static final StringManager sm =
StringManager.getManager(Constants.Package);
+ private final Object fireListenerLock = new Object();
+ private final Object nioWriteLock = new Object();
// Start in blocking-mode
private volatile WriteListener listener = null;
@@ -41,9 +43,13 @@ public abstract class AbstractServletOut
sm.getString("upgrade.sos.canWrite.is"));
}
- boolean result = (buffer == null);
- fireListener = !result;
- return result;
+ // Make sure canWrite() and onWritePossible() have a consistent view of
+ // buffer and fireListener when determining if the listener should fire
+ synchronized (fireListenerLock) {
+ boolean result = (buffer == null);
+ fireListener = !result;
+ return result;
+ }
}
@Override
@@ -90,14 +96,21 @@ public abstract class AbstractServletOut
doWrite(true, b, off, len);
} else {
// Non-blocking IO
- int written = doWrite(false, b, off, len);
- if (written < len) {
- // TODO: - Reuse the buffer
- // - Only reallocate if it gets too big (>8k?)
- buffer = new byte[len - written];
- System.arraycopy(b, off + written, buffer, 0, len - written);
- } else {
- buffer = null;
+ // If the non-blocking read does not complete, doWrite() will add
+ // the socket back into the poller. The poller way trigger a new
+ // write event before this method has finished updating buffer. This
+ // sync makes sure that buffer is updated before the next write
+ // executes.
+ synchronized (nioWriteLock) {
+ int written = doWrite(false, b, off, len);
+ if (written < len) {
+ // TODO: - Reuse the buffer
+ // - Only reallocate if it gets too big (>8k?)
+ buffer = new byte[len - written];
+ System.arraycopy(b, off + written, buffer, 0, len - written);
+ } else {
+ buffer = null;
+ }
}
}
}
@@ -109,9 +122,13 @@ public abstract class AbstractServletOut
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
- if (buffer == null && fireListener) {
- fireListener = false;
- listener.onWritePossible();
+ // Make sure canWrite() and onWritePossible() have a consistent view of
+ // buffer and fireListener when determining if the listener should fire
+ synchronized (fireListenerLock) {
+ if (buffer == null && fireListener) {
+ fireListener = false;
+ listener.onWritePossible();
+ }
}
}
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java?rev=1432867&r1=1432866&r2=1432867&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java Mon Jan 14 10:45:24 2013
@@ -36,6 +36,8 @@ import javax.websocket.SendResult;
public class WsRemoteEndpoint implements RemoteEndpoint {
+ private final Object messageWriteLock = new Object();
+
private final ServletOutputStream sos;
private final WsSession wsSession;
// Max length for outgoing WebSocket frame header is 10 bytes
@@ -248,7 +250,9 @@ public class WsRemoteEndpoint implements
}
header.flip();
- synchronized (sos) {
+ // Could sync on sos but don't as other (user or container) code may
+ // sync on this creating the potential for deadlocks.
+ synchronized (messageWriteLock) {
doBlockingWrite(header);
doBlockingWrite(message);
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org