You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by vi...@apache.org on 2014/09/17 12:37:11 UTC
svn commit: r1625506 - in /tomcat/tc7.0.x/trunk: ./
java/org/apache/tomcat/websocket/MessagePart.java
java/org/apache/tomcat/websocket/PerMessageDeflate.java
java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
webapps/docs/changelog.xml
Author: violetagg
Date: Wed Sep 17 10:37:10 2014
New Revision: 1625506
URL: http://svn.apache.org/r1625506
Log:
Merged revision 1619738 from tomcat/trunk:
Extend support for the WebSocket permessage-deflate extension to compression of outgoing messages on the server side.
Modified:
tomcat/tc7.0.x/trunk/ (props changed)
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/MessagePart.java
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
Propchange: tomcat/tc7.0.x/trunk/
------------------------------------------------------------------------------
Merged /tomcat/trunk:r1619738
Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/MessagePart.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/MessagePart.java?rev=1625506&r1=1625505&r2=1625506&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/MessagePart.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/MessagePart.java Wed Sep 17 10:37:10 2014
@@ -25,15 +25,17 @@ class MessagePart {
private final int rsv;
private final byte opCode;
private final ByteBuffer payload;
- private final SendHandler handler;
+ private final SendHandler intermediateHandler;
+ private volatile SendHandler endHandler;
public MessagePart( boolean fin, int rsv, byte opCode, ByteBuffer payload,
- SendHandler handler) {
+ SendHandler intermediateHandler, SendHandler endHandler) {
this.fin = fin;
this.rsv = rsv;
this.opCode = opCode;
this.payload = payload;
- this.handler = handler;
+ this.intermediateHandler = intermediateHandler;
+ this.endHandler = endHandler;
}
@@ -57,8 +59,17 @@ class MessagePart {
}
- public SendHandler getHandler() {
- return handler;
+ public SendHandler getIntermediateHandler() {
+ return intermediateHandler;
+ }
+
+
+ public SendHandler getEndHandler() {
+ return endHandler;
+ }
+
+ public void setEndHandler(SendHandler endHandler) {
+ this.endHandler = endHandler;
}
}
Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java?rev=1625506&r1=1625505&r2=1625506&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java Wed Sep 17 10:37:10 2014
@@ -21,10 +21,12 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
import java.util.zip.Inflater;
import javax.websocket.Extension;
import javax.websocket.Extension.Parameter;
+import javax.websocket.SendHandler;
import org.apache.tomcat.util.res.StringManager;
@@ -47,10 +49,15 @@ public class PerMessageDeflate implement
private final boolean clientContextTakeover;
private final int clientMaxWindowBits;
private final Inflater inflater = new Inflater(true);
- private final ByteBuffer readBuffer = ByteBuffer.allocate(8192);
+ private final ByteBuffer readBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
+ private final Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
private volatile Transformation next;
private volatile boolean skipDecompression = false;
+ private volatile ByteBuffer writeBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
+ private volatile boolean deflaterResetRequired = true;
+ private volatile boolean firstCompressedFrameWritten = false;
+ private volatile byte[] EOM_BUFFER = new byte[EOM_BYTES.length + 1];
static PerMessageDeflate negotiate(List<List<Parameter>> preferences) {
// Accept the first preference that the server is able to support
@@ -288,25 +295,143 @@ public class PerMessageDeflate implement
@Override
- public List<MessagePart> sendMessagePart(List<MessagePart> messageParts) {
- List<MessagePart> compressedParts = new ArrayList<MessagePart>(messageParts.size());
+ public List<MessagePart> sendMessagePart(List<MessagePart> uncompressedParts) {
+ List<MessagePart> allCompressedParts = new ArrayList<MessagePart>();
- for (MessagePart messagePart : messageParts) {
- byte opCode = messagePart.getOpCode();
+ for (MessagePart uncompressedPart : uncompressedParts) {
+ byte opCode = uncompressedPart.getOpCode();
if (Util.isControl(opCode)) {
// Control messages can appear in the middle of other messages
// and must not be compressed. Pass it straight through
- compressedParts.add(messagePart);
+ allCompressedParts.add(uncompressedPart);
} else {
- // TODO: Implement compression of sent messages
- compressedParts.add(messagePart);
+ List<MessagePart> compressedParts = new ArrayList<MessagePart>();
+ ByteBuffer uncompressedPayload = uncompressedPart.getPayload();
+ SendHandler uncompressedIntermediateHandler =
+ uncompressedPart.getIntermediateHandler();
+
+ // Need to reset the deflater at the start of every message
+ if (deflaterResetRequired) {
+ deflater.reset();
+ deflaterResetRequired = false;
+ firstCompressedFrameWritten = false;
+ }
+
+ deflater.setInput(uncompressedPayload.array(),
+ uncompressedPayload.arrayOffset() + uncompressedPayload.position(),
+ uncompressedPayload.remaining());
+
+ int flush = (uncompressedPart.isFin() ? Deflater.SYNC_FLUSH : Deflater.NO_FLUSH);
+ boolean deflateRequired = true;
+
+ while(deflateRequired) {
+ ByteBuffer compressedPayload = writeBuffer;
+
+ int written = deflater.deflate(compressedPayload.array(),
+ compressedPayload.arrayOffset() + compressedPayload.position(),
+ compressedPayload.remaining(), flush);
+ compressedPayload.position(compressedPayload.position() + written);
+
+ if (!uncompressedPart.isFin() && compressedPayload.hasRemaining() && deflater.needsInput()) {
+ // This message part has been fully processed by the
+ // deflater. Fire the send handler for this message part
+ // and move on to the next message part.
+ break;
+ }
+
+ // If this point is reached, a new compressed message part
+ // will be created...
+ MessagePart compressedPart;
+
+ // .. and a new writeBuffer will be required.
+ writeBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
+
+ // Flip the compressed payload ready for writing
+ compressedPayload.flip();
+
+ boolean fin = uncompressedPart.isFin();
+ boolean full = compressedPayload.limit() == compressedPayload.capacity();
+ boolean needsInput = deflater.needsInput();
+
+ if (fin && !full && needsInput) {
+ // End of compressed message. Drop EOM bytes and output.
+ compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length);
+ compressedPart = new MessagePart(true, getRsv(uncompressedPart),
+ opCode, compressedPayload, uncompressedIntermediateHandler,
+ uncompressedIntermediateHandler);
+ deflaterResetRequired = true;
+ deflateRequired = false;
+ } else if (full && !needsInput) {
+ // Write buffer full and input message not fully read.
+ // Output and start new compressed part.
+ compressedPart = new MessagePart(false, getRsv(uncompressedPart),
+ opCode, compressedPayload, uncompressedIntermediateHandler,
+ uncompressedIntermediateHandler);
+ } else if (!fin && full && needsInput) {
+ // Write buffer full and input message not fully read.
+ // Output and get more data.
+ compressedPart = new MessagePart(false, getRsv(uncompressedPart),
+ opCode, compressedPayload, uncompressedIntermediateHandler,
+ uncompressedIntermediateHandler);
+ deflateRequired = false;
+ } else if (fin && full && needsInput) {
+ // Write buffer full. Input fully read. Deflater may be
+ // in one of four states:
+ // - output complete (just happened to align with end of
+ // buffer
+ // - in middle of EOM bytes
+ // - about to write EOM bytes
+ // - more data to write
+ int eomBufferWritten = deflater.deflate(EOM_BUFFER, 0, EOM_BUFFER.length, Deflater.SYNC_FLUSH);
+ if (eomBufferWritten < EOM_BUFFER.length) {
+ // EOM has just been completed
+ compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length + eomBufferWritten);
+ compressedPart = new MessagePart(true,
+ getRsv(uncompressedPart), opCode, compressedPayload,
+ uncompressedIntermediateHandler, uncompressedIntermediateHandler);
+ deflaterResetRequired = true;
+ deflateRequired = false;
+ } else {
+ // More data to write
+ // Copy bytes to new write buffer
+ writeBuffer.put(EOM_BUFFER, 0, eomBufferWritten);
+ compressedPart = new MessagePart(false,
+ getRsv(uncompressedPart), opCode, compressedPayload,
+ uncompressedIntermediateHandler, uncompressedIntermediateHandler);
+ }
+ } else {
+ throw new IllegalStateException("Should never happen");
+ }
+
+ // Add the newly created compressed part to the set of parts
+ // to pass on to the next transformation.
+ compressedParts.add(compressedPart);
+ }
+
+ SendHandler uncompressedEndHandler = uncompressedPart.getEndHandler();
+ int size = compressedParts.size();
+ if (size > 0) {
+ compressedParts.get(size - 1).setEndHandler(uncompressedEndHandler);
+ }
+
+ allCompressedParts.addAll(compressedParts);
}
}
if (next == null) {
- return compressedParts;
+ return allCompressedParts;
} else {
- return next.sendMessagePart(compressedParts);
+ return next.sendMessagePart(allCompressedParts);
}
}
+
+
+ private int getRsv(MessagePart uncompressedMessagePart) {
+ int result = uncompressedMessagePart.getRsv();
+ if (!firstCompressedFrameWritten) {
+ result += RSV_BITMASK;
+ firstCompressedFrameWritten = true;
+ }
+ return result;
+ }
}
Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1625506&r1=1625505&r2=1625506&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Wed Sep 17 10:37:10 2014
@@ -61,6 +61,9 @@ public abstract class WsRemoteEndpointIm
private final StateMachine stateMachine = new StateMachine();
+ private final IntermediateMessageHandler intermediateMessageHandler =
+ new IntermediateMessageHandler(this);
+
private Transformation transformation = null;
private boolean messagePartInProgress = false;
private final Queue<MessagePart> messagePartQueue = new ArrayDeque<MessagePart>();
@@ -261,10 +264,19 @@ public abstract class WsRemoteEndpointIm
List<MessagePart> messageParts = new ArrayList<MessagePart>();
messageParts.add(new MessagePart(last, 0, opCode, payload,
+ intermediateMessageHandler,
new EndMessageHandler(this, handler)));
messageParts = transformation.sendMessagePart(messageParts);
+ // Some extensions/transformations may buffer messages so it is possible
+ // that no message parts will be returned. If this is the case the
+ // trigger the suppler SendHandler
+ if (messageParts.size() == 0) {
+ handler.onResult(new SendResult());
+ return;
+ }
+
MessagePart mp = messageParts.remove(0);
boolean doWrite = false;
@@ -332,12 +344,15 @@ public abstract class WsRemoteEndpointIm
wsSession.updateLastActive();
- handler.onResult(result);
+ // Some handlers, such as the IntermediateMessageHandler, do not have a
+ // nested handler so handler may be null.
+ if (handler != null) {
+ handler.onResult(result);
+ }
}
void writeMessagePart(MessagePart mp) {
-
if (closed) {
throw new IllegalStateException(
sm.getString("wsRemoteEndpoint.closed"));
@@ -346,7 +361,7 @@ public abstract class WsRemoteEndpointIm
if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
nextFragmented = fragmented;
nextText = text;
- doWrite(mp.getHandler(), outputBuffer);
+ doWrite(mp.getEndHandler(), outputBuffer);
return;
}
@@ -400,14 +415,13 @@ public abstract class WsRemoteEndpointIm
if (getBatchingAllowed() || isMasked()) {
// Need to write via output buffer
OutputBufferSendHandler obsh = new OutputBufferSendHandler(
- mp.getHandler(), headerBuffer, mp.getPayload(), mask,
+ mp.getEndHandler(), headerBuffer, mp.getPayload(), mask,
outputBuffer, !getBatchingAllowed(), this);
obsh.write();
} else {
// Can write directly
- doWrite(mp.getHandler(), headerBuffer, mp.getPayload());
+ doWrite(mp.getEndHandler(), headerBuffer, mp.getPayload());
}
-
}
@@ -449,6 +463,31 @@ public abstract class WsRemoteEndpointIm
}
+ /**
+ * If a transformation needs to split a {@link MessagePart} into multiple
+ * {@link MessagePart}s, it uses this handler as the end handler for each of
+ * the additional {@link MessagePart}s. This handler notifies this this
+ * class that the {@link MessagePart} has been processed and that the next
+ * {@link MessagePart} in the queue should be started. The final
+ * {@link MessagePart} will use the {@link EndMessageHandler} provided with
+ * the original {@link MessagePart}.
+ */
+ private static class IntermediateMessageHandler implements SendHandler {
+
+ private final WsRemoteEndpointImplBase endpoint;
+
+ public IntermediateMessageHandler(WsRemoteEndpointImplBase endpoint) {
+ this.endpoint = endpoint;
+ }
+
+
+ @Override
+ public void onResult(SendResult result) {
+ endpoint.endMessage(null, result);
+ }
+ }
+
+
public void sendObject(Object obj) throws IOException {
Future<Void> f = sendObjectByFuture(obj);
try {
Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1625506&r1=1625505&r2=1625506&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Wed Sep 17 10:37:10 2014
@@ -251,6 +251,10 @@
It is expected that support will be extended to outgoing messages and to
the client side shortly. (markt)
</add>
+ <add>
+ Extend support for the <code>permessage-deflate</code> extension to
+ compression of outgoing messages on the server side. (markt)
+ </add>
</changelog>
</subsection>
<subsection name="Web applications">
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org