You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by vi...@apache.org on 2014/09/17 12:37:11 UTC

svn commit: r1625506 - in /tomcat/tc7.0.x/trunk: ./ java/org/apache/tomcat/websocket/MessagePart.java java/org/apache/tomcat/websocket/PerMessageDeflate.java java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java webapps/docs/changelog.xml

Author: violetagg
Date: Wed Sep 17 10:37:10 2014
New Revision: 1625506

URL: http://svn.apache.org/r1625506
Log:
Merged revision 1619738 from tomcat/trunk:
Extend support for the WebSocket permessage-deflate extension to compression of outgoing messages on the server side.

Modified:
    tomcat/tc7.0.x/trunk/   (props changed)
    tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/MessagePart.java
    tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
    tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
    tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml

Propchange: tomcat/tc7.0.x/trunk/
------------------------------------------------------------------------------
  Merged /tomcat/trunk:r1619738

Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/MessagePart.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/MessagePart.java?rev=1625506&r1=1625505&r2=1625506&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/MessagePart.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/MessagePart.java Wed Sep 17 10:37:10 2014
@@ -25,15 +25,17 @@ class MessagePart {
     private final int rsv;
     private final byte opCode;
     private final ByteBuffer payload;
-    private final SendHandler handler;
+    private final SendHandler intermediateHandler;
+    private volatile SendHandler endHandler;
 
     public MessagePart( boolean fin, int rsv, byte opCode, ByteBuffer payload,
-            SendHandler handler) {
+            SendHandler intermediateHandler, SendHandler endHandler) {
         this.fin = fin;
         this.rsv = rsv;
         this.opCode = opCode;
         this.payload = payload;
-        this.handler = handler;
+        this.intermediateHandler = intermediateHandler;
+        this.endHandler = endHandler;
     }
 
 
@@ -57,8 +59,17 @@ class MessagePart {
     }
 
 
-    public SendHandler getHandler() {
-        return handler;
+    public SendHandler getIntermediateHandler() {
+        return intermediateHandler;
+    }
+
+
+    public SendHandler getEndHandler() {
+        return endHandler;
+    }
+
+    public void setEndHandler(SendHandler endHandler) {
+        this.endHandler = endHandler;
     }
 }
 

Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java?rev=1625506&r1=1625505&r2=1625506&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java Wed Sep 17 10:37:10 2014
@@ -21,10 +21,12 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
 import java.util.zip.Inflater;
 
 import javax.websocket.Extension;
 import javax.websocket.Extension.Parameter;
+import javax.websocket.SendHandler;
 
 import org.apache.tomcat.util.res.StringManager;
 
@@ -47,10 +49,15 @@ public class PerMessageDeflate implement
     private final boolean clientContextTakeover;
     private final int clientMaxWindowBits;
     private final Inflater inflater = new Inflater(true);
-    private final ByteBuffer readBuffer = ByteBuffer.allocate(8192);
+    private final ByteBuffer readBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
+    private final Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
 
     private volatile Transformation next;
     private volatile boolean skipDecompression = false;
+    private volatile ByteBuffer writeBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
+    private volatile boolean deflaterResetRequired = true;
+    private volatile boolean firstCompressedFrameWritten = false;
+    private volatile byte[] EOM_BUFFER = new byte[EOM_BYTES.length + 1];
 
     static PerMessageDeflate negotiate(List<List<Parameter>> preferences) {
         // Accept the first preference that the server is able to support
@@ -288,25 +295,143 @@ public class PerMessageDeflate implement
 
 
     @Override
-    public List<MessagePart> sendMessagePart(List<MessagePart> messageParts) {
-        List<MessagePart> compressedParts = new ArrayList<MessagePart>(messageParts.size());
+    public List<MessagePart> sendMessagePart(List<MessagePart> uncompressedParts) {
+        List<MessagePart> allCompressedParts = new ArrayList<MessagePart>();
 
-        for (MessagePart messagePart : messageParts) {
-            byte opCode = messagePart.getOpCode();
+        for (MessagePart uncompressedPart : uncompressedParts) {
+            byte opCode = uncompressedPart.getOpCode();
             if (Util.isControl(opCode)) {
                 // Control messages can appear in the middle of other messages
                 // and must not be compressed. Pass it straight through
-                compressedParts.add(messagePart);
+                allCompressedParts.add(uncompressedPart);
             } else {
-                // TODO: Implement compression of sent messages
-                compressedParts.add(messagePart);
+                List<MessagePart> compressedParts = new ArrayList<MessagePart>();
+                ByteBuffer uncompressedPayload = uncompressedPart.getPayload();
+                SendHandler uncompressedIntermediateHandler =
+                        uncompressedPart.getIntermediateHandler();
+
+                // Need to reset the deflater at the start of every message
+                if (deflaterResetRequired) {
+                    deflater.reset();
+                    deflaterResetRequired = false;
+                    firstCompressedFrameWritten = false;
+                }
+
+                deflater.setInput(uncompressedPayload.array(),
+                        uncompressedPayload.arrayOffset() + uncompressedPayload.position(),
+                        uncompressedPayload.remaining());
+
+                int flush = (uncompressedPart.isFin() ? Deflater.SYNC_FLUSH : Deflater.NO_FLUSH);
+                boolean deflateRequired = true;
+
+                while(deflateRequired) {
+                    ByteBuffer compressedPayload = writeBuffer;
+
+                    int written = deflater.deflate(compressedPayload.array(),
+                            compressedPayload.arrayOffset() + compressedPayload.position(),
+                            compressedPayload.remaining(), flush);
+                    compressedPayload.position(compressedPayload.position() + written);
+
+                    if (!uncompressedPart.isFin() && compressedPayload.hasRemaining() && deflater.needsInput()) {
+                        // This message part has been fully processed by the
+                        // deflater. Fire the send handler for this message part
+                        // and move on to the next message part.
+                        break;
+                    }
+
+                    // If this point is reached, a new compressed message part
+                    // will be created...
+                    MessagePart compressedPart;
+
+                    // .. and a new writeBuffer will be required.
+                    writeBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
+
+                    // Flip the compressed payload ready for writing
+                    compressedPayload.flip();
+
+                    boolean fin = uncompressedPart.isFin();
+                    boolean full = compressedPayload.limit() == compressedPayload.capacity();
+                    boolean needsInput = deflater.needsInput();
+
+                    if (fin && !full && needsInput) {
+                        // End of compressed message. Drop EOM bytes and output.
+                        compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length);
+                        compressedPart = new MessagePart(true, getRsv(uncompressedPart),
+                                opCode, compressedPayload, uncompressedIntermediateHandler,
+                                uncompressedIntermediateHandler);
+                        deflaterResetRequired = true;
+                        deflateRequired = false;
+                    } else if (full && !needsInput) {
+                        // Write buffer full and input message not fully read.
+                        // Output and start new compressed part.
+                        compressedPart = new MessagePart(false, getRsv(uncompressedPart),
+                                opCode, compressedPayload, uncompressedIntermediateHandler,
+                                uncompressedIntermediateHandler);
+                    } else if (!fin && full && needsInput) {
+                        // Write buffer full and this message part fully read.
+                        // Output and wait for the next part of the message.
+                        compressedPart = new MessagePart(false, getRsv(uncompressedPart),
+                                opCode, compressedPayload, uncompressedIntermediateHandler,
+                                uncompressedIntermediateHandler);
+                        deflateRequired = false;
+                    } else if (fin && full && needsInput) {
+                        // Write buffer full. Input fully read. Deflater may be
+                        // in one of four states:
+                        // - output complete (just happened to align with end of
+                        //   buffer)
+                        // - in middle of EOM bytes
+                        // - about to write EOM bytes
+                        // - more data to write
+                        int eomBufferWritten = deflater.deflate(EOM_BUFFER, 0, EOM_BUFFER.length, Deflater.SYNC_FLUSH);
+                        if (eomBufferWritten < EOM_BUFFER.length) {
+                            // EOM has just been completed
+                            compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length + eomBufferWritten);
+                            compressedPart = new MessagePart(true,
+                                    getRsv(uncompressedPart), opCode, compressedPayload,
+                                    uncompressedIntermediateHandler, uncompressedIntermediateHandler);
+                            deflaterResetRequired = true;
+                            deflateRequired = false;
+                        } else {
+                            // More data to write
+                            // Copy bytes to new write buffer
+                            writeBuffer.put(EOM_BUFFER, 0, eomBufferWritten);
+                            compressedPart = new MessagePart(false,
+                                    getRsv(uncompressedPart), opCode, compressedPayload,
+                                    uncompressedIntermediateHandler, uncompressedIntermediateHandler);
+                        }
+                    } else {
+                        throw new IllegalStateException("Should never happen");
+                    }
+
+                    // Add the newly created compressed part to the set of parts
+                    // to pass on to the next transformation.
+                    compressedParts.add(compressedPart);
+                }
+
+                SendHandler uncompressedEndHandler = uncompressedPart.getEndHandler();
+                int size = compressedParts.size();
+                if (size > 0) {
+                    compressedParts.get(size - 1).setEndHandler(uncompressedEndHandler);
+                }
+
+                allCompressedParts.addAll(compressedParts);
             }
         }
 
         if (next == null) {
-            return compressedParts;
+            return allCompressedParts;
         } else {
-            return next.sendMessagePart(compressedParts);
+            return next.sendMessagePart(allCompressedParts);
         }
     }
+
+
+    private int getRsv(MessagePart uncompressedMessagePart) {
+        int result = uncompressedMessagePart.getRsv();
+        if (!firstCompressedFrameWritten) {
+            result += RSV_BITMASK;
+            firstCompressedFrameWritten = true;
+        }
+        return result;
+    }
 }

Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1625506&r1=1625505&r2=1625506&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Wed Sep 17 10:37:10 2014
@@ -61,6 +61,9 @@ public abstract class WsRemoteEndpointIm
 
     private final StateMachine stateMachine = new StateMachine();
 
+    private final IntermediateMessageHandler intermediateMessageHandler =
+            new IntermediateMessageHandler(this);
+
     private Transformation transformation = null;
     private boolean messagePartInProgress = false;
     private final Queue<MessagePart> messagePartQueue = new ArrayDeque<MessagePart>();
@@ -261,10 +264,19 @@ public abstract class WsRemoteEndpointIm
 
         List<MessagePart> messageParts = new ArrayList<MessagePart>();
         messageParts.add(new MessagePart(last, 0, opCode, payload,
+                intermediateMessageHandler,
                 new EndMessageHandler(this, handler)));
 
         messageParts = transformation.sendMessagePart(messageParts);
 
+        // Some extensions/transformations may buffer messages so it is possible
+        // that no message parts will be returned. If this is the case then
+        // trigger the supplied SendHandler.
+        if (messageParts.size() == 0) {
+            handler.onResult(new SendResult());
+            return;
+        }
+
         MessagePart mp = messageParts.remove(0);
 
         boolean doWrite = false;
@@ -332,12 +344,15 @@ public abstract class WsRemoteEndpointIm
 
         wsSession.updateLastActive();
 
-        handler.onResult(result);
+        // Some handlers, such as the IntermediateMessageHandler, do not have a
+        // nested handler so handler may be null.
+        if (handler != null) {
+            handler.onResult(result);
+        }
     }
 
 
     void writeMessagePart(MessagePart mp) {
-
         if (closed) {
             throw new IllegalStateException(
                     sm.getString("wsRemoteEndpoint.closed"));
@@ -346,7 +361,7 @@ public abstract class WsRemoteEndpointIm
         if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
             nextFragmented = fragmented;
             nextText = text;
-            doWrite(mp.getHandler(), outputBuffer);
+            doWrite(mp.getEndHandler(), outputBuffer);
             return;
         }
 
@@ -400,14 +415,13 @@ public abstract class WsRemoteEndpointIm
         if (getBatchingAllowed() || isMasked()) {
             // Need to write via output buffer
             OutputBufferSendHandler obsh = new OutputBufferSendHandler(
-                    mp.getHandler(), headerBuffer, mp.getPayload(), mask,
+                    mp.getEndHandler(), headerBuffer, mp.getPayload(), mask,
                     outputBuffer, !getBatchingAllowed(), this);
             obsh.write();
         } else {
             // Can write directly
-            doWrite(mp.getHandler(), headerBuffer, mp.getPayload());
+            doWrite(mp.getEndHandler(), headerBuffer, mp.getPayload());
         }
-
     }
 
 
@@ -449,6 +463,31 @@ public abstract class WsRemoteEndpointIm
     }
 
 
+    /**
+     * If a transformation needs to split a {@link MessagePart} into multiple
+     * {@link MessagePart}s, it uses this handler as the end handler for each of
+     * the additional {@link MessagePart}s. This handler notifies this
+     * class that the {@link MessagePart} has been processed and that the next
+     * {@link MessagePart} in the queue should be started. The final
+     * {@link MessagePart} will use the {@link EndMessageHandler} provided with
+     * the original {@link MessagePart}.
+     */
+    private static class IntermediateMessageHandler implements SendHandler {
+
+        private final WsRemoteEndpointImplBase endpoint;
+
+        public IntermediateMessageHandler(WsRemoteEndpointImplBase endpoint) {
+            this.endpoint = endpoint;
+        }
+
+
+        @Override
+        public void onResult(SendResult result) {
+            endpoint.endMessage(null, result);
+        }
+    }
+
+
     public void sendObject(Object obj) throws IOException {
         Future<Void> f = sendObjectByFuture(obj);
         try {

Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1625506&r1=1625505&r2=1625506&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Wed Sep 17 10:37:10 2014
@@ -251,6 +251,10 @@
         It is expected that support will be extended to outgoing messages and to
         the client side shortly. (markt)
       </add>
+      <add>
+        Extend support for the <code>permessage-deflate</code> extension to
+        compression of outgoing messages on the server side. (markt)
+      </add>
     </changelog>
   </subsection>
   <subsection name="Web applications">



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org