You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by kp...@apache.org on 2013/10/23 01:32:26 UTC

svn commit: r1534848 - in /tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard: Client.java DrawboardEndpoint.java Room.java

Author: kpreisser
Date: Tue Oct 22 23:32:26 2013
New Revision: 1534848

URL: http://svn.apache.org/r1534848
Log:
Merged revision(s) 1534846 from tomcat/trunk:
- Prevent recursive invocation of Runnables by Room.invokeAndWait() to prevent errors like ConcurrentModificationException when Room.broadcastRoomMessage() iterates over an ArrayList and then calls Room.invokeAndWait() recursively, iterating again over the array.
- Add comment about blocking Session.close() method.

Modified:
    tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Client.java
    tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/DrawboardEndpoint.java
    tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Room.java

Modified: tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Client.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Client.java?rev=1534848&r1=1534847&r2=1534848&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Client.java (original)
+++ tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Client.java Tue Oct 22 23:32:26 2013
@@ -105,6 +105,17 @@ public class Client {
                                 CloseCodes.VIOLATED_POLICY,
                                 "Send Buffer exceeded");
                         try {
+                            // TODO: close() may block if the remote endpoint doesn't read the data
+                            // (eventually there will be a TimeoutException). However, this method
+                            // (sendMessage) is intended to run asynchronous code and shouldn't
+                            // block. Otherwise it would temporarily stop processing of messages
+                            // from other clients.
+                            // Maybe call this method on another thread.
+                            // Note that when this method is called, the RemoteEndpoint.Async
+                            // is still in the process of sending data, so there probably should
+                            // be another way to abort the Websocket connection.
+                            // Ideally, there should be some abort() method that cancels the
+                            // connection immediately...
                             session.close(cr);
                         } catch (IOException e) {
                             // Ignore
@@ -188,6 +199,21 @@ public class Client {
     private final SendHandler sendHandler = new SendHandler() {
         @Override
         public void onResult(SendResult result) {
+            if (!result.isOK()) {
+                // Message could not be sent. In this case, we don't
+                // set isSendingMessage to false because we must assume the connection
+                // broke (and onClose will be called), so we don't try to send
+                // other messages.
+                // As a precaution, we close the session (e.g. if a send timeout occurred).
+                // TODO: session.close() blocks, while this handler shouldn't block.
+                // Ideally, there should be some abort() method that cancels the
+                // connection immediately...
+                try {
+                    session.close();
+                } catch (IOException ex) {
+                    // Ignore
+                }
+            }
             synchronized (messagesToSend) {
 
                 if (!messagesToSend.isEmpty()) {

Modified: tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/DrawboardEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/DrawboardEndpoint.java?rev=1534848&r1=1534847&r2=1534848&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/DrawboardEndpoint.java (original)
+++ tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/DrawboardEndpoint.java Tue Oct 22 23:32:26 2013
@@ -123,6 +123,11 @@ public final class DrawboardEndpoint ext
                         if (player != null) {
                             // Remove this player from the room.
                             player.removeFromRoom();
+
+                            // Set player to null to prevent NPEs when onMessage events
+                            // are processed (from other threads) after onClose has been
+                            // called from a different thread which closed the Websocket session.
+                            player = null;
                         }
                     } catch (RuntimeException ex) {
                         log.error("Unexpected exception: " + ex.toString(), ex);

Modified: tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Room.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Room.java?rev=1534848&r1=1534847&r2=1534848&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Room.java (original)
+++ tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Room.java Tue Oct 22 23:32:26 2013
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.imageio.ImageIO;
 
@@ -85,9 +86,9 @@ public final class Room {
 
 
     /**
-     * An object used to synchronize access to this Room.
+     * The lock used to synchronize access to this Room.
      */
-    private final Object syncObj = new Object();
+    private final ReentrantLock roomLock = new ReentrantLock();
 
     /**
      * Indicates if this room has already been shutdown.
@@ -196,7 +197,8 @@ public final class Room {
      * @param p
      */
     private void internalRemovePlayer(Player p) {
-        players.remove(p);
+        boolean removed = players.remove(p);
+        assert removed;
 
         // Broadcast that one player is removed.
         broadcastRoomMessage(MessageType.PLAYER_CHANGED, "-");
@@ -292,19 +294,63 @@ public final class Room {
         }
     }
 
+    /**
+     * A list of cached {@link Runnable}s to prevent recursive invocation of Runnables
+     * by one thread. This variable is only used by one thread at a time and then
+     * set to <code>null</code>.
+     */
+    private List<Runnable> cachedRunnables = null;
 
     /**
      * Submits the given Runnable to the Room Executor and waits until it
      * has been executed. Currently, this simply means that the Runnable
-     * will be run directly inside of a synchronized() block.
+     * will be run directly inside of a synchronized() block.<br>
+     * Note that if a runnable recursively calls invokeAndWait() with another
+     * runnable on this Room, it will not be executed recursively, but instead
+     * cached until the original runnable is finished, to keep the behavior of
+     * using an Executor.
      * @param task
      */
     public void invokeAndWait(Runnable task)  {
-        synchronized (syncObj) {
-            if (!closed) {
-                task.run();
+
+        // Check if the current thread already holds a lock on this room.
+        // If yes, then we must not directly execute the Runnable but instead
+        // cache it until the original invokeAndWait() has finished.
+        if (roomLock.isHeldByCurrentThread()) {
+
+            if (cachedRunnables == null) {
+                cachedRunnables = new ArrayList<Runnable>();
+            }
+            cachedRunnables.add(task);
+
+        } else {
+
+            roomLock.lock();
+            try {
+                // Explicitly overwrite value to ensure data consistency in
+                // current thread
+                cachedRunnables = null;
+
+                if (!closed) {
+                    task.run();
+                }
+
+                // Run the cached runnables.
+                if (cachedRunnables != null) {
+                    for (int i = 0; i < cachedRunnables.size(); i++) {
+                        if (!closed) {
+                            cachedRunnables.get(i).run();
+                        }
+                    }
+                    cachedRunnables = null;
+                }
+
+            } finally {
+                roomLock.unlock();
             }
+
         }
+
     }
 
     /**



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org