You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2014/11/20 07:38:58 UTC
svn commit: r1640690 - in /tomcat/tc7.0.x/trunk: ./
java/org/apache/tomcat/websocket/ test/org/apache/tomcat/websocket/
webapps/docs/
Author: markt
Date: Thu Nov 20 06:38:58 2014
New Revision: 1640690
URL: http://svn.apache.org/r1640690
Log:
Fix various problems identified with flushing batched messages:
- Flush triggered by disabling batching failed to flip buffer before writing and also failed to clear the buffer after writing was complete. This resulted in duplicated and/or corrupted messages.
- The flush triggered by session close was too late since no writes are permitted once the close process starts. This resulted in an exception being thrown.
Modified:
tomcat/tc7.0.x/trunk/ (props changed)
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsSession.java
tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java
tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
Propchange: tomcat/tc7.0.x/trunk/
------------------------------------------------------------------------------
Merged /tomcat/trunk:r1640688
Merged /tomcat/tc8.0.x/trunk:r1640689
Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1640690&r1=1640689&r2=1640690&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Thu Nov 20 06:38:58 2014
@@ -68,7 +68,7 @@ wsRemoteEndpoint.closedOutputStream=This
wsRemoteEndpoint.closedWriter=This method may not be called as the Writer has been closed
wsRemoteEndpoint.changeType=When sending a fragmented message, all fragments bust be of the same type
wsRemoteEndpoint.concurrentMessageSend=Messages may not be sent concurrently even when using the asynchronous send messages. The client must wait for the previous message to complete before sending the next.
-wsRemoteEndpoint.flushOnCloseFailed=Flushing batched messages before closing the session failed
+wsRemoteEndpoint.flushOnCloseFailed=Batched messages still enabled after session has been closed. Unable to flush remaining batched message.
wsRemoteEndpoint.invalidEncoder=The specified encoder of type [{0}] could not be instantiated
wsRemoteEndpoint.noEncoder=No encoder specified for object of class [{0}]
wsRemoteEndpoint.wrongState=The remote endpoint was in state [{0}] which is an invalid state for called method
@@ -86,6 +86,7 @@ wsSession.duplicateHandlerBinary=A binar
wsSession.duplicateHandlerPong=A pong message handler has already been configured
wsSession.duplicateHandlerText=A text message handler has already been configured
wsSession.invalidHandlerTypePong=A pong message handler must implement MessageHandler.Basic
+wsSession.flushFailOnClose=Failed to flush batched messages on session close
wsSession.messageFailed=Unable to write the complete message as the WebSocket connection has been closed
wsSession.sendCloseFail=Failed to send close message to remote endpoint
wsSession.removeHandlerFailed=Unable to remove the handler [{0}] as it was not registered with this session
Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1640690&r1=1640689&r2=1640690&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Thu Nov 20 06:38:58 2014
@@ -305,13 +305,10 @@ public abstract class WsRemoteEndpointIm
boolean doWrite = false;
synchronized (messagePartLock) {
- if (Constants.OPCODE_CLOSE == mp.getOpCode()) {
- try {
- setBatchingAllowed(false);
- } catch (IOException e) {
- log.warn(sm.getString(
- "wsRemoteEndpoint.flushOnCloseFailed"), e);
- }
+ if (Constants.OPCODE_CLOSE == mp.getOpCode() && getBatchingAllowed()) {
+ // Should not happen. Too late to send batched messages now since
+ // the session has been closed. Complain loudly.
+ log.warn(sm.getString("wsRemoteEndpoint.flushOnCloseFailed"));
}
if (messagePartInProgress) {
// When a control message is sent while another message is being
@@ -385,7 +382,10 @@ public abstract class WsRemoteEndpointIm
if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
nextFragmented = fragmented;
nextText = text;
- doWrite(mp.getEndHandler(), outputBuffer);
+ outputBuffer.flip();
+ SendHandler flushHandler = new OutputBufferFlushSendHandler(
+ outputBuffer, mp.getEndHandler());
+ doWrite(flushHandler, outputBuffer);
return;
}
@@ -866,6 +866,30 @@ public abstract class WsRemoteEndpointIm
}
}
+
+ /**
+ * Ensures that the output buffer is cleared after it has been flushed.
+ */
+ private static class OutputBufferFlushSendHandler implements SendHandler {
+
+ private final ByteBuffer outputBuffer;
+ private final SendHandler handler;
+
+ public OutputBufferFlushSendHandler(ByteBuffer outputBuffer, SendHandler handler) {
+ this.outputBuffer = outputBuffer;
+ this.handler = handler;
+ }
+
+ @Override
+ public void onResult(SendResult result) {
+ if (result.isOK()) {
+ outputBuffer.clear();
+ }
+ handler.onResult(result);
+ }
+ }
+
+
private class WsOutputStream extends OutputStream {
private final WsRemoteEndpointImplBase endpoint;
Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsSession.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1640690&r1=1640689&r2=1640690&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsSession.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsSession.java Thu Nov 20 06:38:58 2014
@@ -436,6 +436,13 @@ public class WsSession implements Sessio
return;
}
+ try {
+ wsRemoteEndpoint.setBatchingAllowed(false);
+ } catch (IOException e) {
+ log.warn(sm.getString("wsSession.flushFailOnClose"), e);
+ fireEndpointOnError(e);
+ }
+
state = State.CLOSING;
sendCloseMessage(closeReasonMessage);
@@ -461,6 +468,12 @@ public class WsSession implements Sessio
synchronized (stateLock) {
if (state == State.OPEN) {
+ try {
+ wsRemoteEndpoint.setBatchingAllowed(false);
+ } catch (IOException e) {
+ log.warn(sm.getString("wsSession.flushFailOnClose"), e);
+ fireEndpointOnError(e);
+ }
sendCloseMessage(closeReason);
fireEndpointOnClose(closeReason);
state = State.CLOSED;
@@ -471,7 +484,6 @@ public class WsSession implements Sessio
}
}
-
private void fireEndpointOnClose(CloseReason closeReason) {
// Fire the onClose event
@@ -489,6 +501,21 @@ public class WsSession implements Sessio
}
+
+ private void fireEndpointOnError(Throwable throwable) {
+
+ // Fire the onError event
+ Thread t = Thread.currentThread();
+ ClassLoader cl = t.getContextClassLoader();
+ t.setContextClassLoader(applicationClassLoader);
+ try {
+ localEndpoint.onError(this, throwable);
+ } finally {
+ t.setContextClassLoader(cl);
+ }
+ }
+
+
private void sendCloseMessage(CloseReason closeReason) {
// 125 is maximum size for the payload of a control message
ByteBuffer msg = ByteBuffer.allocate(125);
Modified: tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java?rev=1640690&r1=1640689&r2=1640690&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java (original)
+++ tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java Thu Nov 20 06:38:58 2014
@@ -119,10 +119,14 @@ public class TesterFirehoseServer {
for (int i = 0; i < MESSAGE_COUNT; i++) {
remote.sendText(MESSAGE);
+ if (i % (MESSAGE_COUNT * 0.4) == 0) {
+ remote.setBatchingAllowed(false);
+ remote.setBatchingAllowed(true);
+ }
}
- // Ensure remaining messages are flushed
- remote.setBatchingAllowed(false);
+ // Flushing should happen automatically on session close
+ session.close();
}
@OnError
Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1640690&r1=1640689&r2=1640690&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Thu Nov 20 06:38:58 2014
@@ -78,6 +78,14 @@
</fix>
</changelog>
</subsection>
+ <subsection name="WebSocket">
+ <changelog>
+ <fix>
+ Correct multiple issues with the flushing of batched messages that could
+ lead to duplicate and/or corrupt messages. (markt)
+ </fix>
+ </changelog>
+ </subsection>
<subsection name="Web applications">
<changelog>
<fix>
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org