You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/11/20 14:51:34 UTC

svn commit: r1543815 - in /tomcat/trunk/java/org/apache/tomcat/websocket: LocalStrings.properties WsRemoteEndpointImplBase.java

Author: markt
Date: Wed Nov 20 13:51:34 2013
New Revision: 1543815

URL: http://svn.apache.org/r1543815
Log:
Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=55799
Implement the restriction required by the JSR356 specification that only one message can be written to a remote endpoint at a time.

Modified:
    tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
    tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1543815&r1=1543814&r2=1543815&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Wed Nov 20 13:51:34 2013
@@ -60,6 +60,7 @@ wsRemoteEndpoint.flushOnCloseFailed=Flus
 wsRemoteEndpoint.inProgress=Message will not be sent because the WebSocket session is currently sending another message
 wsRemoteEndpoint.invalidEncoder=The specified encoder of type [{0}] could not be instantiated
 wsRemoteEndpoint.noEncoder=No encoder specified for object of class [{0}]
+wsRemoteEndpoint.wrongState=Remote endpoint was in state [{0}] but state [{1}] is required for this action
 
 # Note the following message is used as a close reason in a WebSocket control
 # frame and therefore must be 123 bytes (not characters) or less in length.

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1543815&r1=1543814&r2=1543815&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Wed Nov 20 13:51:34 2013
@@ -59,6 +59,8 @@ public abstract class WsRemoteEndpointIm
 
     private final Log log = LogFactory.getLog(WsRemoteEndpointImplBase.class);
 
+    private final StateMachine stateMachine = new StateMachine();
+
     private boolean messagePartInProgress = false;
     private final Queue<MessagePart> messagePartQueue = new ArrayDeque<>();
     private final Object messagePartLock = new Object();
@@ -114,7 +116,12 @@ public abstract class WsRemoteEndpointIm
 
 
     public void sendBytes(ByteBuffer data) throws IOException {
-        startMessageBlock(Constants.OPCODE_BINARY, data, true);
+        stateMachine.binaryStart();
+        try {
+            startMessageBlock(Constants.OPCODE_BINARY, data, true);
+        } finally {
+            stateMachine.complete(true);
+        }
     }
 
 
@@ -126,13 +133,20 @@ public abstract class WsRemoteEndpointIm
 
 
     public void sendBytesByCompletion(ByteBuffer data, SendHandler handler) {
-        startMessage(Constants.OPCODE_BINARY, data, true, handler);
+        StateUpdateSendHandler sush = new StateUpdateSendHandler(handler);
+        stateMachine.binaryStart();
+        startMessage(Constants.OPCODE_BINARY, data, true, sush);
     }
 
 
     public void sendPartialBytes(ByteBuffer partialByte, boolean last)
             throws IOException {
-        startMessageBlock(Constants.OPCODE_BINARY, partialByte, last);
+        stateMachine.binaryPartialStart();
+        try {
+            startMessageBlock(Constants.OPCODE_BINARY, partialByte, last);
+        } finally {
+            stateMachine.complete(last);
+        }
     }
 
 
@@ -151,6 +165,7 @@ public abstract class WsRemoteEndpointIm
 
 
     public void sendString(String text) throws IOException {
+        stateMachine.textStart();
         sendPartialString(CharBuffer.wrap(text), true);
     }
 
@@ -163,24 +178,29 @@ public abstract class WsRemoteEndpointIm
 
 
     public void sendStringByCompletion(String text, SendHandler handler) {
+        stateMachine.textStart();
         TextMessageSendHandler tmsh = new TextMessageSendHandler(handler,
                 CharBuffer.wrap(text), true, encoder, encoderBuffer, this);
         tmsh.write();
+        // TextMessageSendHandler will update stateMachine when it completes
     }
 
 
     public void sendPartialString(String fragment, boolean isLast)
             throws IOException {
+        stateMachine.textPartialStart();
         sendPartialString(CharBuffer.wrap(fragment), isLast);
     }
 
 
     public OutputStream getSendStream() {
+        stateMachine.streamStart();
         return new WsOutputStream(this);
     }
 
 
     public Writer getSendWriter() {
+        stateMachine.writeStart();
         return new WsWriter(this);
     }
 
@@ -634,6 +654,7 @@ public abstract class WsRemoteEndpointIm
         @Override
         public void onResult(SendResult result) {
             if (isDone || !result.isOK()) {
+                endpoint.stateMachine.complete(isLast);
                 handler.onResult(result);
             } else {
                 write();
@@ -915,4 +936,103 @@ public abstract class WsRemoteEndpointIm
             return encoder;
         }
     }
+
+
+    private static enum State {
+        OPEN,
+        STREAM_WRITING,
+        WRITER_WRITING,
+        BINARY_PARTIAL_WRITING,
+        BINARY_PARTIAL_READY,
+        BINARY_FULL_WRITING,
+        TEXT_PARTIAL_WRITING,
+        TEXT_PARTIAL_READY,
+        TEXT_FULL_WRITING
+    }
+
+
+    private static class StateMachine {
+        private State state = State.OPEN;
+
+        public synchronized void streamStart() {
+            checkState(State.OPEN);
+            state = State.STREAM_WRITING;
+        }
+
+        public synchronized void writeStart() {
+            checkState(State.OPEN);
+            state = State.WRITER_WRITING;
+        }
+
+        public synchronized void binaryPartialStart() {
+            checkState(State.OPEN, State.BINARY_PARTIAL_READY);
+            state = State.BINARY_PARTIAL_WRITING;
+        }
+
+        public synchronized void binaryStart() {
+            checkState(State.OPEN);
+            state = State.BINARY_FULL_WRITING;
+        }
+
+        public synchronized void textPartialStart() {
+            checkState(State.OPEN, State.TEXT_PARTIAL_READY);
+            state = State.TEXT_PARTIAL_WRITING;
+        }
+
+        public synchronized void textStart() {
+            checkState(State.OPEN);
+            state = State.TEXT_FULL_WRITING;
+        }
+
+        public synchronized void complete(boolean last) {
+            if (last) {
+                checkState(State.TEXT_PARTIAL_WRITING, State.TEXT_FULL_WRITING,
+                        State.BINARY_PARTIAL_WRITING, State.BINARY_FULL_WRITING,
+                        State.STREAM_WRITING, State.WRITER_WRITING);
+                state = State.OPEN;
+            } else {
+                checkState(State.TEXT_PARTIAL_WRITING, State.BINARY_PARTIAL_WRITING,
+                        State.STREAM_WRITING, State.WRITER_WRITING);
+                if (state == State.TEXT_PARTIAL_WRITING) {
+                    state = State.TEXT_PARTIAL_READY;
+                } else if (state == State.BINARY_PARTIAL_WRITING){
+                    state = State.BINARY_PARTIAL_READY;
+                } else if (state == State.WRITER_WRITING) {
+                    // NO-OP. Leave state as is.
+                } else if (state == State.STREAM_WRITING) {
+                 // NO-OP. Leave state as is.
+                } else {
+                    // Should never happen
+                    // TODO Better message
+                    throw new IllegalStateException();
+                }
+            }
+        }
+
+        private void checkState(State... required) {
+            for (State state : required) {
+                if (this.state == state) {
+                    return;
+                }
+            }
+            // TODO Better (well, any) message
+            throw new IllegalStateException();
+        }
+    }
+
+
+    private class StateUpdateSendHandler implements SendHandler {
+
+        private final SendHandler handler;
+
+        public StateUpdateSendHandler(SendHandler handler) {
+            this.handler = handler;
+        }
+
+        @Override
+        public void onResult(SendResult result) {
+            stateMachine.complete(true);
+            handler.onResult(result);
+        }
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org