You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/02/11 15:03:35 UTC
svn commit: r1444768 - in /tomcat/trunk/java/org/apache/tomcat/websocket:
Constants.java LocalStrings.properties WsRemoteEndpointBase.java
WsSession.java
Author: markt
Date: Mon Feb 11 14:03:35 2013
New Revision: 1444768
URL: http://svn.apache.org/r1444768
Log:
Implement new approach to locking
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/Constants.java
tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java
tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/Constants.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/Constants.java?rev=1444768&r1=1444767&r2=1444768&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/Constants.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/Constants.java Mon Feb 11 14:03:35 2013
@@ -31,6 +31,11 @@ public class Constants {
public static final byte OPCODE_PING = 0x09;
public static final byte OPCODE_PONG = 0x0A;
+ // Internal OP Codes
+ // RFC 6455 limits OP Codes to 4 bits so these should never clash
+ // Always set bit 4 so these will be treated as control codes
+ static final byte INTERNAL_OPCODE_FLUSH = 0x18;
+
// Client connection
public static final String HOST_HEADER_NAME = "Host";
public static final String UPGRADE_HEADER_NAME = "Upgrade";
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1444768&r1=1444767&r2=1444768&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Mon Feb 11 14:03:35 2013
@@ -13,10 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-messageSendStateMachine.changeType=When sending a fragmented message, all fragments bust be of the same type
-messageSendStateMachine.closed=Message will not be sent because the WebSocket session has been closed
-messageSendStateMachine.inProgress=Message will not be sent because the WebSocket session is currently sending another message
-
# Note the wsFrame.* messages are used as close reasons in WebSocket control
# frames and therefore must be 123 bytes (not characters) or less in length.
# Messages are encoded using UTF-8 where a single character may be encoded in
@@ -35,7 +31,10 @@ wsFrame.oneByteCloseCode=The client sent
wsFrame.textMessageTooBig=The decoded text message was too big for the output buffer and the endpoint does not support partial messages
wsFrame.wrongRsv=The client frame set the reserved bits to [{0}] which was not supported by this endpoint
+wsRemoteEndpoint.closed=Message will not be sent because the WebSocket session has been closed
+wsRemoteEndpoint.changeType=When sending a fragmented message, all fragments must be of the same type
wsRemoteEndpoint.concurrentMessageSend=Messages may not be sent concurrently even when using the asynchronous send messages. The client must wait for the previous message to complete before sending the next.
+wsRemoteEndpoint.inProgress=Message will not be sent because the WebSocket session is currently sending another message
wsSession.duplicateHandlerBinary=A binary message handler has already been configured
wsSession.duplicateHandlerPong=A pong message handler has already been configured
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java?rev=1444768&r1=1444767&r2=1444768&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java Mon Feb 11 14:03:35 2013
@@ -24,14 +24,14 @@ import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
+import java.util.ArrayDeque;
+import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
import javax.websocket.EncodeException;
import javax.websocket.RemoteEndpoint;
@@ -45,10 +45,18 @@ public abstract class WsRemoteEndpointBa
private static final StringManager sm =
StringManager.getManager(Constants.PACKAGE_NAME);
- private final ReentrantLock writeLock = new ReentrantLock();
- private final Condition notInProgress = writeLock.newCondition();
- // Must hold writeLock above to modify state
- private final MessageSendStateMachine state = new MessageSendStateMachine();
+ private boolean messagePartInProgress = false;
+ private Queue<MessagePart> messagePartQueue = new ArrayDeque<>();
+ private final Object messagePartLock = new Object();
+ private boolean dataMessageInProgress = false;
+
+ // State
+ private boolean closed = false;
+ private boolean fragmented = false;
+ private boolean nextFragmented = false;
+ private boolean text = false;
+ private boolean nextText = false;
+
// Max size of WebSocket header is 14 bytes
private final ByteBuffer headerBuffer = ByteBuffer.allocate(14);
private final ByteBuffer outputBuffer = ByteBuffer.allocate(8192);
@@ -89,38 +97,17 @@ public abstract class WsRemoteEndpointBa
@Override
public void flushBatch() {
- // Have to hold lock to flush output buffer
- writeLock.lock();
try {
- while (state.isInProgress()) {
- notInProgress.await();
- }
- FutureToSendHandler f2sh = new FutureToSendHandler();
- doWrite(f2sh, outputBuffer);
- f2sh.get();
- } catch (InterruptedException | ExecutionException e) {
+ startMessageBlock(Constants.INTERNAL_OPCODE_FLUSH, null, true);
+ } catch (IOException e) {
// TODO Log this? Runtime exception? Something else?
- } finally {
- writeLock.unlock();
}
}
@Override
public void sendBytes(ByteBuffer data) throws IOException {
- Future<SendResult> f = sendBytesByFuture(data);
- try {
- SendResult sr = f.get();
- if (!sr.isOK()) {
- if (sr.getException() == null) {
- throw new IOException();
- } else {
- throw new IOException(sr.getException());
- }
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new IOException(e);
- }
+ startMessageBlock(Constants.OPCODE_BINARY, data, true);
}
@@ -133,72 +120,35 @@ public abstract class WsRemoteEndpointBa
@Override
- public void sendBytesByCompletion(ByteBuffer data, SendHandler completion) {
- boolean locked = writeLock.tryLock();
- if (!locked) {
- throw new IllegalStateException(
- sm.getString("wsRemoteEndpoint.concurrentMessageSend"));
- }
- try {
- byte opCode = Constants.OPCODE_BINARY;
- boolean isLast = true;
- sendMessage(opCode, data, isLast, completion);
- } finally {
- writeLock.unlock();
- }
+ public void sendBytesByCompletion(ByteBuffer data, SendHandler handler) {
+ startMessage(Constants.OPCODE_BINARY, data, true, handler);
}
@Override
- public void sendPartialBytes(ByteBuffer partialByte, boolean isLast)
+ public void sendPartialBytes(ByteBuffer partialByte, boolean last)
throws IOException {
- boolean locked = writeLock.tryLock();
- if (!locked) {
- throw new IllegalStateException(
- sm.getString("wsRemoteEndpoint.concurrentMessageSend"));
- }
- try {
- byte opCode = Constants.OPCODE_BINARY;
- FutureToSendHandler f2sh = new FutureToSendHandler();
- sendMessage(opCode, partialByte, isLast, f2sh);
- f2sh.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new IOException(e);
- } finally {
- writeLock.unlock();
- }
+ startMessageBlock(Constants.OPCODE_BINARY, partialByte, last);
}
@Override
public void sendPing(ByteBuffer applicationData) throws IOException,
IllegalArgumentException {
- sendControlMessage(Constants.OPCODE_PING, applicationData);
+ startMessageBlock(Constants.OPCODE_PING, applicationData, true);
}
@Override
public void sendPong(ByteBuffer applicationData) throws IOException,
IllegalArgumentException {
- sendControlMessage(Constants.OPCODE_PONG, applicationData);
+ startMessageBlock(Constants.OPCODE_PONG, applicationData, true);
}
@Override
public void sendString(String text) throws IOException {
- Future<SendResult> f = sendStringByFuture(text);
- try {
- SendResult sr = f.get();
- if (!sr.isOK()) {
- if (sr.getException() == null) {
- throw new IOException();
- } else {
- throw new IOException(sr.getException());
- }
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new IOException(e);
- }
+ sendPartialString(CharBuffer.wrap(text), true);
}
@@ -211,19 +161,10 @@ public abstract class WsRemoteEndpointBa
@Override
- public void sendStringByCompletion(String text, SendHandler completion) {
- boolean locked = writeLock.tryLock();
- if (!locked) {
- throw new IllegalStateException(
- sm.getString("wsRemoteEndpoint.concurrentMessageSend"));
- }
- try {
- TextMessageSendHandler tmsh = new TextMessageSendHandler(completion,
- CharBuffer.wrap(text), true, encoder, encoderBuffer, this);
- tmsh.write();
- } finally {
- writeLock.unlock();
- }
+ public void sendStringByCompletion(String text, SendHandler handler) {
+ TextMessageSendHandler tmsh = new TextMessageSendHandler(handler,
+ CharBuffer.wrap(text), true, encoder, encoderBuffer, this);
+ tmsh.write();
}
@@ -251,70 +192,137 @@ public abstract class WsRemoteEndpointBa
-
- /**
- * Sends a control message, blocking until the message is sent.
- */
- void sendControlMessage(byte opCode, ByteBuffer payload)
- throws IOException{
-
- // Close needs to be sent so disable batching. This will flush any
- // messages in the buffer
- if (opCode == Constants.OPCODE_CLOSE) {
- setBatchingAllowed(false);
- }
-
- writeLock.lock();
+ void sendPartialString(CharBuffer part, boolean last) throws IOException {
try {
- if (state.isInProgress()) {
- notInProgress.await();
- }
FutureToSendHandler f2sh = new FutureToSendHandler();
- sendMessage(opCode, payload, true, f2sh);
+ TextMessageSendHandler tmsh = new TextMessageSendHandler(f2sh, part,
+ last, encoder, encoderBuffer, this);
+ tmsh.write();
f2sh.get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
- } finally {
- notInProgress.signal();
- writeLock.unlock();
}
}
- void sendPartialString(CharBuffer fragment, boolean isLast)
+ void startMessageBlock(byte opCode, ByteBuffer payload, boolean last)
throws IOException {
- boolean locked = writeLock.tryLock();
- if (!locked) {
- throw new IllegalStateException(
- sm.getString("wsRemoteEndpoint.concurrentMessageSend"));
- }
+ FutureToSendHandler f2sh = new FutureToSendHandler();
+ startMessage(opCode, payload, last, f2sh);
try {
- FutureToSendHandler f2sh = new FutureToSendHandler();
- TextMessageSendHandler tmsh = new TextMessageSendHandler(f2sh,
- fragment, isLast, encoder, encoderBuffer, this);
- tmsh.write();
- f2sh.get();
+ SendResult sr = f2sh.get();
+ if (!sr.isOK()) {
+ if (sr.getException() == null) {
+ throw new IOException();
+ } else {
+ throw new IOException(sr.getException());
+ }
+ }
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
- } finally {
- writeLock.unlock();
}
}
- private void sendMessage(byte opCode, ByteBuffer payload, boolean last,
- SendHandler completion) {
+ void startMessage(byte opCode, ByteBuffer payload, boolean last,
+ SendHandler handler) {
+ MessagePart mp = new MessagePart(opCode, payload, last, handler, this);
+
+ synchronized (messagePartLock) {
+ if (Constants.OPCODE_CLOSE == mp.getOpCode()) {
+ setBatchingAllowed(false);
+ }
+ if (messagePartInProgress) {
+ if (!Util.isControl(opCode)) {
+ if (dataMessageInProgress) {
+ throw new IllegalStateException(
+ sm.getString("wsRemoteEndpoint.inProgress"));
+ } else {
+ dataMessageInProgress = true;
+ }
+ }
+ messagePartQueue.add(mp);
+ } else {
+ messagePartInProgress = true;
+ writeMessagePart(mp);
+ }
+ }
+ }
+
+
+ void endMessage(SendHandler handler, SendResult result,
+ boolean dataMessage) {
+ synchronized (messagePartLock) {
+
+ if (closed) {
+ close();
+ } else {
+ fragmented = nextFragmented;
+ text = nextText;
+ }
+
+ if (dataMessage) {
+ dataMessageInProgress = false;
+ }
+ MessagePart mpNext = messagePartQueue.poll();
+ if (mpNext == null) {
+ messagePartInProgress = false;
+ } else {
+ writeMessagePart(mpNext);
+ }
+ }
+
+ handler.setResult(result);
+ }
+
- if (!writeLock.isHeldByCurrentThread()) {
- // Coding problem
+ void writeMessagePart(MessagePart mp) {
+
+ if (closed) {
throw new IllegalStateException(
- "Must hold writeLock before calling this method");
+ sm.getString("wsRemoteEndpoint.closed"));
}
- state.startMessage(opCode, last);
+ if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
+ nextFragmented = fragmented;
+ nextText = text;
+ doWrite(mp.getHandler(), outputBuffer);
+ return;
+ }
+
+ // Control messages may be sent in the middle of a fragmented message
+ // so they have no effect on the fragmented or text flags
+ boolean first;
+ if (Util.isControl(mp.getOpCode())) {
+ nextFragmented = fragmented;
+ nextText = text;
+ if (mp.getOpCode() == Constants.OPCODE_CLOSE) {
+ closed = true;
+ }
+ first = true;
+ } else {
+ boolean isText = Util.isText(mp.getOpCode());
- SendMessageSendHandler smsh =
- new SendMessageSendHandler(state, completion, this);
+ if (fragmented) {
+ // Currently fragmented
+ if (text != isText) {
+ throw new IllegalStateException(
+ sm.getString("wsRemoteEndpoint.changeType"));
+ }
+ nextText = text;
+ nextFragmented = !mp.isLast();
+ first = false;
+ } else {
+ // Wasn't fragmented. Might be now
+ if (mp.isLast()) {
+ nextFragmented = false;
+ } else {
+ nextFragmented = true;
+ nextText = isText;
+ }
+ first = true;
+ }
+ }
byte[] mask;
@@ -325,33 +333,106 @@ public abstract class WsRemoteEndpointBa
}
headerBuffer.clear();
- writeHeader(headerBuffer, opCode, payload, state.isFirst(), last,
- isMasked(), mask);
+ writeHeader(headerBuffer, mp.getOpCode(), mp.getPayload(), first,
+ mp.isLast(), isMasked(), mask);
headerBuffer.flip();
if (getBatchingAllowed() || isMasked()) {
// Need to write via output buffer
OutputBufferSendHandler obsh = new OutputBufferSendHandler(
- smsh, headerBuffer, payload, mask, outputBuffer,
- !getBatchingAllowed(), this);
+ mp.getHandler(), headerBuffer, mp.getPayload(), mask,
+ outputBuffer, !getBatchingAllowed(), this);
obsh.write();
} else {
// Can write directly
- doWrite(smsh, headerBuffer, payload);
+ doWrite(mp.getHandler(), headerBuffer, mp.getPayload());
}
+
}
- private void endMessage() {
- writeLock.lock();
- try {
- notInProgress.signal();
- } finally {
- writeLock.unlock();
+ private static class MessagePart {
+ private final byte opCode;
+ private final ByteBuffer payload;
+ private final boolean last;
+ private final SendHandler handler;
+
+ public MessagePart(byte opCode, ByteBuffer payload, boolean last,
+ SendHandler handler, WsRemoteEndpointBase endpoint) {
+ this.opCode = opCode;
+ this.payload = payload;
+ this.last = last;
+ this.handler = new EndMessageHandler(
+ endpoint, handler, !Util.isControl(opCode));
+ }
+
+
+ public byte getOpCode() {
+ return opCode;
+ }
+
+
+ public ByteBuffer getPayload() {
+ return payload;
+ }
+
+
+ public boolean isLast() {
+ return last;
+ }
+
+
+ public SendHandler getHandler() {
+ return handler;
}
}
+ /**
+ * Wraps the user provided handler so that the end point is notified when
+ * the message is complete.
+ */
+ private static class EndMessageHandler implements SendHandler {
+
+ private final WsRemoteEndpointBase endpoint;
+ private final SendHandler handler;
+ private final boolean dataMessage;
+
+ public EndMessageHandler(WsRemoteEndpointBase endpoint,
+ SendHandler handler, boolean dataMessage) {
+ this.endpoint = endpoint;
+ this.handler = handler;
+ this.dataMessage = dataMessage;
+ }
+
+
+ @Override
+ public void setResult(SendResult result) {
+ endpoint.endMessage(handler, result, dataMessage);
+ }
+ }
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -439,81 +520,6 @@ public abstract class WsRemoteEndpointBa
}
- private static class MessageSendStateMachine {
- private boolean closed = false;
- private boolean inProgress = false;
- private boolean fragmented = false;
- private boolean text = false;
- private boolean first = false;
-
- private boolean nextFragmented = false;
- private boolean nextText = false;
-
- public synchronized void startMessage(byte opCode, boolean isLast) {
-
- if (closed) {
- throw new IllegalStateException(
- sm.getString("messageSendStateMachine.closed"));
- }
-
- if (inProgress) {
- throw new IllegalStateException(
- sm.getString("messageSendStateMachine.inProgress"));
- }
-
- inProgress = true;
-
- // Control messages may be sent in the middle of fragmented message
- // so they have no effect on the fragmented or text flags
- if (Util.isControl(opCode)) {
- nextFragmented = fragmented;
- nextText = text;
- if (opCode == Constants.OPCODE_CLOSE) {
- closed = true;
- }
- first = true;
- return;
- }
-
- boolean isText = Util.isText(opCode);
-
- if (fragmented) {
- // Currently fragmented
- if (text != isText) {
- throw new IllegalStateException(
- sm.getString("messageSendStateMachine.changeType"));
- }
- nextText = text;
- nextFragmented = !isLast;
- first = false;
- } else {
- // Wasn't fragmented. Might be now
- if (isLast) {
- nextFragmented = false;
- } else {
- nextFragmented = true;
- nextText = isText;
- }
- first = true;
- }
- }
-
- public synchronized void endMessage() {
- inProgress = false;
- fragmented = nextFragmented;
- text = nextText;
- }
-
- public synchronized boolean isInProgress() {
- return inProgress;
- }
-
- public synchronized boolean isFirst() {
- return first;
- }
- }
-
-
private static class TextMessageSendHandler implements SendHandler {
private final SendHandler handler;
@@ -543,7 +549,7 @@ public abstract class WsRemoteEndpointBa
}
isDone = !cr.isOverflow();
buffer.flip();
- endpoint.sendMessage(Constants.OPCODE_TEXT, buffer,
+ endpoint.startMessage(Constants.OPCODE_TEXT, buffer,
isDone && isLast, this);
}
@@ -559,35 +565,6 @@ public abstract class WsRemoteEndpointBa
/**
- * Wraps user provided {@link SendHandler} so that state is updated when
- * the message completes.
- */
- private static class SendMessageSendHandler implements SendHandler {
-
- private final MessageSendStateMachine state;
- private final SendHandler handler;
- private final WsRemoteEndpointBase endpoint;
-
- public SendMessageSendHandler(MessageSendStateMachine state,
- SendHandler handler, WsRemoteEndpointBase endpoint) {
- this.state = state;
- this.handler = handler;
- this.endpoint = endpoint;
- }
-
- @Override
- public void setResult(SendResult result) {
- state.endMessage();
- if (state.closed) {
- endpoint.close();
- }
- handler.setResult(result);
- endpoint.endMessage();
- }
- }
-
-
- /**
* Used to write data to the output buffer, flushing the buffer if it fills
* up.
*/
@@ -802,6 +779,7 @@ public abstract class WsRemoteEndpointBa
}
private void doWrite(boolean last) throws IOException {
+ buffer.flip();
endpoint.sendPartialString(buffer, last);
buffer.clear();
}
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1444768&r1=1444767&r2=1444768&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Mon Feb 11 14:03:35 2013
@@ -253,8 +253,8 @@ public class WsSession implements Sessio
}
msg.flip();
try {
- wsRemoteEndpoint.sendControlMessage(
- Constants.OPCODE_CLOSE, msg);
+ wsRemoteEndpoint.startMessageBlock(
+ Constants.OPCODE_CLOSE, msg, true);
} catch (IOException ioe) {
// Unable to send close message.
// TODO - Ignore?
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org