You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/01/14 11:45:24 UTC

svn commit: r1432867 - in /tomcat/trunk/java/org/apache: coyote/AbstractProtocol.java coyote/http11/upgrade/AbstractServletOutputStream.java tomcat/websocket/WsRemoteEndpoint.java

Author: markt
Date: Mon Jan 14 10:45:24 2013
New Revision: 1432867

URL: http://svn.apache.org/viewvc?rev=1432867&view=rev
Log:
Fix various issues highlighted when running the Autobahn test suite on Linux.
- Don't register the socket for a read when a write event completes (may lead to thread starvation)
- Use a dedicated lock rather than the ServletOutputStream to protect the writing of a WebSocket message
- Add a lock to protect the write buffer from concurrent calls to writeInternal()
- Add a lock to ensure a consistent view of buffer and fireListener

Modified:
    tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java

Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1432867&r1=1432866&r2=1432867&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Mon Jan 14 10:45:24 2013
@@ -663,7 +663,14 @@ public abstract class AbstractProtocol i
                 } else if (state == SocketState.UPGRADED) {
                     // Need to keep the connection associated with the processor
                     connections.put(socket, processor);
-                    longPoll(wrapper, processor);
+                    // Don't add sockets back to the poller if this was a
+                    // non-blocking write otherwise the poller may trigger
+                    // multiple read events which may lead to thread starvation
+                    // in the connector. The write() method will add this
+                    // socket to the poller if necessary.
+                    if (status != SocketStatus.OPEN_WRITE) {
+                        longPoll(wrapper, processor);
+                    }
                 } else {
                     // Connection closed. OK to recycle the processor. Upgrade
                     // processors are not recycled.

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java?rev=1432867&r1=1432866&r2=1432867&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java Mon Jan 14 10:45:24 2013
@@ -28,6 +28,8 @@ public abstract class AbstractServletOut
     protected static final StringManager sm =
             StringManager.getManager(Constants.Package);
 
+    private final Object fireListenerLock = new Object();
+    private final Object nioWriteLock = new Object();
 
     // Start in blocking-mode
     private volatile WriteListener listener = null;
@@ -41,9 +43,13 @@ public abstract class AbstractServletOut
                     sm.getString("upgrade.sos.canWrite.is"));
         }
 
-        boolean result = (buffer == null);
-        fireListener = !result;
-        return result;
+        // Make sure canWrite() and onWritePossible() have a consistent view of
+        // buffer and fireListener when determining if the listener should fire
+        synchronized (fireListenerLock) {
+            boolean result = (buffer == null);
+            fireListener = !result;
+            return result;
+        }
     }
 
     @Override
@@ -90,14 +96,21 @@ public abstract class AbstractServletOut
             doWrite(true, b, off, len);
         } else {
             // Non-blocking IO
-            int written = doWrite(false, b, off, len);
-            if (written < len) {
-                // TODO: - Reuse the buffer
-                //       - Only reallocate if it gets too big (>8k?)
-                buffer = new byte[len - written];
-                System.arraycopy(b, off + written, buffer, 0, len - written);
-            } else {
-                buffer = null;
+            // If the non-blocking write does not complete, doWrite() will add
+            // the socket back into the poller. The poller may trigger a new
+            // write event before this method has finished updating buffer. This
+            // sync makes sure that buffer is updated before the next write
+            // executes.
+            synchronized (nioWriteLock) {
+                int written = doWrite(false, b, off, len);
+                if (written < len) {
+                    // TODO: - Reuse the buffer
+                    //       - Only reallocate if it gets too big (>8k?)
+                    buffer = new byte[len - written];
+                    System.arraycopy(b, off + written, buffer, 0, len - written);
+                } else {
+                    buffer = null;
+                }
             }
         }
     }
@@ -109,9 +122,13 @@ public abstract class AbstractServletOut
         } catch (IOException ioe) {
             throw new RuntimeException(ioe);
         }
-        if (buffer == null && fireListener) {
-            fireListener = false;
-            listener.onWritePossible();
+        // Make sure canWrite() and onWritePossible() have a consistent view of
+        // buffer and fireListener when determining if the listener should fire
+        synchronized (fireListenerLock) {
+            if (buffer == null && fireListener) {
+                fireListener = false;
+                listener.onWritePossible();
+            }
         }
     }
 

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java?rev=1432867&r1=1432866&r2=1432867&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java Mon Jan 14 10:45:24 2013
@@ -36,6 +36,8 @@ import javax.websocket.SendResult;
 
 public class WsRemoteEndpoint implements RemoteEndpoint {
 
+    private final Object messageWriteLock = new Object();
+
     private final ServletOutputStream sos;
     private final WsSession wsSession;
     // Max length for outgoing WebSocket frame header is 10 bytes
@@ -248,7 +250,9 @@ public class WsRemoteEndpoint implements
         }
         header.flip();
 
-        synchronized (sos) {
+        // Could sync on sos but don't, as other (user or container) code may
+        // sync on sos too, creating the potential for deadlocks.
+        synchronized (messageWriteLock) {
             doBlockingWrite(header);
             doBlockingWrite(message);
             try {



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org