You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2014/08/22 13:02:19 UTC

svn commit: r1619738 - in /tomcat/trunk: java/org/apache/tomcat/websocket/MessagePart.java java/org/apache/tomcat/websocket/PerMessageDeflate.java java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java webapps/docs/changelog.xml

Author: markt
Date: Fri Aug 22 11:02:19 2014
New Revision: 1619738

URL: http://svn.apache.org/r1619738
Log:
Extend support for the WebSocket permessage-deflate extension to compression of outgoing messages on the server side.

Modified:
    tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java
    tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
    tomcat/trunk/webapps/docs/changelog.xml

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java?rev=1619738&r1=1619737&r2=1619738&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java Fri Aug 22 11:02:19 2014
@@ -25,15 +25,17 @@ class MessagePart {
     private final int rsv;
     private final byte opCode;
     private final ByteBuffer payload;
-    private final SendHandler handler;
+    private final SendHandler intermediateHandler;
+    private volatile SendHandler endHandler;
 
     public MessagePart( boolean fin, int rsv, byte opCode, ByteBuffer payload,
-            SendHandler handler) {
+            SendHandler intermediateHandler, SendHandler endHandler) {
         this.fin = fin;
         this.rsv = rsv;
         this.opCode = opCode;
         this.payload = payload;
-        this.handler = handler;
+        this.intermediateHandler = intermediateHandler;
+        this.endHandler = endHandler;
     }
 
 
@@ -57,8 +59,17 @@ class MessagePart {
     }
 
 
-    public SendHandler getHandler() {
-        return handler;
+    public SendHandler getIntermediateHandler() {
+        return intermediateHandler;
+    }
+
+
+    public SendHandler getEndHandler() {
+        return endHandler;
+    }
+
+    public void setEndHandler(SendHandler endHandler) {
+        this.endHandler = endHandler;
     }
 }
 

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java?rev=1619738&r1=1619737&r2=1619738&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java Fri Aug 22 11:02:19 2014
@@ -21,10 +21,12 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
 import java.util.zip.Inflater;
 
 import javax.websocket.Extension;
 import javax.websocket.Extension.Parameter;
+import javax.websocket.SendHandler;
 
 import org.apache.tomcat.util.res.StringManager;
 
@@ -47,10 +49,15 @@ public class PerMessageDeflate implement
     private final boolean clientContextTakeover;
     private final int clientMaxWindowBits;
     private final Inflater inflater = new Inflater(true);
-    private final ByteBuffer readBuffer = ByteBuffer.allocate(8192);
+    private final ByteBuffer readBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
+    private final Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
 
     private volatile Transformation next;
     private volatile boolean skipDecompression = false;
+    private volatile ByteBuffer writeBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
+    private volatile boolean deflaterResetRequired = true;
+    private volatile boolean firstCompressedFrameWritten = false;
+    private volatile byte[] EOM_BUFFER = new byte[EOM_BYTES.length + 1];
 
     static PerMessageDeflate negotiate(List<List<Parameter>> preferences) {
         // Accept the first preference that the server is able to support
@@ -288,25 +295,143 @@ public class PerMessageDeflate implement
 
 
     @Override
-    public List<MessagePart> sendMessagePart(List<MessagePart> messageParts) {
-        List<MessagePart> compressedParts = new ArrayList<>(messageParts.size());
+    public List<MessagePart> sendMessagePart(List<MessagePart> uncompressedParts) {
+        List<MessagePart> allCompressedParts = new ArrayList<>();
 
-        for (MessagePart messagePart : messageParts) {
-            byte opCode = messagePart.getOpCode();
+        for (MessagePart uncompressedPart : uncompressedParts) {
+            byte opCode = uncompressedPart.getOpCode();
             if (Util.isControl(opCode)) {
                 // Control messages can appear in the middle of other messages
                 // and must not be compressed. Pass it straight through
-                compressedParts.add(messagePart);
+                allCompressedParts.add(uncompressedPart);
             } else {
-                // TODO: Implement compression of sent messages
-                compressedParts.add(messagePart);
+                List<MessagePart> compressedParts = new ArrayList<>();
+                ByteBuffer uncompressedPayload = uncompressedPart.getPayload();
+                SendHandler uncompressedIntermediateHandler =
+                        uncompressedPart.getIntermediateHandler();
+
+                // Need to reset the deflater at the start of every message
+                if (deflaterResetRequired) {
+                    deflater.reset();
+                    deflaterResetRequired = false;
+                    firstCompressedFrameWritten = false;
+                }
+
+                deflater.setInput(uncompressedPayload.array(),
+                        uncompressedPayload.arrayOffset() + uncompressedPayload.position(),
+                        uncompressedPayload.remaining());
+
+                int flush = (uncompressedPart.isFin() ? Deflater.SYNC_FLUSH : Deflater.NO_FLUSH);
+                boolean deflateRequired = true;
+
+                while(deflateRequired) {
+                    ByteBuffer compressedPayload = writeBuffer;
+
+                    int written = deflater.deflate(compressedPayload.array(),
+                            compressedPayload.arrayOffset() + compressedPayload.position(),
+                            compressedPayload.remaining(), flush);
+                    compressedPayload.position(compressedPayload.position() + written);
+
+                    if (!uncompressedPart.isFin() && compressedPayload.hasRemaining() && deflater.needsInput()) {
+                        // This message part has been fully processed by the
+                        // deflater. Fire the send handler for this message part
+                        // and move on to the next message part.
+                        break;
+                    }
+
+                    // If this point is reached, a new compressed message part
+                    // will be created...
+                    MessagePart compressedPart;
+
+                    // .. and a new writeBuffer will be required.
+                    writeBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
+
+                    // Flip the compressed payload ready for writing
+                    compressedPayload.flip();
+
+                    boolean fin = uncompressedPart.isFin();
+                    boolean full = compressedPayload.limit() == compressedPayload.capacity();
+                    boolean needsInput = deflater.needsInput();
+
+                    if (fin && !full && needsInput) {
+                        // End of compressed message. Drop EOM bytes and output.
+                        compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length);
+                        compressedPart = new MessagePart(true, getRsv(uncompressedPart),
+                                opCode, compressedPayload, uncompressedIntermediateHandler,
+                                uncompressedIntermediateHandler);
+                        deflaterResetRequired = true;
+                        deflateRequired = false;
+                    } else if (full && !needsInput) {
+                        // Write buffer full and input message not fully read.
+                        // Output and start new compressed part.
+                        compressedPart = new MessagePart(false, getRsv(uncompressedPart),
+                                opCode, compressedPayload, uncompressedIntermediateHandler,
+                                uncompressedIntermediateHandler);
+                    } else if (!fin && full && needsInput) {
+                        // Write buffer full, this message part fully read but
+                        // more parts to follow. Output and get more data.
+                        compressedPart = new MessagePart(false, getRsv(uncompressedPart),
+                                opCode, compressedPayload, uncompressedIntermediateHandler,
+                                uncompressedIntermediateHandler);
+                        deflateRequired = false;
+                    } else if (fin && full && needsInput) {
+                        // Write buffer full. Input fully read. Deflater may be
+                        // in one of four states:
+                        // - output complete (just happened to align with end of
+                        //   buffer)
+                        // - in middle of EOM bytes
+                        // - about to write EOM bytes
+                        // - more data to write
+                        int eomBufferWritten = deflater.deflate(EOM_BUFFER, 0, EOM_BUFFER.length, Deflater.SYNC_FLUSH);
+                        if (eomBufferWritten < EOM_BUFFER.length) {
+                            // EOM has just been completed
+                            compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length + eomBufferWritten);
+                            compressedPart = new MessagePart(true,
+                                    getRsv(uncompressedPart), opCode, compressedPayload,
+                                    uncompressedIntermediateHandler, uncompressedIntermediateHandler);
+                            deflaterResetRequired = true;
+                            deflateRequired = false;
+                        } else {
+                            // More data to write
+                            // Copy bytes to new write buffer
+                            writeBuffer.put(EOM_BUFFER, 0, eomBufferWritten);
+                            compressedPart = new MessagePart(false,
+                                    getRsv(uncompressedPart), opCode, compressedPayload,
+                                    uncompressedIntermediateHandler, uncompressedIntermediateHandler);
+                        }
+                    } else {
+                        throw new IllegalStateException("Should never happen");
+                    }
+
+                    // Add the newly created compressed part to the set of parts
+                    // to pass on to the next transformation.
+                    compressedParts.add(compressedPart);
+                }
+
+                SendHandler uncompressedEndHandler = uncompressedPart.getEndHandler();
+                int size = compressedParts.size();
+                if (size > 0) {
+                    compressedParts.get(size - 1).setEndHandler(uncompressedEndHandler);
+                }
+
+                allCompressedParts.addAll(compressedParts);
             }
         }
 
         if (next == null) {
-            return compressedParts;
+            return allCompressedParts;
         } else {
-            return next.sendMessagePart(compressedParts);
+            return next.sendMessagePart(allCompressedParts);
         }
     }
+
+
+    private int getRsv(MessagePart uncompressedMessagePart) {
+        int result = uncompressedMessagePart.getRsv();
+        if (!firstCompressedFrameWritten) {
+            result += RSV_BITMASK;
+            firstCompressedFrameWritten = true;
+        }
+        return result;
+    }
 }

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1619738&r1=1619737&r2=1619738&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Fri Aug 22 11:02:19 2014
@@ -61,6 +61,9 @@ public abstract class WsRemoteEndpointIm
 
     private final StateMachine stateMachine = new StateMachine();
 
+    private final IntermediateMessageHandler intermediateMessageHandler =
+            new IntermediateMessageHandler(this);
+
     private Transformation transformation = null;
     private boolean messagePartInProgress = false;
     private final Queue<MessagePart> messagePartQueue = new ArrayDeque<>();
@@ -258,10 +261,19 @@ public abstract class WsRemoteEndpointIm
 
         List<MessagePart> messageParts = new ArrayList<>();
         messageParts.add(new MessagePart(last, 0, opCode, payload,
+                intermediateMessageHandler,
                 new EndMessageHandler(this, handler)));
 
         messageParts = transformation.sendMessagePart(messageParts);
 
+        // Some extensions/transformations may buffer messages so it is possible
+        // that no message parts will be returned. If this is the case then
+        // trigger the supplied SendHandler
+        if (messageParts.size() == 0) {
+            handler.onResult(new SendResult());
+            return;
+        }
+
         MessagePart mp = messageParts.remove(0);
 
         boolean doWrite = false;
@@ -329,12 +341,15 @@ public abstract class WsRemoteEndpointIm
 
         wsSession.updateLastActive();
 
-        handler.onResult(result);
+        // Some handlers, such as the IntermediateMessageHandler, do not have a
+        // nested handler so handler may be null.
+        if (handler != null) {
+            handler.onResult(result);
+        }
     }
 
 
     void writeMessagePart(MessagePart mp) {
-
         if (closed) {
             throw new IllegalStateException(
                     sm.getString("wsRemoteEndpoint.closed"));
@@ -343,7 +358,7 @@ public abstract class WsRemoteEndpointIm
         if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
             nextFragmented = fragmented;
             nextText = text;
-            doWrite(mp.getHandler(), outputBuffer);
+            doWrite(mp.getEndHandler(), outputBuffer);
             return;
         }
 
@@ -397,14 +412,13 @@ public abstract class WsRemoteEndpointIm
         if (getBatchingAllowed() || isMasked()) {
             // Need to write via output buffer
             OutputBufferSendHandler obsh = new OutputBufferSendHandler(
-                    mp.getHandler(), headerBuffer, mp.getPayload(), mask,
+                    mp.getEndHandler(), headerBuffer, mp.getPayload(), mask,
                     outputBuffer, !getBatchingAllowed(), this);
             obsh.write();
         } else {
             // Can write directly
-            doWrite(mp.getHandler(), headerBuffer, mp.getPayload());
+            doWrite(mp.getEndHandler(), headerBuffer, mp.getPayload());
         }
-
     }
 
 
@@ -446,6 +460,31 @@ public abstract class WsRemoteEndpointIm
     }
 
 
+    /**
+     * If a transformation needs to split a {@link MessagePart} into multiple
+     * {@link MessagePart}s, it uses this handler as the end handler for each of
+     * the additional {@link MessagePart}s. This handler notifies this
+     * class that the {@link MessagePart} has been processed and that the next
+     * {@link MessagePart} in the queue should be started. The final
+     * {@link MessagePart} will use the {@link EndMessageHandler} provided with
+     * the original {@link MessagePart}.
+     */
+    private static class IntermediateMessageHandler implements SendHandler {
+
+        private final WsRemoteEndpointImplBase endpoint;
+
+        public IntermediateMessageHandler(WsRemoteEndpointImplBase endpoint) {
+            this.endpoint = endpoint;
+        }
+
+
+        @Override
+        public void onResult(SendResult result) {
+            endpoint.endMessage(null, result);
+        }
+    }
+
+
     public void sendObject(Object obj) throws IOException {
         Future<Void> f = sendObjectByFuture(obj);
         try {

Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1619738&r1=1619737&r2=1619738&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Fri Aug 22 11:02:19 2014
@@ -54,6 +54,14 @@
       </fix>
     </changelog>
   </subsection>
+  <subsection name="WebSocket">
+    <changelog>
+      <add>
+        Extend support for the <code>permessage-deflate</code> extension to
+        compression of outgoing messages on the server side. (markt)
+      </add>
+    </changelog>
+  </subsection>
   <subsection name="Other">
     <changelog>
       <add>



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Re: svn commit: r1619738 - in /tomcat/trunk: java/org/apache/tomcat/websocket/MessagePart.java java/org/apache/tomcat/websocket/PerMessageDeflate.java java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java webapps/docs/changelog.xml

Posted by Mark Thomas <ma...@apache.org>.
On 12/09/2014 13:04, Violeta Georgieva wrote:
> Hi,
> 
> 2014-08-22 14:02 GMT+03:00 <ma...@apache.org>:
>>
>> Author: markt
>> Date: Fri Aug 22 11:02:19 2014
>> New Revision: 1619738
>>
>> URL: http://svn.apache.org/r1619738
>> Log:
>> Extend support for the WebSocket permessage-deflate extension to
> compression of outgoing messages on the server side.
> 
> I would like to back-port this to 7.0.x.
> Wdyt?

No objections here. You might want to check the rest of the websocket
package for changes that haven't been back-ported as well.

Mark


> 
> Regards,
> Violeta
> 
> 
>> Modified:
>>     tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java
>>     tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
>>
> tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
>>     tomcat/trunk/webapps/docs/changelog.xml
>>
>> Modified: tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java
>> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java?rev=1619738&r1=1619737&r2=1619738&view=diff
>>
> ==============================================================================
>> --- tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java
> (original)
>> +++ tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java Fri
> Aug 22 11:02:19 2014
>> @@ -25,15 +25,17 @@ class MessagePart {
>>      private final int rsv;
>>      private final byte opCode;
>>      private final ByteBuffer payload;
>> -    private final SendHandler handler;
>> +    private final SendHandler intermediateHandler;
>> +    private volatile SendHandler endHandler;
>>
>>      public MessagePart( boolean fin, int rsv, byte opCode, ByteBuffer
> payload,
>> -            SendHandler handler) {
>> +            SendHandler intermediateHandler, SendHandler endHandler) {
>>          this.fin = fin;
>>          this.rsv = rsv;
>>          this.opCode = opCode;
>>          this.payload = payload;
>> -        this.handler = handler;
>> +        this.intermediateHandler = intermediateHandler;
>> +        this.endHandler = endHandler;
>>      }
>>
>>
>> @@ -57,8 +59,17 @@ class MessagePart {
>>      }
>>
>>
>> -    public SendHandler getHandler() {
>> -        return handler;
>> +    public SendHandler getIntermediateHandler() {
>> +        return intermediateHandler;
>> +    }
>> +
>> +
>> +    public SendHandler getEndHandler() {
>> +        return endHandler;
>> +    }
>> +
>> +    public void setEndHandler(SendHandler endHandler) {
>> +        this.endHandler = endHandler;
>>      }
>>  }
>>
>>
>> Modified:
> tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
>> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java?rev=1619738&r1=1619737&r2=1619738&view=diff
>>
> ==============================================================================
>> --- tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
> (original)
>> +++ tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
> Fri Aug 22 11:02:19 2014
>> @@ -21,10 +21,12 @@ import java.nio.ByteBuffer;
>>  import java.util.ArrayList;
>>  import java.util.List;
>>  import java.util.zip.DataFormatException;
>> +import java.util.zip.Deflater;
>>  import java.util.zip.Inflater;
>>
>>  import javax.websocket.Extension;
>>  import javax.websocket.Extension.Parameter;
>> +import javax.websocket.SendHandler;
>>
>>  import org.apache.tomcat.util.res.StringManager;
>>
>> @@ -47,10 +49,15 @@ public class PerMessageDeflate implement
>>      private final boolean clientContextTakeover;
>>      private final int clientMaxWindowBits;
>>      private final Inflater inflater = new Inflater(true);
>> -    private final ByteBuffer readBuffer = ByteBuffer.allocate(8192);
>> +    private final ByteBuffer readBuffer =
> ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
>> +    private final Deflater deflater = new
> Deflater(Deflater.DEFAULT_COMPRESSION, true);
>>
>>      private volatile Transformation next;
>>      private volatile boolean skipDecompression = false;
>> +    private volatile ByteBuffer writeBuffer =
> ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
>> +    private volatile boolean deflaterResetRequired = true;
>> +    private volatile boolean firstCompressedFrameWritten = false;
>> +    private volatile byte[] EOM_BUFFER = new byte[EOM_BYTES.length + 1];
>>
>>      static PerMessageDeflate negotiate(List<List<Parameter>>
> preferences) {
>>          // Accept the first preference that the server is able to support
>> @@ -288,25 +295,143 @@ public class PerMessageDeflate implement
>>
>>
>>      @Override
>> -    public List<MessagePart> sendMessagePart(List<MessagePart>
> messageParts) {
>> -        List<MessagePart> compressedParts = new
> ArrayList<>(messageParts.size());
>> +    public List<MessagePart> sendMessagePart(List<MessagePart>
> uncompressedParts) {
>> +        List<MessagePart> allCompressedParts = new ArrayList<>();
>>
>> -        for (MessagePart messagePart : messageParts) {
>> -            byte opCode = messagePart.getOpCode();
>> +        for (MessagePart uncompressedPart : uncompressedParts) {
>> +            byte opCode = uncompressedPart.getOpCode();
>>              if (Util.isControl(opCode)) {
>>                  // Control messages can appear in the middle of other
> messages
>>                  // and must not be compressed. Pass it straight through
>> -                compressedParts.add(messagePart);
>> +                allCompressedParts.add(uncompressedPart);
>>              } else {
>> -                // TODO: Implement compression of sent messages
>> -                compressedParts.add(messagePart);
>> +                List<MessagePart> compressedParts = new ArrayList<>();
>> +                ByteBuffer uncompressedPayload =
> uncompressedPart.getPayload();
>> +                SendHandler uncompressedIntermediateHandler =
>> +                        uncompressedPart.getIntermediateHandler();
>> +
>> +                // Need to reset the deflater at the start of every
> message
>> +                if (deflaterResetRequired) {
>> +                    deflater.reset();
>> +                    deflaterResetRequired = false;
>> +                    firstCompressedFrameWritten = false;
>> +                }
>> +
>> +                deflater.setInput(uncompressedPayload.array(),
>> +                        uncompressedPayload.arrayOffset() +
> uncompressedPayload.position(),
>> +                        uncompressedPayload.remaining());
>> +
>> +                int flush = (uncompressedPart.isFin() ?
> Deflater.SYNC_FLUSH : Deflater.NO_FLUSH);
>> +                boolean deflateRequired = true;
>> +
>> +                while(deflateRequired) {
>> +                    ByteBuffer compressedPayload = writeBuffer;
>> +
>> +                    int written =
> deflater.deflate(compressedPayload.array(),
>> +                            compressedPayload.arrayOffset() +
> compressedPayload.position(),
>> +                            compressedPayload.remaining(), flush);
>> +
>  compressedPayload.position(compressedPayload.position() + written);
>> +
>> +                    if (!uncompressedPart.isFin() &&
> compressedPayload.hasRemaining() && deflater.needsInput()) {
>> +                        // This message part has been fully processed by
> the
>> +                        // deflater. Fire the send handler for this
> message part
>> +                        // and move on to the next message part.
>> +                        break;
>> +                    }
>> +
>> +                    // If this point is reached, a new compressed
> message part
>> +                    // will be created...
>> +                    MessagePart compressedPart;
>> +
>> +                    // .. and a new writeBuffer will be required.
>> +                    writeBuffer =
> ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
>> +
>> +                    // Flip the compressed payload ready for writing
>> +                    compressedPayload.flip();
>> +
>> +                    boolean fin = uncompressedPart.isFin();
>> +                    boolean full = compressedPayload.limit() ==
> compressedPayload.capacity();
>> +                    boolean needsInput = deflater.needsInput();
>> +
>> +                    if (fin && !full && needsInput) {
>> +                        // End of compressed message. Drop EOM bytes and
> output.
>> +
>  compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length);
>> +                        compressedPart = new MessagePart(true,
> getRsv(uncompressedPart),
>> +                                opCode, compressedPayload,
> uncompressedIntermediateHandler,
>> +                                uncompressedIntermediateHandler);
>> +                        deflaterResetRequired = true;
>> +                        deflateRequired = false;
>> +                    } else if (full && !needsInput) {
>> +                        // Write buffer full and input message not fully
> read.
>> +                        // Output and start new compressed part.
>> +                        compressedPart = new MessagePart(false,
> getRsv(uncompressedPart),
>> +                                opCode, compressedPayload,
> uncompressedIntermediateHandler,
>> +                                uncompressedIntermediateHandler);
>> +                    } else if (!fin && full && needsInput) {
>> +                        // Write buffer full and input message not fully
> read.
>> +                        // Output and get more data.
>> +                        compressedPart = new MessagePart(false,
> getRsv(uncompressedPart),
>> +                                opCode, compressedPayload,
> uncompressedIntermediateHandler,
>> +                                uncompressedIntermediateHandler);
>> +                        deflateRequired = false;
>> +                    } else if (fin && full && needsInput) {
>> +                        // Write buffer full. Input fully read. Deflater
> may be
>> +                        // in one of four states:
>> +                        // - output complete (just happened to align
> with end of
>> +                        //   buffer
>> +                        // - in middle of EOM bytes
>> +                        // - about to write EOM bytes
>> +                        // - more data to write
>> +                        int eomBufferWritten =
> deflater.deflate(EOM_BUFFER, 0, EOM_BUFFER.length, Deflater.SYNC_FLUSH);
>> +                        if (eomBufferWritten < EOM_BUFFER.length) {
>> +                            // EOM has just been completed
>> +
>  compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length +
> eomBufferWritten);
>> +                            compressedPart = new MessagePart(true,
>> +                                    getRsv(uncompressedPart), opCode,
> compressedPayload,
>> +                                    uncompressedIntermediateHandler,
> uncompressedIntermediateHandler);
>> +                            deflaterResetRequired = true;
>> +                            deflateRequired = false;
>> +                        } else {
>> +                            // More data to write
>> +                            // Copy bytes to new write buffer
>> +                            writeBuffer.put(EOM_BUFFER, 0,
> eomBufferWritten);
>> +                            compressedPart = new MessagePart(false,
>> +                                    getRsv(uncompressedPart), opCode,
> compressedPayload,
>> +                                    uncompressedIntermediateHandler,
> uncompressedIntermediateHandler);
>> +                        }
>> +                    } else {
>> +                        throw new IllegalStateException("Should never
> happen");
>> +                    }
>> +
>> +                    // Add the newly created compressed part to the set
> of parts
>> +                    // to pass on to the next transformation.
>> +                    compressedParts.add(compressedPart);
>> +                }
>> +
>> +                SendHandler uncompressedEndHandler =
> uncompressedPart.getEndHandler();
>> +                int size = compressedParts.size();
>> +                if (size > 0) {
>> +                    compressedParts.get(size -
> 1).setEndHandler(uncompressedEndHandler);
>> +                }
>> +
>> +                allCompressedParts.addAll(compressedParts);
>>              }
>>          }
>>
>>          if (next == null) {
>> -            return compressedParts;
>> +            return allCompressedParts;
>>          } else {
>> -            return next.sendMessagePart(compressedParts);
>> +            return next.sendMessagePart(allCompressedParts);
>>          }
>>      }
>> +
>> +
>> +    private int getRsv(MessagePart uncompressedMessagePart) {
>> +        int result = uncompressedMessagePart.getRsv();
>> +        if (!firstCompressedFrameWritten) {
>> +            result += RSV_BITMASK;
>> +            firstCompressedFrameWritten = true;
>> +        }
>> +        return result;
>> +    }
>>  }
>>
>> Modified:
> tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
>> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1619738&r1=1619737&r2=1619738&view=diff
>>
> ==============================================================================
>> ---
> tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
> (original)
>> +++
> tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
> Fri Aug 22 11:02:19 2014
>> @@ -61,6 +61,9 @@ public abstract class WsRemoteEndpointIm
>>
>>      private final StateMachine stateMachine = new StateMachine();
>>
>> +    private final IntermediateMessageHandler intermediateMessageHandler =
>> +            new IntermediateMessageHandler(this);
>> +
>>      private Transformation transformation = null;
>>      private boolean messagePartInProgress = false;
>>      private final Queue<MessagePart> messagePartQueue = new
> ArrayDeque<>();
>> @@ -258,10 +261,19 @@ public abstract class WsRemoteEndpointIm
>>
>>          List<MessagePart> messageParts = new ArrayList<>();
>>          messageParts.add(new MessagePart(last, 0, opCode, payload,
>> +                intermediateMessageHandler,
>>                  new EndMessageHandler(this, handler)));
>>
>>          messageParts = transformation.sendMessagePart(messageParts);
>>
>> +        // Some extensions/transformations may buffer messages so it is
> possible
>> +        // that no message parts will be returned. If this is the case
> then
>> +        // trigger the supplied SendHandler
>> +        if (messageParts.size() == 0) {
>> +            handler.onResult(new SendResult());
>> +            return;
>> +        }
>> +
>>          MessagePart mp = messageParts.remove(0);
>>
>>          boolean doWrite = false;
>> @@ -329,12 +341,15 @@ public abstract class WsRemoteEndpointIm
>>
>>          wsSession.updateLastActive();
>>
>> -        handler.onResult(result);
>> +        // Some handlers, such as the IntermediateMessageHandler, do not
> have a
>> +        // nested handler so handler may be null.
>> +        if (handler != null) {
>> +            handler.onResult(result);
>> +        }
>>      }
>>
>>
>>      void writeMessagePart(MessagePart mp) {
>> -
>>          if (closed) {
>>              throw new IllegalStateException(
>>                      sm.getString("wsRemoteEndpoint.closed"));
>> @@ -343,7 +358,7 @@ public abstract class WsRemoteEndpointIm
>>          if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
>>              nextFragmented = fragmented;
>>              nextText = text;
>> -            doWrite(mp.getHandler(), outputBuffer);
>> +            doWrite(mp.getEndHandler(), outputBuffer);
>>              return;
>>          }
>>
>> @@ -397,14 +412,13 @@ public abstract class WsRemoteEndpointIm
>>          if (getBatchingAllowed() || isMasked()) {
>>              // Need to write via output buffer
>>              OutputBufferSendHandler obsh = new OutputBufferSendHandler(
>> -                    mp.getHandler(), headerBuffer, mp.getPayload(), mask,
>> +                    mp.getEndHandler(), headerBuffer, mp.getPayload(),
> mask,
>>                      outputBuffer, !getBatchingAllowed(), this);
>>              obsh.write();
>>          } else {
>>              // Can write directly
>> -            doWrite(mp.getHandler(), headerBuffer, mp.getPayload());
>> +            doWrite(mp.getEndHandler(), headerBuffer, mp.getPayload());
>>          }
>> -
>>      }
>>
>>
>> @@ -446,6 +460,31 @@ public abstract class WsRemoteEndpointIm
>>      }
>>
>>
>> +    /**
>> +     * If a transformation needs to split a {@link MessagePart} into
> multiple
>> +     * {@link MessagePart}s, it uses this handler as the end handler for
> each of
>> +     * the additional {@link MessagePart}s. This handler notifies this
>> +     * class that the {@link MessagePart} has been processed and that
> the next
>> +     * {@link MessagePart} in the queue should be started. The final
>> +     * {@link MessagePart} will use the {@link EndMessageHandler}
> provided with
>> +     * the original {@link MessagePart}.
>> +     */
>> +    private static class IntermediateMessageHandler implements
> SendHandler {
>> +
>> +        private final WsRemoteEndpointImplBase endpoint;
>> +
>> +        public IntermediateMessageHandler(WsRemoteEndpointImplBase
> endpoint) {
>> +            this.endpoint = endpoint;
>> +        }
>> +
>> +
>> +        @Override
>> +        public void onResult(SendResult result) {
>> +            endpoint.endMessage(null, result);
>> +        }
>> +    }
>> +
>> +
>>      public void sendObject(Object obj) throws IOException {
>>          Future<Void> f = sendObjectByFuture(obj);
>>          try {
>>
>> Modified: tomcat/trunk/webapps/docs/changelog.xml
>> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1619738&r1=1619737&r2=1619738&view=diff
>>
> ==============================================================================
>> --- tomcat/trunk/webapps/docs/changelog.xml (original)
>> +++ tomcat/trunk/webapps/docs/changelog.xml Fri Aug 22 11:02:19 2014
>> @@ -54,6 +54,14 @@
>>        </fix>
>>      </changelog>
>>    </subsection>
>> +  <subsection name="WebSocket">
>> +    <changelog>
>> +      <add>
>> +        Extend support for the <code>permessage-deflate</code> extension
> to
>> +        compression of outgoing messages on the server side. (markt)
>> +      </add>
>> +    </changelog>
>> +  </subsection>
>>    <subsection name="Other">
>>      <changelog>
>>        <add>
>>
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
>> For additional commands, e-mail: dev-help@tomcat.apache.org
>>
> 


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Re: svn commit: r1619738 - in /tomcat/trunk: java/org/apache/tomcat/websocket/MessagePart.java java/org/apache/tomcat/websocket/PerMessageDeflate.java java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java webapps/docs/changelog.xml

Posted by Violeta Georgieva <mi...@gmail.com>.
Hi,

2014-08-22 14:02 GMT+03:00 <ma...@apache.org>:
>
> Author: markt
> Date: Fri Aug 22 11:02:19 2014
> New Revision: 1619738
>
> URL: http://svn.apache.org/r1619738
> Log:
> Extend support for the WebSocket permessage-deflate extension to
compression of outgoing messages on the server side.

I would like to back-port this to 7.0.x.
What do you think?

Regards,
Violeta


> Modified:
>     tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java
>     tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
>
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
>     tomcat/trunk/webapps/docs/changelog.xml
>
> Modified: tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java
> URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java?rev=1619738&r1=1619737&r2=1619738&view=diff
>
==============================================================================
> --- tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java
(original)
> +++ tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java Fri
Aug 22 11:02:19 2014
> @@ -25,15 +25,17 @@ class MessagePart {
>      private final int rsv;
>      private final byte opCode;
>      private final ByteBuffer payload;
> -    private final SendHandler handler;
> +    private final SendHandler intermediateHandler;
> +    private volatile SendHandler endHandler;
>
>      public MessagePart( boolean fin, int rsv, byte opCode, ByteBuffer
payload,
> -            SendHandler handler) {
> +            SendHandler intermediateHandler, SendHandler endHandler) {
>          this.fin = fin;
>          this.rsv = rsv;
>          this.opCode = opCode;
>          this.payload = payload;
> -        this.handler = handler;
> +        this.intermediateHandler = intermediateHandler;
> +        this.endHandler = endHandler;
>      }
>
>
> @@ -57,8 +59,17 @@ class MessagePart {
>      }
>
>
> -    public SendHandler getHandler() {
> -        return handler;
> +    public SendHandler getIntermediateHandler() {
> +        return intermediateHandler;
> +    }
> +
> +
> +    public SendHandler getEndHandler() {
> +        return endHandler;
> +    }
> +
> +    public void setEndHandler(SendHandler endHandler) {
> +        this.endHandler = endHandler;
>      }
>  }
>
>
> Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
> URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java?rev=1619738&r1=1619737&r2=1619738&view=diff
>
==============================================================================
> --- tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
(original)
> +++ tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
Fri Aug 22 11:02:19 2014
> @@ -21,10 +21,12 @@ import java.nio.ByteBuffer;
>  import java.util.ArrayList;
>  import java.util.List;
>  import java.util.zip.DataFormatException;
> +import java.util.zip.Deflater;
>  import java.util.zip.Inflater;
>
>  import javax.websocket.Extension;
>  import javax.websocket.Extension.Parameter;
> +import javax.websocket.SendHandler;
>
>  import org.apache.tomcat.util.res.StringManager;
>
> @@ -47,10 +49,15 @@ public class PerMessageDeflate implement
>      private final boolean clientContextTakeover;
>      private final int clientMaxWindowBits;
>      private final Inflater inflater = new Inflater(true);
> -    private final ByteBuffer readBuffer = ByteBuffer.allocate(8192);
> +    private final ByteBuffer readBuffer =
ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
> +    private final Deflater deflater = new
Deflater(Deflater.DEFAULT_COMPRESSION, true);
>
>      private volatile Transformation next;
>      private volatile boolean skipDecompression = false;
> +    private volatile ByteBuffer writeBuffer =
ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
> +    private volatile boolean deflaterResetRequired = true;
> +    private volatile boolean firstCompressedFrameWritten = false;
> +    private volatile byte[] EOM_BUFFER = new byte[EOM_BYTES.length + 1];
>
>      static PerMessageDeflate negotiate(List<List<Parameter>>
preferences) {
>          // Accept the first preference that the server is able to support
> @@ -288,25 +295,143 @@ public class PerMessageDeflate implement
>
>
>      @Override
> -    public List<MessagePart> sendMessagePart(List<MessagePart>
messageParts) {
> -        List<MessagePart> compressedParts = new
ArrayList<>(messageParts.size());
> +    public List<MessagePart> sendMessagePart(List<MessagePart>
uncompressedParts) {
> +        List<MessagePart> allCompressedParts = new ArrayList<>();
>
> -        for (MessagePart messagePart : messageParts) {
> -            byte opCode = messagePart.getOpCode();
> +        for (MessagePart uncompressedPart : uncompressedParts) {
> +            byte opCode = uncompressedPart.getOpCode();
>              if (Util.isControl(opCode)) {
>                  // Control messages can appear in the middle of other
messages
>                  // and must not be compressed. Pass it straight through
> -                compressedParts.add(messagePart);
> +                allCompressedParts.add(uncompressedPart);
>              } else {
> -                // TODO: Implement compression of sent messages
> -                compressedParts.add(messagePart);
> +                List<MessagePart> compressedParts = new ArrayList<>();
> +                ByteBuffer uncompressedPayload =
uncompressedPart.getPayload();
> +                SendHandler uncompressedIntermediateHandler =
> +                        uncompressedPart.getIntermediateHandler();
> +
> +                // Need to reset the deflater at the start of every
message
> +                if (deflaterResetRequired) {
> +                    deflater.reset();
> +                    deflaterResetRequired = false;
> +                    firstCompressedFrameWritten = false;
> +                }
> +
> +                deflater.setInput(uncompressedPayload.array(),
> +                        uncompressedPayload.arrayOffset() +
uncompressedPayload.position(),
> +                        uncompressedPayload.remaining());
> +
> +                int flush = (uncompressedPart.isFin() ?
Deflater.SYNC_FLUSH : Deflater.NO_FLUSH);
> +                boolean deflateRequired = true;
> +
> +                while(deflateRequired) {
> +                    ByteBuffer compressedPayload = writeBuffer;
> +
> +                    int written =
deflater.deflate(compressedPayload.array(),
> +                            compressedPayload.arrayOffset() +
compressedPayload.position(),
> +                            compressedPayload.remaining(), flush);
> +
 compressedPayload.position(compressedPayload.position() + written);
> +
> +                    if (!uncompressedPart.isFin() &&
compressedPayload.hasRemaining() && deflater.needsInput()) {
> +                        // This message part has been fully processed by
the
> +                        // deflater. Fire the send handler for this
message part
> +                        // and move on to the next message part.
> +                        break;
> +                    }
> +
> +                    // If this point is reached, a new compressed
message part
> +                    // will be created...
> +                    MessagePart compressedPart;
> +
> +                    // .. and a new writeBuffer will be required.
> +                    writeBuffer =
ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
> +
> +                    // Flip the compressed payload ready for writing
> +                    compressedPayload.flip();
> +
> +                    boolean fin = uncompressedPart.isFin();
> +                    boolean full = compressedPayload.limit() ==
compressedPayload.capacity();
> +                    boolean needsInput = deflater.needsInput();
> +
> +                    if (fin && !full && needsInput) {
> +                        // End of compressed message. Drop EOM bytes and
output.
> +
 compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length);
> +                        compressedPart = new MessagePart(true,
getRsv(uncompressedPart),
> +                                opCode, compressedPayload,
uncompressedIntermediateHandler,
> +                                uncompressedIntermediateHandler);
> +                        deflaterResetRequired = true;
> +                        deflateRequired = false;
> +                    } else if (full && !needsInput) {
> +                        // Write buffer full and input message not fully
read.
> +                        // Output and start new compressed part.
> +                        compressedPart = new MessagePart(false,
getRsv(uncompressedPart),
> +                                opCode, compressedPayload,
uncompressedIntermediateHandler,
> +                                uncompressedIntermediateHandler);
> +                    } else if (!fin && full && needsInput) {
> +                        // Write buffer full and input message not fully
read.
> +                        // Output and get more data.
> +                        compressedPart = new MessagePart(false,
getRsv(uncompressedPart),
> +                                opCode, compressedPayload,
uncompressedIntermediateHandler,
> +                                uncompressedIntermediateHandler);
> +                        deflateRequired = false;
> +                    } else if (fin && full && needsInput) {
> +                        // Write buffer full. Input fully read. Deflater
may be
> +                        // in one of four states:
> +                        // - output complete (just happened to align
with end of
> +                        //   buffer)
> +                        // - in middle of EOM bytes
> +                        // - about to write EOM bytes
> +                        // - more data to write
> +                        int eomBufferWritten =
deflater.deflate(EOM_BUFFER, 0, EOM_BUFFER.length, Deflater.SYNC_FLUSH);
> +                        if (eomBufferWritten < EOM_BUFFER.length) {
> +                            // EOM has just been completed
> +
 compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length +
eomBufferWritten);
> +                            compressedPart = new MessagePart(true,
> +                                    getRsv(uncompressedPart), opCode,
compressedPayload,
> +                                    uncompressedIntermediateHandler,
uncompressedIntermediateHandler);
> +                            deflaterResetRequired = true;
> +                            deflateRequired = false;
> +                        } else {
> +                            // More data to write
> +                            // Copy bytes to new write buffer
> +                            writeBuffer.put(EOM_BUFFER, 0,
eomBufferWritten);
> +                            compressedPart = new MessagePart(false,
> +                                    getRsv(uncompressedPart), opCode,
compressedPayload,
> +                                    uncompressedIntermediateHandler,
uncompressedIntermediateHandler);
> +                        }
> +                    } else {
> +                        throw new IllegalStateException("Should never
happen");
> +                    }
> +
> +                    // Add the newly created compressed part to the set
of parts
> +                    // to pass on to the next transformation.
> +                    compressedParts.add(compressedPart);
> +                }
> +
> +                SendHandler uncompressedEndHandler =
uncompressedPart.getEndHandler();
> +                int size = compressedParts.size();
> +                if (size > 0) {
> +                    compressedParts.get(size -
1).setEndHandler(uncompressedEndHandler);
> +                }
> +
> +                allCompressedParts.addAll(compressedParts);
>              }
>          }
>
>          if (next == null) {
> -            return compressedParts;
> +            return allCompressedParts;
>          } else {
> -            return next.sendMessagePart(compressedParts);
> +            return next.sendMessagePart(allCompressedParts);
>          }
>      }
> +
> +
> +    private int getRsv(MessagePart uncompressedMessagePart) {
> +        int result = uncompressedMessagePart.getRsv();
> +        if (!firstCompressedFrameWritten) {
> +            result += RSV_BITMASK;
> +            firstCompressedFrameWritten = true;
> +        }
> +        return result;
> +    }
>  }
>
> Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
> URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1619738&r1=1619737&r2=1619738&view=diff
>
==============================================================================
> ---
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
(original)
> +++
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
Fri Aug 22 11:02:19 2014
> @@ -61,6 +61,9 @@ public abstract class WsRemoteEndpointIm
>
>      private final StateMachine stateMachine = new StateMachine();
>
> +    private final IntermediateMessageHandler intermediateMessageHandler =
> +            new IntermediateMessageHandler(this);
> +
>      private Transformation transformation = null;
>      private boolean messagePartInProgress = false;
>      private final Queue<MessagePart> messagePartQueue = new
ArrayDeque<>();
> @@ -258,10 +261,19 @@ public abstract class WsRemoteEndpointIm
>
>          List<MessagePart> messageParts = new ArrayList<>();
>          messageParts.add(new MessagePart(last, 0, opCode, payload,
> +                intermediateMessageHandler,
>                  new EndMessageHandler(this, handler)));
>
>          messageParts = transformation.sendMessagePart(messageParts);
>
> +        // Some extensions/transformations may buffer messages so it is
possible
> +        // that no message parts will be returned. If this is the case
then
> +        // trigger the supplied SendHandler
> +        if (messageParts.size() == 0) {
> +            handler.onResult(new SendResult());
> +            return;
> +        }
> +
>          MessagePart mp = messageParts.remove(0);
>
>          boolean doWrite = false;
> @@ -329,12 +341,15 @@ public abstract class WsRemoteEndpointIm
>
>          wsSession.updateLastActive();
>
> -        handler.onResult(result);
> +        // Some handlers, such as the IntermediateMessageHandler, do not
have a
> +        // nested handler so handler may be null.
> +        if (handler != null) {
> +            handler.onResult(result);
> +        }
>      }
>
>
>      void writeMessagePart(MessagePart mp) {
> -
>          if (closed) {
>              throw new IllegalStateException(
>                      sm.getString("wsRemoteEndpoint.closed"));
> @@ -343,7 +358,7 @@ public abstract class WsRemoteEndpointIm
>          if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
>              nextFragmented = fragmented;
>              nextText = text;
> -            doWrite(mp.getHandler(), outputBuffer);
> +            doWrite(mp.getEndHandler(), outputBuffer);
>              return;
>          }
>
> @@ -397,14 +412,13 @@ public abstract class WsRemoteEndpointIm
>          if (getBatchingAllowed() || isMasked()) {
>              // Need to write via output buffer
>              OutputBufferSendHandler obsh = new OutputBufferSendHandler(
> -                    mp.getHandler(), headerBuffer, mp.getPayload(), mask,
> +                    mp.getEndHandler(), headerBuffer, mp.getPayload(),
mask,
>                      outputBuffer, !getBatchingAllowed(), this);
>              obsh.write();
>          } else {
>              // Can write directly
> -            doWrite(mp.getHandler(), headerBuffer, mp.getPayload());
> +            doWrite(mp.getEndHandler(), headerBuffer, mp.getPayload());
>          }
> -
>      }
>
>
> @@ -446,6 +460,31 @@ public abstract class WsRemoteEndpointIm
>      }
>
>
> +    /**
> +     * If a transformation needs to split a {@link MessagePart} into
multiple
> +     * {@link MessagePart}s, it uses this handler as the end handler for
each of
> +     * the additional {@link MessagePart}s. This handler notifies this
> +     * class that the {@link MessagePart} has been processed and that
the next
> +     * {@link MessagePart} in the queue should be started. The final
> +     * {@link MessagePart} will use the {@link EndMessageHandler}
provided with
> +     * the original {@link MessagePart}.
> +     */
> +    private static class IntermediateMessageHandler implements
SendHandler {
> +
> +        private final WsRemoteEndpointImplBase endpoint;
> +
> +        public IntermediateMessageHandler(WsRemoteEndpointImplBase
endpoint) {
> +            this.endpoint = endpoint;
> +        }
> +
> +
> +        @Override
> +        public void onResult(SendResult result) {
> +            endpoint.endMessage(null, result);
> +        }
> +    }
> +
> +
>      public void sendObject(Object obj) throws IOException {
>          Future<Void> f = sendObjectByFuture(obj);
>          try {
>
> Modified: tomcat/trunk/webapps/docs/changelog.xml
> URL:
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1619738&r1=1619737&r2=1619738&view=diff
>
==============================================================================
> --- tomcat/trunk/webapps/docs/changelog.xml (original)
> +++ tomcat/trunk/webapps/docs/changelog.xml Fri Aug 22 11:02:19 2014
> @@ -54,6 +54,14 @@
>        </fix>
>      </changelog>
>    </subsection>
> +  <subsection name="WebSocket">
> +    <changelog>
> +      <add>
> +        Extend support for the <code>permessage-deflate</code> extension
to
> +        compression of outgoing messages on the server side. (markt)
> +      </add>
> +    </changelog>
> +  </subsection>
>    <subsection name="Other">
>      <changelog>
>        <add>
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
> For additional commands, e-mail: dev-help@tomcat.apache.org
>