You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/02/11 15:03:35 UTC

svn commit: r1444768 - in /tomcat/trunk/java/org/apache/tomcat/websocket: Constants.java LocalStrings.properties WsRemoteEndpointBase.java WsSession.java

Author: markt
Date: Mon Feb 11 14:03:35 2013
New Revision: 1444768

URL: http://svn.apache.org/r1444768
Log:
Implement new approach to locking

Modified:
    tomcat/trunk/java/org/apache/tomcat/websocket/Constants.java
    tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
    tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/Constants.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/Constants.java?rev=1444768&r1=1444767&r2=1444768&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/Constants.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/Constants.java Mon Feb 11 14:03:35 2013
@@ -31,6 +31,11 @@ public class Constants {
     public static final byte OPCODE_PING = 0x09;
     public static final byte OPCODE_PONG = 0x0A;
 
+    // Internal OP Codes
+    // RFC 6455 limits OP Codes to 4 bits so these should never clash
+    // Always set bit 4 so these will be treated as control codes
+    static final byte INTERNAL_OPCODE_FLUSH = 0x18;
+
     // Client connection
     public static final String HOST_HEADER_NAME = "Host";
     public static final String UPGRADE_HEADER_NAME = "Upgrade";

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1444768&r1=1444767&r2=1444768&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Mon Feb 11 14:03:35 2013
@@ -13,10 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-messageSendStateMachine.changeType=When sending a fragmented message, all fragments bust be of the same type
-messageSendStateMachine.closed=Message will not be sent because the WebSocket session has been closed
-messageSendStateMachine.inProgress=Message will not be sent because the WebSocket session is currently sending another message
-
 # Note the wsFrame.* messages are used as close reasons in WebSocket control
 # frames and therefore must be 123 bytes (not characters) or less in length.
 # Messages are encoded using UTF-8 where a single character may be encoded in
@@ -35,7 +31,10 @@ wsFrame.oneByteCloseCode=The client sent
 wsFrame.textMessageTooBig=The decoded text message was too big for the output buffer and the endpoint does not support partial messages
 wsFrame.wrongRsv=The client frame set the reserved bits to [{0}] which was not supported by this endpoint
 
+wsRemoteEndpoint.closed=Message will not be sent because the WebSocket session has been closed
+wsRemoteEndpoint.changeType=When sending a fragmented message, all fragments bust be of the same type
 wsRemoteEndpoint.concurrentMessageSend=Messages may not be sent concurrently even when using the asynchronous send messages. The client must wait for the previous message to complete before sending the next.
+wsRemoteEndpoint.inProgress=Message will not be sent because the WebSocket session is currently sending another message
 
 wsSession.duplicateHandlerBinary=A binary message handler has already been configured
 wsSession.duplicateHandlerPong=A pong message handler has already been configured

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java?rev=1444768&r1=1444767&r2=1444768&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java Mon Feb 11 14:03:35 2013
@@ -24,14 +24,14 @@ import java.nio.CharBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetEncoder;
 import java.nio.charset.CoderResult;
+import java.util.ArrayDeque;
+import java.util.Queue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 import javax.websocket.EncodeException;
 import javax.websocket.RemoteEndpoint;
@@ -45,10 +45,18 @@ public abstract class WsRemoteEndpointBa
     private static final StringManager sm =
             StringManager.getManager(Constants.PACKAGE_NAME);
 
-    private final ReentrantLock writeLock = new ReentrantLock();
-    private final Condition notInProgress = writeLock.newCondition();
-    // Must hold writeLock above to modify state
-    private final MessageSendStateMachine state = new MessageSendStateMachine();
+    private boolean messagePartInProgress = false;
+    private Queue<MessagePart> messagePartQueue = new ArrayDeque<>();
+    private final Object messagePartLock = new Object();
+    private boolean dataMessageInProgress = false;
+
+    // State
+    private boolean closed = false;
+    private boolean fragmented = false;
+    private boolean nextFragmented = false;
+    private boolean text = false;
+    private boolean nextText = false;
+
     // Max size of WebSocket header is 14 bytes
     private final ByteBuffer headerBuffer = ByteBuffer.allocate(14);
     private final ByteBuffer outputBuffer = ByteBuffer.allocate(8192);
@@ -89,38 +97,17 @@ public abstract class WsRemoteEndpointBa
 
     @Override
     public void flushBatch() {
-        // Have to hold lock to flush output buffer
-        writeLock.lock();
         try {
-            while (state.isInProgress()) {
-                notInProgress.await();
-            }
-            FutureToSendHandler f2sh = new FutureToSendHandler();
-            doWrite(f2sh, outputBuffer);
-            f2sh.get();
-        } catch (InterruptedException | ExecutionException e) {
+            startMessageBlock(Constants.INTERNAL_OPCODE_FLUSH, null, true);
+        } catch (IOException e) {
             // TODO Log this? Runtime exception? Something else?
-        } finally {
-            writeLock.unlock();
         }
     }
 
 
     @Override
     public void sendBytes(ByteBuffer data) throws IOException {
-        Future<SendResult> f = sendBytesByFuture(data);
-        try {
-            SendResult sr = f.get();
-            if (!sr.isOK()) {
-                if (sr.getException() == null) {
-                    throw new IOException();
-                } else {
-                    throw new IOException(sr.getException());
-                }
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            throw new IOException(e);
-        }
+        startMessageBlock(Constants.OPCODE_BINARY, data, true);
     }
 
 
@@ -133,72 +120,35 @@ public abstract class WsRemoteEndpointBa
 
 
     @Override
-    public void sendBytesByCompletion(ByteBuffer data, SendHandler completion) {
-        boolean locked = writeLock.tryLock();
-        if (!locked) {
-            throw new IllegalStateException(
-                    sm.getString("wsRemoteEndpoint.concurrentMessageSend"));
-        }
-        try {
-            byte opCode = Constants.OPCODE_BINARY;
-            boolean isLast = true;
-            sendMessage(opCode, data, isLast, completion);
-        } finally {
-            writeLock.unlock();
-        }
+    public void sendBytesByCompletion(ByteBuffer data, SendHandler handler) {
+        startMessage(Constants.OPCODE_BINARY, data, true, handler);
     }
 
 
     @Override
-    public void sendPartialBytes(ByteBuffer partialByte, boolean isLast)
+    public void sendPartialBytes(ByteBuffer partialByte, boolean last)
             throws IOException {
-        boolean locked = writeLock.tryLock();
-        if (!locked) {
-            throw new IllegalStateException(
-                    sm.getString("wsRemoteEndpoint.concurrentMessageSend"));
-        }
-        try {
-            byte opCode = Constants.OPCODE_BINARY;
-            FutureToSendHandler f2sh = new FutureToSendHandler();
-            sendMessage(opCode, partialByte, isLast, f2sh);
-            f2sh.get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new IOException(e);
-        } finally {
-            writeLock.unlock();
-        }
+        startMessageBlock(Constants.OPCODE_BINARY, partialByte, last);
     }
 
 
     @Override
     public void sendPing(ByteBuffer applicationData) throws IOException,
             IllegalArgumentException {
-        sendControlMessage(Constants.OPCODE_PING, applicationData);
+        startMessageBlock(Constants.OPCODE_PING, applicationData, true);
     }
 
 
     @Override
     public void sendPong(ByteBuffer applicationData) throws IOException,
             IllegalArgumentException {
-        sendControlMessage(Constants.OPCODE_PONG, applicationData);
+        startMessageBlock(Constants.OPCODE_PONG, applicationData, true);
     }
 
 
     @Override
     public void sendString(String text) throws IOException {
-        Future<SendResult> f = sendStringByFuture(text);
-        try {
-            SendResult sr = f.get();
-            if (!sr.isOK()) {
-                if (sr.getException() == null) {
-                    throw new IOException();
-                } else {
-                    throw new IOException(sr.getException());
-                }
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            throw new IOException(e);
-        }
+        sendPartialString(CharBuffer.wrap(text), true);
     }
 
 
@@ -211,19 +161,10 @@ public abstract class WsRemoteEndpointBa
 
 
     @Override
-    public void sendStringByCompletion(String text, SendHandler completion) {
-        boolean locked = writeLock.tryLock();
-        if (!locked) {
-            throw new IllegalStateException(
-                    sm.getString("wsRemoteEndpoint.concurrentMessageSend"));
-        }
-        try {
-            TextMessageSendHandler tmsh = new TextMessageSendHandler(completion,
-                    CharBuffer.wrap(text), true, encoder, encoderBuffer, this);
-            tmsh.write();
-        } finally {
-            writeLock.unlock();
-        }
+    public void sendStringByCompletion(String text, SendHandler handler) {
+        TextMessageSendHandler tmsh = new TextMessageSendHandler(handler,
+                CharBuffer.wrap(text), true, encoder, encoderBuffer, this);
+        tmsh.write();
     }
 
 
@@ -251,70 +192,137 @@ public abstract class WsRemoteEndpointBa
 
 
 
-
-    /**
-     * Sends a control message, blocking until the message is sent.
-     */
-    void sendControlMessage(byte opCode, ByteBuffer payload)
-            throws IOException{
-
-        // Close needs to be sent so disable batching. This will flush any
-        // messages in the buffer
-        if (opCode == Constants.OPCODE_CLOSE) {
-            setBatchingAllowed(false);
-        }
-
-        writeLock.lock();
+    void sendPartialString(CharBuffer part, boolean last) throws IOException {
         try {
-            if (state.isInProgress()) {
-                notInProgress.await();
-            }
             FutureToSendHandler f2sh = new FutureToSendHandler();
-            sendMessage(opCode, payload, true, f2sh);
+            TextMessageSendHandler tmsh = new TextMessageSendHandler(f2sh, part,
+                    last, encoder, encoderBuffer, this);
+            tmsh.write();
             f2sh.get();
         } catch (InterruptedException | ExecutionException e) {
             throw new IOException(e);
-        } finally {
-            notInProgress.signal();
-            writeLock.unlock();
         }
     }
 
 
-    void sendPartialString(CharBuffer fragment, boolean isLast)
+    void startMessageBlock(byte opCode, ByteBuffer payload, boolean last)
             throws IOException {
-        boolean locked = writeLock.tryLock();
-        if (!locked) {
-            throw new IllegalStateException(
-                    sm.getString("wsRemoteEndpoint.concurrentMessageSend"));
-        }
+        FutureToSendHandler f2sh = new FutureToSendHandler();
+        startMessage(opCode, payload, last, f2sh);
         try {
-            FutureToSendHandler f2sh = new FutureToSendHandler();
-            TextMessageSendHandler tmsh = new TextMessageSendHandler(f2sh,
-                    fragment, isLast, encoder, encoderBuffer, this);
-            tmsh.write();
-            f2sh.get();
+            SendResult sr = f2sh.get();
+            if (!sr.isOK()) {
+                if (sr.getException() == null) {
+                    throw new IOException();
+                } else {
+                    throw new IOException(sr.getException());
+                }
+            }
         } catch (InterruptedException | ExecutionException e) {
             throw new IOException(e);
-        } finally {
-            writeLock.unlock();
         }
     }
 
 
-    private void sendMessage(byte opCode, ByteBuffer payload, boolean last,
-            SendHandler completion) {
+    void startMessage(byte opCode, ByteBuffer payload, boolean last,
+            SendHandler handler) {
+        MessagePart mp = new MessagePart(opCode, payload, last, handler, this);
+
+        synchronized (messagePartLock) {
+            if (Constants.OPCODE_CLOSE == mp.getOpCode()) {
+                setBatchingAllowed(false);
+            }
+            if (messagePartInProgress) {
+                if (!Util.isControl(opCode)) {
+                    if (dataMessageInProgress) {
+                        throw new IllegalStateException(
+                                sm.getString("wsRemoteEndpoint.inProgress"));
+                    } else {
+                        dataMessageInProgress = true;
+                    }
+                }
+                messagePartQueue.add(mp);
+            } else {
+                messagePartInProgress = true;
+                writeMessagePart(mp);
+            }
+        }
+    }
+
+
+    void endMessage(SendHandler handler, SendResult result,
+            boolean dataMessage) {
+        synchronized (messagePartLock) {
+
+            if (closed) {
+                close();
+            } else {
+                fragmented = nextFragmented;
+                text = nextText;
+            }
+
+            if (dataMessage) {
+                dataMessageInProgress = false;
+            }
+            MessagePart mpNext = messagePartQueue.poll();
+            if (mpNext == null) {
+                messagePartInProgress = false;
+            } else {
+                writeMessagePart(mpNext);
+            }
+        }
+
+        handler.setResult(result);
+    }
+
 
-        if (!writeLock.isHeldByCurrentThread()) {
-            // Coding problem
+    void writeMessagePart(MessagePart mp) {
+
+        if (closed) {
             throw new IllegalStateException(
-                    "Must hold writeLock before calling this method");
+                    sm.getString("wsRemoteEndpoint.closed"));
         }
 
-        state.startMessage(opCode, last);
+        if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
+            nextFragmented = fragmented;
+            nextText = text;
+            doWrite(mp.getHandler(), outputBuffer);
+            return;
+        }
+
+        // Control messages may be sent in the middle of fragmented message
+        // so they have no effect on the fragmented or text flags
+        boolean first;
+        if (Util.isControl(mp.getOpCode())) {
+            nextFragmented = fragmented;
+            nextText = text;
+            if (mp.getOpCode() == Constants.OPCODE_CLOSE) {
+                closed = true;
+            }
+            first = true;
+        } else {
+            boolean isText = Util.isText(mp.getOpCode());
 
-        SendMessageSendHandler smsh =
-                new SendMessageSendHandler(state, completion, this);
+            if (fragmented) {
+                // Currently fragmented
+                if (text != isText) {
+                    throw new IllegalStateException(
+                            sm.getString("wsRemoteEndpoint.changeType"));
+                }
+                nextText = text;
+                nextFragmented = !mp.isLast();
+                first = false;
+            } else {
+                // Wasn't fragmented. Might be now
+                if (mp.isLast()) {
+                    nextFragmented = false;
+                } else {
+                    nextFragmented = true;
+                    nextText = isText;
+                }
+                first = true;
+            }
+        }
 
         byte[] mask;
 
@@ -325,33 +333,106 @@ public abstract class WsRemoteEndpointBa
         }
 
         headerBuffer.clear();
-        writeHeader(headerBuffer, opCode, payload, state.isFirst(), last,
-                isMasked(), mask);
+        writeHeader(headerBuffer, mp.getOpCode(), mp.getPayload(), first,
+                mp.isLast(), isMasked(), mask);
         headerBuffer.flip();
 
         if (getBatchingAllowed() || isMasked()) {
             // Need to write via output buffer
             OutputBufferSendHandler obsh = new OutputBufferSendHandler(
-                    smsh, headerBuffer, payload, mask, outputBuffer,
-                    !getBatchingAllowed(), this);
+                    mp.getHandler(), headerBuffer, mp.getPayload(), mask,
+                    outputBuffer, !getBatchingAllowed(), this);
             obsh.write();
         } else {
             // Can write directly
-            doWrite(smsh, headerBuffer, payload);
+            doWrite(mp.getHandler(), headerBuffer, mp.getPayload());
         }
+
     }
 
 
-    private void endMessage() {
-        writeLock.lock();
-        try {
-            notInProgress.signal();
-        } finally {
-            writeLock.unlock();
+    private static class MessagePart {
+        private final byte opCode;
+        private final ByteBuffer payload;
+        private final boolean last;
+        private final SendHandler handler;
+
+        public MessagePart(byte opCode, ByteBuffer payload, boolean last,
+                SendHandler handler, WsRemoteEndpointBase endpoint) {
+            this.opCode = opCode;
+            this.payload = payload;
+            this.last = last;
+            this.handler = new EndMessageHandler(
+                    endpoint, handler, !Util.isControl(opCode));
+        }
+
+
+        public byte getOpCode() {
+            return opCode;
+        }
+
+
+        public ByteBuffer getPayload() {
+            return payload;
+        }
+
+
+        public boolean isLast() {
+            return last;
+        }
+
+
+        public SendHandler getHandler() {
+            return handler;
         }
     }
 
 
+    /**
+     * Wraps the user provided handler so that the end point is notified when
+     * the message is complete.
+     */
+    private static class EndMessageHandler implements SendHandler {
+
+        private final WsRemoteEndpointBase endpoint;
+        private final SendHandler handler;
+        private final boolean dataMessage;
+
+        public EndMessageHandler(WsRemoteEndpointBase endpoint,
+                SendHandler handler, boolean dataMessage) {
+            this.endpoint = endpoint;
+            this.handler = handler;
+            this.dataMessage = dataMessage;
+        }
+
+
+        @Override
+        public void setResult(SendResult result) {
+            endpoint.endMessage(handler, result, dataMessage);
+        }
+    }
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
 
 
 
@@ -439,81 +520,6 @@ public abstract class WsRemoteEndpointBa
     }
 
 
-    private static class MessageSendStateMachine {
-        private boolean closed = false;
-        private boolean inProgress = false;
-        private boolean fragmented = false;
-        private boolean text = false;
-        private boolean first = false;
-
-        private boolean nextFragmented = false;
-        private boolean nextText = false;
-
-        public synchronized void startMessage(byte opCode, boolean isLast) {
-
-            if (closed) {
-                throw new IllegalStateException(
-                        sm.getString("messageSendStateMachine.closed"));
-            }
-
-            if (inProgress) {
-                throw new IllegalStateException(
-                        sm.getString("messageSendStateMachine.inProgress"));
-            }
-
-            inProgress = true;
-
-            // Control messages may be sent in the middle of fragmented message
-            // so they have no effect on the fragmented or text flags
-            if (Util.isControl(opCode)) {
-                nextFragmented = fragmented;
-                nextText = text;
-                if (opCode == Constants.OPCODE_CLOSE) {
-                    closed = true;
-                }
-                first = true;
-                return;
-            }
-
-            boolean isText = Util.isText(opCode);
-
-            if (fragmented) {
-                // Currently fragmented
-                if (text != isText) {
-                    throw new IllegalStateException(
-                            sm.getString("messageSendStateMachine.changeType"));
-                }
-                nextText = text;
-                nextFragmented = !isLast;
-                first = false;
-            } else {
-                // Wasn't fragmented. Might be now
-                if (isLast) {
-                    nextFragmented = false;
-                } else {
-                    nextFragmented = true;
-                    nextText = isText;
-                }
-                first = true;
-            }
-        }
-
-        public synchronized void endMessage() {
-            inProgress = false;
-            fragmented = nextFragmented;
-            text = nextText;
-        }
-
-        public synchronized boolean isInProgress() {
-            return inProgress;
-        }
-
-        public synchronized boolean isFirst() {
-            return first;
-        }
-    }
-
-
     private static class TextMessageSendHandler implements SendHandler {
 
         private final SendHandler handler;
@@ -543,7 +549,7 @@ public abstract class WsRemoteEndpointBa
             }
             isDone = !cr.isOverflow();
             buffer.flip();
-            endpoint.sendMessage(Constants.OPCODE_TEXT, buffer,
+            endpoint.startMessage(Constants.OPCODE_TEXT, buffer,
                     isDone && isLast, this);
         }
 
@@ -559,35 +565,6 @@ public abstract class WsRemoteEndpointBa
 
 
     /**
-     *  Wraps user provided {@link SendHandler} so that state is updated when
-     *  the message completes.
-     */
-    private static class SendMessageSendHandler implements SendHandler {
-
-        private final MessageSendStateMachine state;
-        private final SendHandler handler;
-        private final WsRemoteEndpointBase endpoint;
-
-        public SendMessageSendHandler(MessageSendStateMachine state,
-                SendHandler handler, WsRemoteEndpointBase endpoint) {
-            this.state = state;
-            this.handler = handler;
-            this.endpoint = endpoint;
-        }
-
-        @Override
-        public void setResult(SendResult result) {
-            state.endMessage();
-            if (state.closed) {
-                endpoint.close();
-            }
-            handler.setResult(result);
-            endpoint.endMessage();
-        }
-    }
-
-
-    /**
      * Used to write data to the output buffer, flushing the buffer if it fills
      * up.
      */
@@ -802,6 +779,7 @@ public abstract class WsRemoteEndpointBa
         }
 
         private void doWrite(boolean last) throws IOException {
+            buffer.flip();
             endpoint.sendPartialString(buffer, last);
             buffer.clear();
         }

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1444768&r1=1444767&r2=1444768&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Mon Feb 11 14:03:35 2013
@@ -253,8 +253,8 @@ public class WsSession implements Sessio
             }
             msg.flip();
             try {
-                wsRemoteEndpoint.sendControlMessage(
-                        Constants.OPCODE_CLOSE, msg);
+                wsRemoteEndpoint.startMessageBlock(
+                        Constants.OPCODE_CLOSE, msg, true);
             } catch (IOException ioe) {
                 // Unable to send close message.
                 // TODO - Ignore?



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org