You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2012/12/27 11:24:00 UTC

svn commit: r1426153 - in /tomcat/trunk: java/org/apache/tomcat/websocket/WsFrame.java java/org/apache/tomcat/websocket/WsRemoteEndpoint.java webapps/examples/WEB-INF/classes/websocket/echo/EchoAnnotation.java webapps/examples/WEB-INF/web.xml

Author: markt
Date: Thu Dec 27 10:24:00 2012
New Revision: 1426153

URL: http://svn.apache.org/viewvc?rev=1426153&view=rev
Log:
Make the annotation-based echo example support partial data so that huge buffers aren't required to pass the Autobahn tests.
Simplify WsFrame by replacing frameStart and headerLength with readPos. When the frame is bigger than the input buffer, frameStart and headerLength didn't make much sense anyway.
Rewrite the processing of text data to make it easier to follow.
Make the application responsible for thread-safety of Servlet[Input|Output]Stream

Modified:
    tomcat/trunk/java/org/apache/tomcat/websocket/WsFrame.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java
    tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/echo/EchoAnnotation.java
    tomcat/trunk/webapps/examples/WEB-INF/web.xml

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsFrame.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsFrame.java?rev=1426153&r1=1426152&r2=1426153&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsFrame.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsFrame.java Thu Dec 27 10:24:00 2012
@@ -68,16 +68,14 @@ public class WsFrame {
     private boolean fin = false;
     private int rsv = 0;
     private byte opCode = 0;
-    private int frameStart = 0;
-    private int headerLength = 0;
     private byte[] mask = new byte[4];
     private int maskIndex = 0;
     private long payloadLength = 0;
-    private int payloadRead = 0;
     private long payloadWritten = 0;
 
     // Attributes tracking state
     private State state = State.NEW_FRAME;
+    private int readPos = 0;
     private int writePos = 0;
 
     public WsFrame(ServletInputStream sis, WsSession wsSession) {
@@ -97,31 +95,33 @@ public class WsFrame {
      * Called when there is data in the ServletInputStream to process.
      */
     public void onDataAvailable() throws IOException {
-        while (sis.isReady()) {
-            // Fill up the input buffer with as much data as we can
-            int read = sis.read(inputBuffer, writePos,
-                    inputBuffer.length - writePos);
-            if (read == 0) {
-                return;
-            }
-            if (read == -1) {
-                throw new EOFException();
-            }
-            writePos += read;
-            while (true) {
-                if (state == State.NEW_FRAME) {
-                    if (!processInitialHeader()) {
-                        break;
-                    }
+        synchronized (sis) {
+            while (sis.isReady()) {
+                // Fill up the input buffer with as much data as we can
+                int read = sis.read(inputBuffer, writePos,
+                        inputBuffer.length - writePos);
+                if (read == 0) {
+                    return;
                 }
-                if (state == State.PARTIAL_HEADER) {
-                    if (!processRemainingHeader()) {
-                        break;
-                    }
+                if (read == -1) {
+                    throw new EOFException();
                 }
-                if (state == State.DATA) {
-                    if (!processData()) {
-                        break;
+                writePos += read;
+                while (true) {
+                    if (state == State.NEW_FRAME) {
+                        if (!processInitialHeader()) {
+                            break;
+                        }
+                    }
+                    if (state == State.PARTIAL_HEADER) {
+                        if (!processRemainingHeader()) {
+                            break;
+                        }
+                    }
+                    if (state == State.DATA) {
+                        if (!processData()) {
+                            break;
+                        }
                     }
                 }
             }
@@ -135,10 +135,10 @@ public class WsFrame {
      */
     private boolean processInitialHeader() throws IOException {
         // Need at least two bytes of data to do this
-        if (writePos - frameStart < 2) {
+        if (writePos - readPos < 2) {
             return false;
         }
-        int b = inputBuffer[frameStart];
+        int b = inputBuffer[readPos++];
         fin = (b & 0x80) > 0;
         rsv = (b & 0x70) >>> 4;
         if (rsv != 0) {
@@ -183,7 +183,7 @@ public class WsFrame {
             }
             continuationExpected = !fin;
         }
-        b = inputBuffer[frameStart + 1];
+        b = inputBuffer[readPos++];
         // Client data must be masked
         if ((b & 0x80) == 0) {
             throw new IOException(sm.getString("wsFrame.notMasked"));
@@ -199,22 +199,24 @@ public class WsFrame {
      *         processing of the header
      */
     private boolean processRemainingHeader() throws IOException {
-        // Initial 2 bytes already read + 4 for the mask
-        headerLength = 6;
+        // Ignore the 2 bytes already read. 4 for the mask
+        int headerLength = 4;
         // Add additional bytes depending on length
         if (payloadLength == 126) {
             headerLength += 2;
         } else if (payloadLength == 127) {
             headerLength += 8;
         }
-        if (writePos - frameStart < headerLength) {
+        if (writePos - readPos < headerLength) {
             return false;
         }
         // Calculate new payload length if necessary
         if (payloadLength == 126) {
             payloadLength = byteArrayToLong(inputBuffer, 2, 2);
+            readPos += 2;
         } else if (payloadLength == 127) {
             payloadLength = byteArrayToLong(inputBuffer, 2, 8);
+            readPos += 8;
         }
         if (isControl()) {
             if (payloadLength > 125) {
@@ -228,9 +230,9 @@ public class WsFrame {
                 throw new IOException("wsFrame.controlNoFin");
             }
         }
-        System.arraycopy(inputBuffer, frameStart + headerLength - 4, mask, 0, 4);
+        System.arraycopy(inputBuffer, readPos, mask, 0, 4);
+        readPos += 4;
         state = State.DATA;
-        payloadRead = frameStart + headerLength;
         return true;
     }
 
@@ -239,87 +241,16 @@ public class WsFrame {
         checkRoomPayload();
         if (isControl()) {
             return processDataControl();
-        }
-
-        // Unmask data
-        appendPayloadToMessage(messageBufferBinary);
-
-        if (textMessage) {
-            // Convert the bytes to text as early as possible to catch any
-            // conversion issues
-            messageBufferBinary.flip();
-            boolean last = false;
-            while (true) {
-                CoderResult cr = utf8DecoderMessage.decode(
-                        messageBufferBinary, messageBufferText, last);
-                if (cr.isError()) {
-                    throw new WsIOException(new CloseReason(
-                            CloseCodes.NOT_CONSISTENT,
-                            sm.getString("wsFrame.invalidUtf8")));
-                } else if (cr.isOverflow()) {
-                    if (usePartial()) {
-                        messageBufferText.flip();
-                        sendMessageText(false);
-                        messageBufferText.clear();
-                    } else {
-                        throw new WsIOException(new CloseReason(
-                                CloseCodes.TOO_BIG,
-                                sm.getString("wsFrame.textMessageTooBig")));
-                    }
-                } else if (cr.isUnderflow() && !last) {
-                    // Need more data
-                    messageBufferBinary.compact();
-                    if (payloadWritten == payloadLength) {
-                        if (continuationExpected) {
-                            newFrame();
-                            return true;
-                        } else {
-                            messageBufferBinary.flip();
-                            last = true;
-                        }
-                    } else {
-                        return false;
-                    }
-                } else {
-                    // End of input
-                    messageBufferText.flip();
-                    sendMessageText(true);
-                    messageBufferText.clear();
-                    newMessage();
-                    return true;
-                }
-            }
+        } else if (textMessage) {
+            return processDataText();
         } else {
-            if (payloadWritten == payloadLength) {
-                if (continuationExpected) {
-                    if (usePartial()) {
-                        messageBufferBinary.flip();
-                        sendMessageBinary(false);
-                        messageBufferBinary.clear();
-                    }
-                    newFrame();
-                    return true;
-                } else {
-                    messageBufferBinary.flip();
-                    sendMessageBinary(true);
-                    newMessage();
-                    return true;
-                }
-            } else {
-                if (usePartial()) {
-                    messageBufferBinary.flip();
-                    sendMessageBinary(false);
-                    messageBufferBinary.clear();
-                }
-                return false;
-            }
+            return processDataBinary();
         }
     }
 
 
     private boolean processDataControl() throws IOException {
-        appendPayloadToMessage(controlBufferBinary);
-        if (writePos < frameStart + headerLength + payloadLength) {
+        if (!appendPayloadToMessage(controlBufferBinary)) {
             return false;
         }
         controlBufferBinary.flip();
@@ -391,6 +322,125 @@ public class WsFrame {
     }
 
 
+    private boolean processDataText() throws IOException {
+        // Copy the available data to the buffer
+        while (!appendPayloadToMessage(messageBufferBinary)) {
+            // Frame not complete - we ran out of something
+            // Convert bytes to UTF-8
+            messageBufferBinary.flip();
+            while (true) {
+                CoderResult cr = utf8DecoderMessage.decode(
+                        messageBufferBinary, messageBufferText, false);
+                if (cr.isError()) {
+                    throw new WsIOException(new CloseReason(
+                            CloseCodes.NOT_CONSISTENT,
+                            sm.getString("wsFrame.invalidUtf8")));
+                } else if (cr.isOverflow()) {
+                    // Ran out of space in text buffer - flush it
+                    if (usePartial()) {
+                        messageBufferText.flip();
+                        sendMessageText(false);
+                        messageBufferText.clear();
+                    } else {
+                        throw new WsIOException(new CloseReason(
+                                CloseCodes.TOO_BIG,
+                                sm.getString("wsFrame.textMessageTooBig")));
+                    }
+                } else if (cr.isUnderflow()) {
+                    // Need more input
+                    // Compact what we have to create as much space as possible
+                    messageBufferBinary.compact();
+
+                    // What did we run out of?
+                    if (readPos == writePos) {
+                        // Ran out of input data - get some more
+                        return false;
+                    } else {
+                        // Ran out of message buffer - exit inner loop and
+                        // refill
+                        break;
+                    }
+                }
+            }
+        }
+
+        messageBufferBinary.flip();
+        boolean last = false;
+        // Frame is fully received
+        // Convert bytes to UTF-8
+        while (true) {
+            CoderResult cr = utf8DecoderMessage.decode(messageBufferBinary,
+                    messageBufferText, last);
+            if (cr.isError()) {
+                throw new WsIOException(new CloseReason(
+                        CloseCodes.NOT_CONSISTENT,
+                        sm.getString("wsFrame.invalidUtf8")));
+            } else if (cr.isOverflow()) {
+                // Ran out of space in text buffer - flush it
+                if (usePartial()) {
+                    messageBufferText.flip();
+                    sendMessageText(false);
+                    messageBufferText.clear();
+                } else {
+                    throw new WsIOException(new CloseReason(
+                            CloseCodes.TOO_BIG,
+                            sm.getString("wsFrame.textMessageTooBig")));
+                }
+            } else if (cr.isUnderflow() & !last) {
+                // End of frame and possible message as well.
+
+                if (continuationExpected) {
+                    messageBufferBinary.compact();
+                    newFrame();
+                    // Process next frame
+                    return true;
+                } else {
+                    // Make sure coder has flushed all output
+                    last = true;
+                }
+            } else {
+                // End of message
+                messageBufferText.flip();
+                sendMessageText(true);
+
+                newMessage();
+                return true;
+            }
+        }
+    }
+
+
+    private boolean processDataBinary() {
+        // Copy the available data to the buffer
+        while (!appendPayloadToMessage(messageBufferBinary)) {
+            // Frame not complete - what did we run out of?
+            if (readPos == writePos) {
+                // Ran out of input data - get some more
+                return false;
+            } else {
+                // Ran out of message buffer - flush it
+                messageBufferBinary.flip();
+                sendMessageBinary(false);
+                messageBufferBinary.clear();
+            }
+        }
+
+        // Frame is fully received
+        if (continuationExpected) {
+            // More data for this message expected
+            newFrame();
+        } else {
+            // Message is complete - send it
+            messageBufferBinary.flip();
+            sendMessageBinary(true);
+            messageBufferBinary.clear();
+            newMessage();
+        }
+
+        return true;
+    }
+
+
     @SuppressWarnings("unchecked")
     private void sendMessageBinary(boolean last) {
         MessageHandler mh = wsSession.getBinaryMessageHandler();
@@ -415,19 +465,18 @@ public class WsFrame {
 
 
     private void newFrame() {
-        if (frameStart + headerLength + payloadLength == writePos) {
-            frameStart = 0;
+        if (readPos == writePos) {
+            readPos = 0;
             writePos = 0;
-        } else {
-            frameStart = frameStart + headerLength + (int) payloadLength;
         }
 
-        // These get reset in processInitialHeader()
-        // fin, rsv, opCode, headerLength, payloadLength, mask
         maskIndex = 0;
-        payloadRead = 0;
         payloadWritten = 0;
         state = State.NEW_FRAME;
+
+        // These get reset in processInitialHeader()
+        // fin, rsv, opCode, payloadLength, mask
+
         checkRoomHeaders();
     }
 
@@ -435,7 +484,7 @@ public class WsFrame {
     private void checkRoomHeaders() {
         // Is the start of the current frame too near the end of the input
         // buffer?
-        if (inputBuffer.length - frameStart < 131) {
+        if (inputBuffer.length - readPos < 131) {
             // Limit based on a control frame with a full payload
             makeRoom();
         }
@@ -443,22 +492,16 @@ public class WsFrame {
 
 
     private void checkRoomPayload() throws IOException {
-        long frameSize = headerLength + payloadLength;
-        if (inputBuffer.length - frameStart - frameSize < 0) {
+        if (inputBuffer.length - readPos - payloadLength + payloadWritten < 0) {
             if (isControl()) {
                 makeRoom();
                 return;
             }
-            // Might not be enough room
-            if (usePartial()) {
-                // Not a problem - can use partial messages
-                return;
-            }
-            if (inputBuffer.length < frameSize) {
+            if (!usePartial() && (inputBuffer.length < payloadLength)) {
                 // TODO i18n - buffer too small
                 CloseReason cr = new CloseReason(CloseCodes.TOO_BIG,
                         "Buffer size: [" + inputBuffer.length +
-                        "], frame size: [" + frameSize + "]");
+                        "], payload size: [" + payloadLength + "]");
                 wsSession.close(cr);
                 wsSession.onClose(cr);
                 throw new IOException(cr.getReasonPhrase());
@@ -469,11 +512,10 @@ public class WsFrame {
 
 
     private void makeRoom() {
-        System.arraycopy(inputBuffer, frameStart, inputBuffer, 0,
-                writePos - frameStart);
-        writePos = writePos - frameStart;
-        payloadRead = payloadRead - frameStart;
-        frameStart = 0;
+        System.arraycopy(inputBuffer, readPos, inputBuffer, 0,
+                writePos - readPos);
+        writePos = writePos - readPos;
+        readPos = 0;
     }
 
 
@@ -497,17 +539,19 @@ public class WsFrame {
     }
 
 
-    private void appendPayloadToMessage(ByteBuffer dest) {
-        while (payloadWritten < payloadLength && payloadRead < writePos) {
-            byte b = (byte) ((inputBuffer[payloadRead] ^ mask[maskIndex]) & 0xFF);
+    private boolean appendPayloadToMessage(ByteBuffer dest) {
+        while (payloadWritten < payloadLength && readPos < writePos &&
+                dest.hasRemaining()) {
+            byte b = (byte) ((inputBuffer[readPos] ^ mask[maskIndex]) & 0xFF);
             maskIndex++;
             if (maskIndex == 4) {
                 maskIndex = 0;
             }
-            payloadRead++;
+            readPos++;
             payloadWritten++;
             dest.put(b);
         }
+        return (payloadWritten == payloadLength);
     }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java?rev=1426153&r1=1426152&r2=1426153&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java Thu Dec 27 10:24:00 2012
@@ -244,22 +244,24 @@ public class WsRemoteEndpoint implements
         }
         header.flip();
 
-        doBlockingWrite(header);
-        doBlockingWrite(message);
-        try {
-            sos.flush();
-        } catch (IOException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-
-        if (Constants.OPCODE_CLOSE == opCode) {
+        synchronized (sos) {
+            doBlockingWrite(header);
+            doBlockingWrite(message);
             try {
-                sos.close();
+                sos.flush();
             } catch (IOException e) {
                 // TODO Auto-generated catch block
                 e.printStackTrace();
             }
+
+            if (Constants.OPCODE_CLOSE == opCode) {
+                try {
+                    sos.close();
+                } catch (IOException e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                }
+            }
         }
     }
 

Modified: tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/echo/EchoAnnotation.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/echo/EchoAnnotation.java?rev=1426153&r1=1426152&r2=1426153&view=diff
==============================================================================
--- tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/echo/EchoAnnotation.java (original)
+++ tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/echo/EchoAnnotation.java Thu Dec 27 10:24:00 2012
@@ -16,9 +16,11 @@
  */
 package websocket.echo;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import javax.websocket.PongMessage;
+import javax.websocket.Session;
 import javax.websocket.WebSocketMessage;
 import javax.websocket.server.WebSocketEndpoint;
 
@@ -26,13 +28,30 @@ import javax.websocket.server.WebSocketE
 public class EchoAnnotation {
 
     @WebSocketMessage
-    public String echoTextMessage(String msg) {
-        return msg;
+    public void echoTextMessage(Session session, String msg, boolean last) {
+        try {
+            session.getRemote().sendPartialString(msg, last);
+        } catch (IOException e) {
+            try {
+                session.close();
+            } catch (IOException e1) {
+                // Ignore
+            }
+        }
     }
 
     @WebSocketMessage
-    public ByteBuffer echoBinaryMessage(ByteBuffer bb) {
-        return bb;
+    public void echoBinaryMessage(Session session, ByteBuffer bb,
+            boolean last) {
+        try {
+            session.getRemote().sendPartialBytes(bb, last);
+        } catch (IOException e) {
+            try {
+                session.close();
+            } catch (IOException e1) {
+                // Ignore
+            }
+        }
     }
 
     /**

Modified: tomcat/trunk/webapps/examples/WEB-INF/web.xml
URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/examples/WEB-INF/web.xml?rev=1426153&r1=1426152&r2=1426153&view=diff
==============================================================================
--- tomcat/trunk/webapps/examples/WEB-INF/web.xml (original)
+++ tomcat/trunk/webapps/examples/WEB-INF/web.xml Thu Dec 27 10:24:00 2012
@@ -354,11 +354,12 @@
       <url-pattern>/async/stockticker</url-pattern>
     </servlet-mapping>
 
-    <!-- Uncomment the section below to increase the WebSocket read buffer -->
-    <!-- size for the Autobahn testsuite.                                  -->
+    <!-- Uncomment the section below to increase the WebSocket read buffer  -->
+    <!-- size from the default of 8k to 512k                                -->
+    <!--
     <context-param>
       <param-name>wsReadBufferSize</param-name>
-      <!-- 512k -->
       <param-value>524288</param-value>
     </context-param>
+    -->
 </web-app>



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org