You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2015/02/27 16:00:39 UTC
svn commit: r1662698 - in /tomcat/trunk: java/org/apache/tomcat/websocket/
java/org/apache/tomcat/websocket/server/ test/org/apache/tomcat/websocket/
Author: markt
Date: Fri Feb 27 15:00:39 2015
New Revision: 1662698
URL: http://svn.apache.org/r1662698
Log:
Switch to using blocking writes directly
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java
tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java?rev=1662698&r1=1662697&r2=1662698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java Fri Feb 27 15:00:39 2015
@@ -27,15 +27,18 @@ class MessagePart {
private final ByteBuffer payload;
private final SendHandler intermediateHandler;
private volatile SendHandler endHandler;
+ private final long blockingWriteTimeoutExpiry;
public MessagePart( boolean fin, int rsv, byte opCode, ByteBuffer payload,
- SendHandler intermediateHandler, SendHandler endHandler) {
+ SendHandler intermediateHandler, SendHandler endHandler,
+ long blockingWriteTimeoutExpiry) {
this.fin = fin;
this.rsv = rsv;
this.opCode = opCode;
this.payload = payload;
this.intermediateHandler = intermediateHandler;
this.endHandler = endHandler;
+ this.blockingWriteTimeoutExpiry = blockingWriteTimeoutExpiry;
}
@@ -71,6 +74,10 @@ class MessagePart {
public void setEndHandler(SendHandler endHandler) {
this.endHandler = endHandler;
}
+
+ public long getBlockingWriteTimeoutExpiry() {
+ return blockingWriteTimeoutExpiry;
+ }
}
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java?rev=1662698&r1=1662697&r2=1662698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java Fri Feb 27 15:00:39 2015
@@ -362,13 +362,14 @@ public class PerMessageDeflate implement
boolean fin = uncompressedPart.isFin();
boolean full = compressedPayload.limit() == compressedPayload.capacity();
boolean needsInput = deflater.needsInput();
+ long blockingWriteTimeoutExpiry = uncompressedPart.getBlockingWriteTimeoutExpiry();
if (fin && !full && needsInput) {
// End of compressed message. Drop EOM bytes and output.
compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length);
compressedPart = new MessagePart(true, getRsv(uncompressedPart),
opCode, compressedPayload, uncompressedIntermediateHandler,
- uncompressedIntermediateHandler);
+ uncompressedIntermediateHandler, blockingWriteTimeoutExpiry);
deflateRequired = false;
startNewMessage();
} else if (full && !needsInput) {
@@ -376,13 +377,13 @@ public class PerMessageDeflate implement
// Output and start new compressed part.
compressedPart = new MessagePart(false, getRsv(uncompressedPart),
opCode, compressedPayload, uncompressedIntermediateHandler,
- uncompressedIntermediateHandler);
+ uncompressedIntermediateHandler, blockingWriteTimeoutExpiry);
} else if (!fin && full && needsInput) {
// Write buffer full and input message not fully read.
// Output and get more data.
compressedPart = new MessagePart(false, getRsv(uncompressedPart),
opCode, compressedPayload, uncompressedIntermediateHandler,
- uncompressedIntermediateHandler);
+ uncompressedIntermediateHandler, blockingWriteTimeoutExpiry);
deflateRequired = false;
} else if (fin && full && needsInput) {
// Write buffer full. Input fully read. Deflater may be
@@ -398,7 +399,8 @@ public class PerMessageDeflate implement
compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length + eomBufferWritten);
compressedPart = new MessagePart(true,
getRsv(uncompressedPart), opCode, compressedPayload,
- uncompressedIntermediateHandler, uncompressedIntermediateHandler);
+ uncompressedIntermediateHandler, uncompressedIntermediateHandler,
+ blockingWriteTimeoutExpiry);
deflateRequired = false;
startNewMessage();
} else {
@@ -407,7 +409,8 @@ public class PerMessageDeflate implement
writeBuffer.put(EOM_BUFFER, 0, eomBufferWritten);
compressedPart = new MessagePart(false,
getRsv(uncompressedPart), opCode, compressedPayload,
- uncompressedIntermediateHandler, uncompressedIntermediateHandler);
+ uncompressedIntermediateHandler, uncompressedIntermediateHandler,
+ blockingWriteTimeoutExpiry);
}
} else {
throw new IllegalStateException("Should never happen");
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1662698&r1=1662697&r2=1662698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Fri Feb 27 15:00:39 2015
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -57,6 +58,8 @@ public abstract class WsRemoteEndpointIm
public static final String BLOCKING_SEND_TIMEOUT_PROPERTY =
"org.apache.tomcat.websocket.BLOCKING_SEND_TIMEOUT";
+ protected static final SendResult SENDRESULT_OK = new SendResult();
+
private final Log log = LogFactory.getLog(WsRemoteEndpointImplBase.class);
private final StateMachine stateMachine = new StateMachine();
@@ -65,7 +68,7 @@ public abstract class WsRemoteEndpointIm
new IntermediateMessageHandler(this);
private Transformation transformation = null;
- private boolean messagePartInProgress = false;
+ private final Semaphore messagePartInProgress = new Semaphore(1);
private final Queue<MessagePart> messagePartQueue = new ArrayDeque<>();
private final Object messagePartLock = new Object();
@@ -266,21 +269,53 @@ public abstract class WsRemoteEndpointIm
// trigger a session close and depending on timing the client
// session may close before we can read the timeout.
long timeout = getBlockingSendTimeout();
- FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
- startMessage(opCode, payload, last, f2sh);
- try {
- if (timeout == -1) {
- f2sh.get();
- } else {
- f2sh.get(timeout, TimeUnit.MILLISECONDS);
+ long timeoutExpiry;
+ if (timeout < 0) {
+ timeoutExpiry = Long.MAX_VALUE;
+ } else {
+ timeoutExpiry = System.currentTimeMillis() + timeout;
+ }
+
+ wsSession.updateLastActive();
+
+ BlockingSendHandler bsh = new BlockingSendHandler();
+
+ List<MessagePart> messageParts = new ArrayList<>();
+ messageParts.add(new MessagePart(last, 0, opCode, payload, bsh, bsh, timeoutExpiry));
+
+ messageParts = transformation.sendMessagePart(messageParts);
+
+ // Some extensions/transformations may buffer messages so it is possible
+ // that no message parts will be returned. If this is the case simply
+ // return.
+ if (messageParts.size() == 0) {
+ return;
+ }
+
+ synchronized (messagePartLock) {
+ try {
+ if (!messagePartInProgress.tryAcquire(timeout, TimeUnit.MILLISECONDS)) {
+ // TODO i18n
+ throw new IOException();
+ }
+ } catch (InterruptedException e) {
+ // TODO i18n
+ throw new IOException(e);
}
- if (payload != null) {
- payload.clear();
+ }
+
+ for (MessagePart mp : messageParts) {
+ writeMessagePart(mp);
+ if (!bsh.getSendResult().isOK()) {
+ throw new IOException (bsh.getSendResult().getException());
}
- } catch (InterruptedException | ExecutionException |
- TimeoutException e) {
- throw new IOException(e);
}
+
+ if (payload != null) {
+ payload.clear();
+ }
+
+ endMessage(null, null);
}
@@ -292,7 +327,7 @@ public abstract class WsRemoteEndpointIm
List<MessagePart> messageParts = new ArrayList<>();
messageParts.add(new MessagePart(last, 0, opCode, payload,
intermediateMessageHandler,
- new EndMessageHandler(this, handler)));
+ new EndMessageHandler(this, handler), -1));
messageParts = transformation.sendMessagePart(messageParts);
@@ -313,7 +348,9 @@ public abstract class WsRemoteEndpointIm
// the session has been closed. Complain loudly.
log.warn(sm.getString("wsRemoteEndpoint.flushOnCloseFailed"));
}
- if (messagePartInProgress) {
+ if (messagePartInProgress.tryAcquire()) {
+ doWrite = true;
+ } else {
// When a control message is sent while another message is being
// sent, the control message is queued. Chances are the
// subsequent data message part will end up queued while the
@@ -324,9 +361,6 @@ public abstract class WsRemoteEndpointIm
// Add it to the queue
messagePartQueue.add(mp);
- } else {
- messagePartInProgress = true;
- doWrite = true;
}
// Add any remaining messages to the queue
messagePartQueue.addAll(messageParts);
@@ -350,7 +384,7 @@ public abstract class WsRemoteEndpointIm
mpNext = messagePartQueue.poll();
if (mpNext == null) {
- messagePartInProgress = false;
+ messagePartInProgress.release();
} else if (!closed){
// Session may have been closed unexpectedly in the middle of
// sending a fragmented message closing the endpoint. If this
@@ -388,7 +422,7 @@ public abstract class WsRemoteEndpointIm
outputBuffer.flip();
SendHandler flushHandler = new OutputBufferFlushSendHandler(
outputBuffer, mp.getEndHandler());
- doWrite(flushHandler, outputBuffer);
+ doWrite(flushHandler, mp.getBlockingWriteTimeoutExpiry(), outputBuffer);
return;
}
@@ -442,12 +476,14 @@ public abstract class WsRemoteEndpointIm
if (getBatchingAllowed() || isMasked()) {
// Need to write via output buffer
OutputBufferSendHandler obsh = new OutputBufferSendHandler(
- mp.getEndHandler(), headerBuffer, mp.getPayload(), mask,
+ mp.getEndHandler(), mp.getBlockingWriteTimeoutExpiry(),
+ headerBuffer, mp.getPayload(), mask,
outputBuffer, !getBatchingAllowed(), this);
obsh.write();
} else {
// Can write directly
- doWrite(mp.getEndHandler(), headerBuffer, mp.getPayload());
+ doWrite(mp.getEndHandler(), mp.getBlockingWriteTimeoutExpiry(),
+ headerBuffer, mp.getPayload());
}
}
@@ -639,7 +675,8 @@ public abstract class WsRemoteEndpointIm
}
- protected abstract void doWrite(SendHandler handler, ByteBuffer... data);
+ protected abstract void doWrite(SendHandler handler, long blockingWrieTimeoutExpiry,
+ ByteBuffer... data);
protected abstract boolean isMasked();
protected abstract void doClose();
@@ -756,6 +793,7 @@ public abstract class WsRemoteEndpointIm
private static class OutputBufferSendHandler implements SendHandler {
private final SendHandler handler;
+ private final long blockingWriteTimeoutExpiry;
private final ByteBuffer headerBuffer;
private final ByteBuffer payload;
private final byte[] mask;
@@ -765,9 +803,11 @@ public abstract class WsRemoteEndpointIm
private int maskIndex = 0;
public OutputBufferSendHandler(SendHandler completion,
+ long blockingWriteTimeoutExpiry,
ByteBuffer headerBuffer, ByteBuffer payload, byte[] mask,
ByteBuffer outputBuffer, boolean flushRequired,
WsRemoteEndpointImplBase endpoint) {
+ this.blockingWriteTimeoutExpiry = blockingWriteTimeoutExpiry;
this.handler = completion;
this.headerBuffer = headerBuffer;
this.payload = payload;
@@ -785,7 +825,7 @@ public abstract class WsRemoteEndpointIm
if (headerBuffer.hasRemaining()) {
// Still more headers to write, need to flush
outputBuffer.flip();
- endpoint.doWrite(this, outputBuffer);
+ endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer);
return;
}
@@ -819,7 +859,7 @@ public abstract class WsRemoteEndpointIm
payload.limit(payloadLimit);
// Still more headers to write, need to flush
outputBuffer.flip();
- endpoint.doWrite(this, outputBuffer);
+ endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer);
return;
}
@@ -828,7 +868,7 @@ public abstract class WsRemoteEndpointIm
if (outputBuffer.remaining() == 0) {
handler.onResult(new SendResult());
} else {
- endpoint.doWrite(this, outputBuffer);
+ endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer);
}
} else {
handler.onResult(new SendResult());
@@ -840,7 +880,7 @@ public abstract class WsRemoteEndpointIm
public void onResult(SendResult result) {
if (result.isOK()) {
if (outputBuffer.hasRemaining()) {
- endpoint.doWrite(this, outputBuffer);
+ endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer);
} else {
outputBuffer.clear();
write();
@@ -1169,4 +1209,19 @@ public abstract class WsRemoteEndpointIm
handler.onResult(result);
}
}
+
+
+ private static class BlockingSendHandler implements SendHandler {
+
+ private SendResult sendResult = null;
+
+ @Override
+ public void onResult(SendResult result) {
+ sendResult = result;
+ }
+
+ public SendResult getSendResult() {
+ return sendResult;
+ }
+ }
}
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java?rev=1662698&r1=1662697&r2=1662698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java Fri Feb 27 15:00:39 2015
@@ -16,10 +16,14 @@
*/
package org.apache.tomcat.websocket;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.websocket.SendHandler;
+import javax.websocket.SendResult;
public class WsRemoteEndpointImplClient extends WsRemoteEndpointImplBase {
@@ -37,20 +41,31 @@ public class WsRemoteEndpointImplClient
@Override
- protected void doWrite(SendHandler handler, ByteBuffer... data) {
- long timeout = getSendTimeout();
- if (timeout < 1) {
- timeout = Long.MAX_VALUE;
-
- }
- SendHandlerToCompletionHandler sh2ch =
- new SendHandlerToCompletionHandler(handler);
- try {
- channel.write(data, 0, data.length, timeout, TimeUnit.MILLISECONDS,
- null, sh2ch);
- } catch (IllegalStateException ise) {
- sh2ch.failed(ise, null);
+ protected void doWrite(SendHandler handler, long blockingWriteTimeoutExpiry,
+ ByteBuffer... data) {
+ long timeout;
+ for (ByteBuffer byteBuffer : data) {
+ if (blockingWriteTimeoutExpiry == -1) {
+ timeout = getSendTimeout();
+ if (timeout < 1) {
+ timeout = Long.MAX_VALUE;
+ }
+ } else {
+ timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
+ if (timeout < 0) {
+ SendResult sr = new SendResult(new IOException("Blocking write timeout"));
+ handler.onResult(sr);
+ }
+ }
+
+ try {
+ channel.write(byteBuffer).get(timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ handler.onResult(new SendResult(e));
+ return;
+ }
}
+ handler.onResult(SENDRESULT_OK);
}
@Override
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java?rev=1662698&r1=1662697&r2=1662698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java Fri Feb 27 15:00:39 2015
@@ -72,12 +72,44 @@ public class WsRemoteEndpointImplServer
@Override
- protected void doWrite(SendHandler handler, ByteBuffer... buffers) {
- this.handler = handler;
- this.buffers = buffers;
- // This is definitely the same thread that triggered the write so a
- // dispatch will be required.
- onWritePossible(true);
+ protected void doWrite(SendHandler handler, long blockingWriteTimeoutExpiry,
+ ByteBuffer... buffers) {
+ if (blockingWriteTimeoutExpiry == -1) {
+ this.handler = handler;
+ this.buffers = buffers;
+ // This is definitely the same thread that triggered the write so a
+ // dispatch will be required.
+ onWritePossible(true);
+ } else {
+ // Blocking
+ for (ByteBuffer buffer : buffers) {
+ long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
+ if (timeout < 0) {
+ // TODO i18n
+ SendResult sr = new SendResult(new IOException("Blocking write timeout"));
+ handler.onResult(sr);
+ return;
+ }
+ socketWrapper.setWriteTimeout(timeout);
+ try {
+ socketWrapper.write(true, buffer.array(), buffer.arrayOffset(),
+ buffer.limit());
+ timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
+ if (timeout < 0) {
+ // TODO i18n
+ SendResult sr = new SendResult(new IOException("Blocking write timeout"));
+ handler.onResult(sr);
+ return;
+ }
+ socketWrapper.setWriteTimeout(timeout);
+ socketWrapper.flush(true);
+ handler.onResult(SENDRESULT_OK);
+ } catch (IOException e) {
+ SendResult sr = new SendResult(e);
+ handler.onResult(sr);
+ }
+ }
+ }
}
Modified: tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java?rev=1662698&r1=1662697&r2=1662698&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java (original)
+++ tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java Fri Feb 27 15:00:39 2015
@@ -343,9 +343,9 @@ public class TestWsWebSocketContainer ex
Exception exception = null;
try {
while (true) {
+ lastSend = System.currentTimeMillis();
Future<Void> f = wsSession.getAsyncRemote().sendBinary(
ByteBuffer.wrap(MESSAGE_BINARY_4K));
- lastSend = System.currentTimeMillis();
f.get();
}
} catch (Exception e) {
@@ -354,8 +354,8 @@ public class TestWsWebSocketContainer ex
long timeout = System.currentTimeMillis() - lastSend;
- // Clear the server side block and prevent and further blocks to allow
- // the server to shutdown cleanly
+ // Clear the server side block and prevent further blocks to allow the
+ // server to shutdown cleanly
BlockingPojo.clearBlock();
String msg = "Time out was [" + timeout + "] ms";
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org