You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2014/11/20 07:38:58 UTC

svn commit: r1640690 - in /tomcat/tc7.0.x/trunk: ./ java/org/apache/tomcat/websocket/ test/org/apache/tomcat/websocket/ webapps/docs/

Author: markt
Date: Thu Nov 20 06:38:58 2014
New Revision: 1640690

URL: http://svn.apache.org/r1640690
Log:
Fix various problems identified with flushing batched messages:
- Flush triggered by disabling batching failed to flip buffer before writing and also failed to clear the buffer after writing was complete. This resulted in duplicated and/or corrupted messages.
- The flush triggered by session close was too late since no writes are permitted once the close process starts. This resulted in an exception being thrown.

Modified:
    tomcat/tc7.0.x/trunk/   (props changed)
    tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
    tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
    tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsSession.java
    tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java
    tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml

Propchange: tomcat/tc7.0.x/trunk/
------------------------------------------------------------------------------
  Merged /tomcat/trunk:r1640688
  Merged /tomcat/tc8.0.x/trunk:r1640689

Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1640690&r1=1640689&r2=1640690&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Thu Nov 20 06:38:58 2014
@@ -68,7 +68,7 @@ wsRemoteEndpoint.closedOutputStream=This
 wsRemoteEndpoint.closedWriter=This method may not be called as the Writer has been closed
 wsRemoteEndpoint.changeType=When sending a fragmented message, all fragments bust be of the same type
 wsRemoteEndpoint.concurrentMessageSend=Messages may not be sent concurrently even when using the asynchronous send messages. The client must wait for the previous message to complete before sending the next.
-wsRemoteEndpoint.flushOnCloseFailed=Flushing batched messages before closing the session failed
+wsRemoteEndpoint.flushOnCloseFailed=Batched messages still enabled after session has been closed. Unable to flush remaining batched message.
 wsRemoteEndpoint.invalidEncoder=The specified encoder of type [{0}] could not be instantiated
 wsRemoteEndpoint.noEncoder=No encoder specified for object of class [{0}]
 wsRemoteEndpoint.wrongState=The remote endpoint was in state [{0}] which is an invalid state for called method
@@ -86,6 +86,7 @@ wsSession.duplicateHandlerBinary=A binar
 wsSession.duplicateHandlerPong=A pong message handler has already been configured
 wsSession.duplicateHandlerText=A text message handler has already been configured
 wsSession.invalidHandlerTypePong=A pong message handler must implement MessageHandler.Basic
+wsSession.flushFailOnClose=Failed to flush batched messages on session close
 wsSession.messageFailed=Unable to write the complete message as the WebSocket connection has been closed
 wsSession.sendCloseFail=Failed to send close message to remote endpoint
 wsSession.removeHandlerFailed=Unable to remove the handler [{0}] as it was not registered with this session

Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1640690&r1=1640689&r2=1640690&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Thu Nov 20 06:38:58 2014
@@ -305,13 +305,10 @@ public abstract class WsRemoteEndpointIm
 
         boolean doWrite = false;
         synchronized (messagePartLock) {
-            if (Constants.OPCODE_CLOSE == mp.getOpCode()) {
-                try {
-                    setBatchingAllowed(false);
-                } catch (IOException e) {
-                    log.warn(sm.getString(
-                            "wsRemoteEndpoint.flushOnCloseFailed"), e);
-                }
+            if (Constants.OPCODE_CLOSE == mp.getOpCode() && getBatchingAllowed()) {
+                // Should not happen. Too late to send batched messages now since
+                // the session has been closed. Complain loudly.
+                log.warn(sm.getString("wsRemoteEndpoint.flushOnCloseFailed"));
             }
             if (messagePartInProgress) {
                 // When a control message is sent while another message is being
@@ -385,7 +382,10 @@ public abstract class WsRemoteEndpointIm
         if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
             nextFragmented = fragmented;
             nextText = text;
-            doWrite(mp.getEndHandler(), outputBuffer);
+            outputBuffer.flip();
+            SendHandler flushHandler = new OutputBufferFlushSendHandler(
+                    outputBuffer, mp.getEndHandler());
+            doWrite(flushHandler, outputBuffer);
             return;
         }
 
@@ -866,6 +866,30 @@ public abstract class WsRemoteEndpointIm
         }
     }
 
+
+    /**
+     * Ensures that the output buffer is cleared after it has been flushed.
+     */
+    private static class OutputBufferFlushSendHandler implements SendHandler {
+
+        private final ByteBuffer outputBuffer;
+        private final SendHandler handler;
+
+        public OutputBufferFlushSendHandler(ByteBuffer outputBuffer, SendHandler handler) {
+            this.outputBuffer = outputBuffer;
+            this.handler = handler;
+        }
+
+        @Override
+        public void onResult(SendResult result) {
+            if (result.isOK()) {
+                outputBuffer.clear();
+            }
+            handler.onResult(result);
+        }
+    }
+
+
     private class WsOutputStream extends OutputStream {
 
         private final WsRemoteEndpointImplBase endpoint;

Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsSession.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1640690&r1=1640689&r2=1640690&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsSession.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsSession.java Thu Nov 20 06:38:58 2014
@@ -436,6 +436,13 @@ public class WsSession implements Sessio
                 return;
             }
 
+            try {
+                wsRemoteEndpoint.setBatchingAllowed(false);
+            } catch (IOException e) {
+                log.warn(sm.getString("wsSession.flushFailOnClose"), e);
+                fireEndpointOnError(e);
+            }
+
             state = State.CLOSING;
 
             sendCloseMessage(closeReasonMessage);
@@ -461,6 +468,12 @@ public class WsSession implements Sessio
 
         synchronized (stateLock) {
             if (state == State.OPEN) {
+                try {
+                    wsRemoteEndpoint.setBatchingAllowed(false);
+                } catch (IOException e) {
+                    log.warn(sm.getString("wsSession.flushFailOnClose"), e);
+                    fireEndpointOnError(e);
+                }
                 sendCloseMessage(closeReason);
                 fireEndpointOnClose(closeReason);
                 state = State.CLOSED;
@@ -471,7 +484,6 @@ public class WsSession implements Sessio
         }
     }
 
-
     private void fireEndpointOnClose(CloseReason closeReason) {
 
         // Fire the onClose event
@@ -489,6 +501,21 @@ public class WsSession implements Sessio
     }
 
 
+
+    private void fireEndpointOnError(Throwable throwable) {
+
+        // Fire the onError event
+        Thread t = Thread.currentThread();
+        ClassLoader cl = t.getContextClassLoader();
+        t.setContextClassLoader(applicationClassLoader);
+        try {
+            localEndpoint.onError(this, throwable);
+        } finally {
+            t.setContextClassLoader(cl);
+        }
+    }
+
+
     private void sendCloseMessage(CloseReason closeReason) {
         // 125 is maximum size for the payload of a control message
         ByteBuffer msg = ByteBuffer.allocate(125);

Modified: tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java?rev=1640690&r1=1640689&r2=1640690&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java (original)
+++ tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java Thu Nov 20 06:38:58 2014
@@ -119,10 +119,14 @@ public class TesterFirehoseServer {
 
             for (int i = 0; i < MESSAGE_COUNT; i++) {
                 remote.sendText(MESSAGE);
+                if (i % (MESSAGE_COUNT * 0.4) == 0) {
+                    remote.setBatchingAllowed(false);
+                    remote.setBatchingAllowed(true);
+                }
             }
 
-            // Ensure remaining messages are flushed
-            remote.setBatchingAllowed(false);
+            // Flushing should happen automatically on session close
+            session.close();
         }
 
         @OnError

Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1640690&r1=1640689&r2=1640690&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Thu Nov 20 06:38:58 2014
@@ -78,6 +78,14 @@
       </fix>
     </changelog>
   </subsection>
+  <subsection name="WebSocket">
+    <changelog>
+      <fix>
+        Correct multiple issues with the flushing of batched messages that could
+        lead to duplicate and/or corrupt messages. (markt)
+      </fix>
+    </changelog>
+  </subsection>
   <subsection name="Web applications">
     <changelog>
       <fix>



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org