You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2015/02/27 16:00:39 UTC

svn commit: r1662698 - in /tomcat/trunk: java/org/apache/tomcat/websocket/ java/org/apache/tomcat/websocket/server/ test/org/apache/tomcat/websocket/

Author: markt
Date: Fri Feb 27 15:00:39 2015
New Revision: 1662698

URL: http://svn.apache.org/r1662698
Log:
Switch to using blocking writes directly

Modified:
    tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java
    tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java
    tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
    tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java?rev=1662698&r1=1662697&r2=1662698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java Fri Feb 27 15:00:39 2015
@@ -27,15 +27,18 @@ class MessagePart {
     private final ByteBuffer payload;
     private final SendHandler intermediateHandler;
     private volatile SendHandler endHandler;
+    private final long blockingWriteTimeoutExpiry;
 
     public MessagePart( boolean fin, int rsv, byte opCode, ByteBuffer payload,
-            SendHandler intermediateHandler, SendHandler endHandler) {
+            SendHandler intermediateHandler, SendHandler endHandler,
+            long blockingWriteTimeoutExpiry) {
         this.fin = fin;
         this.rsv = rsv;
         this.opCode = opCode;
         this.payload = payload;
         this.intermediateHandler = intermediateHandler;
         this.endHandler = endHandler;
+        this.blockingWriteTimeoutExpiry = blockingWriteTimeoutExpiry;
     }
 
 
@@ -71,6 +74,10 @@ class MessagePart {
     public void setEndHandler(SendHandler endHandler) {
         this.endHandler = endHandler;
     }
+
+    public long getBlockingWriteTimeoutExpiry() {
+        return blockingWriteTimeoutExpiry;
+    }
 }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java?rev=1662698&r1=1662697&r2=1662698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java Fri Feb 27 15:00:39 2015
@@ -362,13 +362,14 @@ public class PerMessageDeflate implement
                     boolean fin = uncompressedPart.isFin();
                     boolean full = compressedPayload.limit() == compressedPayload.capacity();
                     boolean needsInput = deflater.needsInput();
+                    long blockingWriteTimeoutExpiry = uncompressedPart.getBlockingWriteTimeoutExpiry();
 
                     if (fin && !full && needsInput) {
                         // End of compressed message. Drop EOM bytes and output.
                         compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length);
                         compressedPart = new MessagePart(true, getRsv(uncompressedPart),
                                 opCode, compressedPayload, uncompressedIntermediateHandler,
-                                uncompressedIntermediateHandler);
+                                uncompressedIntermediateHandler, blockingWriteTimeoutExpiry);
                         deflateRequired = false;
                         startNewMessage();
                     } else if (full && !needsInput) {
@@ -376,13 +377,13 @@ public class PerMessageDeflate implement
                         // Output and start new compressed part.
                         compressedPart = new MessagePart(false, getRsv(uncompressedPart),
                                 opCode, compressedPayload, uncompressedIntermediateHandler,
-                                uncompressedIntermediateHandler);
+                                uncompressedIntermediateHandler, blockingWriteTimeoutExpiry);
                     } else if (!fin && full && needsInput) {
                         // Write buffer full and input message not fully read.
                         // Output and get more data.
                         compressedPart = new MessagePart(false, getRsv(uncompressedPart),
                                 opCode, compressedPayload, uncompressedIntermediateHandler,
-                                uncompressedIntermediateHandler);
+                                uncompressedIntermediateHandler, blockingWriteTimeoutExpiry);
                         deflateRequired = false;
                     } else if (fin && full && needsInput) {
                         // Write buffer full. Input fully read. Deflater may be
@@ -398,7 +399,8 @@ public class PerMessageDeflate implement
                             compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length + eomBufferWritten);
                             compressedPart = new MessagePart(true,
                                     getRsv(uncompressedPart), opCode, compressedPayload,
-                                    uncompressedIntermediateHandler, uncompressedIntermediateHandler);
+                                    uncompressedIntermediateHandler, uncompressedIntermediateHandler,
+                                    blockingWriteTimeoutExpiry);
                             deflateRequired = false;
                             startNewMessage();
                         } else {
@@ -407,7 +409,8 @@ public class PerMessageDeflate implement
                             writeBuffer.put(EOM_BUFFER, 0, eomBufferWritten);
                             compressedPart = new MessagePart(false,
                                     getRsv(uncompressedPart), opCode, compressedPayload,
-                                    uncompressedIntermediateHandler, uncompressedIntermediateHandler);
+                                    uncompressedIntermediateHandler, uncompressedIntermediateHandler,
+                                    blockingWriteTimeoutExpiry);
                         }
                     } else {
                         throw new IllegalStateException("Should never happen");

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1662698&r1=1662697&r2=1662698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Fri Feb 27 15:00:39 2015
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -57,6 +58,8 @@ public abstract class WsRemoteEndpointIm
     public static final String BLOCKING_SEND_TIMEOUT_PROPERTY =
             "org.apache.tomcat.websocket.BLOCKING_SEND_TIMEOUT";
 
+    protected static final SendResult SENDRESULT_OK = new SendResult();
+
     private final Log log = LogFactory.getLog(WsRemoteEndpointImplBase.class);
 
     private final StateMachine stateMachine = new StateMachine();
@@ -65,7 +68,7 @@ public abstract class WsRemoteEndpointIm
             new IntermediateMessageHandler(this);
 
     private Transformation transformation = null;
-    private boolean messagePartInProgress = false;
+    private final Semaphore messagePartInProgress = new Semaphore(1);
     private final Queue<MessagePart> messagePartQueue = new ArrayDeque<>();
     private final Object messagePartLock = new Object();
 
@@ -266,21 +269,53 @@ public abstract class WsRemoteEndpointIm
         // trigger a session close and depending on timing the client
         // session may close before we can read the timeout.
         long timeout = getBlockingSendTimeout();
-        FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
-        startMessage(opCode, payload, last, f2sh);
-        try {
-            if (timeout == -1) {
-                f2sh.get();
-            } else {
-                f2sh.get(timeout, TimeUnit.MILLISECONDS);
+        long timeoutExpiry;
+        if (timeout < 0) {
+            timeoutExpiry = Long.MAX_VALUE;
+        } else {
+            timeoutExpiry = System.currentTimeMillis() + timeout;
+        }
+
+        wsSession.updateLastActive();
+
+        BlockingSendHandler bsh = new BlockingSendHandler();
+
+        List<MessagePart> messageParts = new ArrayList<>();
+        messageParts.add(new MessagePart(last, 0, opCode, payload, bsh, bsh, timeoutExpiry));
+
+        messageParts = transformation.sendMessagePart(messageParts);
+
+        // Some extensions/transformations may buffer messages so it is possible
+        // that no message parts will be returned. If this is the case simply
+        // return.
+        if (messageParts.size() == 0) {
+            return;
+        }
+
+        synchronized (messagePartLock) {
+            try {
+                if (!messagePartInProgress.tryAcquire(timeout, TimeUnit.MILLISECONDS)) {
+                    // TODO i18n
+                    throw new IOException();
+                }
+            } catch (InterruptedException e) {
+                // TODO i18n
+                throw new IOException(e);
             }
-            if (payload != null) {
-                payload.clear();
+        }
+
+        for (MessagePart mp : messageParts) {
+            writeMessagePart(mp);
+            if (!bsh.getSendResult().isOK()) {
+                throw new IOException (bsh.getSendResult().getException());
             }
-        } catch (InterruptedException | ExecutionException |
-                TimeoutException e) {
-            throw new IOException(e);
         }
+
+        if (payload != null) {
+            payload.clear();
+        }
+
+        endMessage(null, null);
     }
 
 
@@ -292,7 +327,7 @@ public abstract class WsRemoteEndpointIm
         List<MessagePart> messageParts = new ArrayList<>();
         messageParts.add(new MessagePart(last, 0, opCode, payload,
                 intermediateMessageHandler,
-                new EndMessageHandler(this, handler)));
+                new EndMessageHandler(this, handler), -1));
 
         messageParts = transformation.sendMessagePart(messageParts);
 
@@ -313,7 +348,9 @@ public abstract class WsRemoteEndpointIm
                 // the session has been closed. Complain loudly.
                 log.warn(sm.getString("wsRemoteEndpoint.flushOnCloseFailed"));
             }
-            if (messagePartInProgress) {
+            if (messagePartInProgress.tryAcquire()) {
+                doWrite = true;
+            } else {
                 // When a control message is sent while another message is being
                 // sent, the control message is queued. Chances are the
                 // subsequent data message part will end up queued while the
@@ -324,9 +361,6 @@ public abstract class WsRemoteEndpointIm
 
                 // Add it to the queue
                 messagePartQueue.add(mp);
-            } else {
-                messagePartInProgress = true;
-                doWrite = true;
             }
             // Add any remaining messages to the queue
             messagePartQueue.addAll(messageParts);
@@ -350,7 +384,7 @@ public abstract class WsRemoteEndpointIm
 
             mpNext = messagePartQueue.poll();
             if (mpNext == null) {
-                messagePartInProgress = false;
+                messagePartInProgress.release();
             } else if (!closed){
                 // Session may have been closed unexpectedly in the middle of
                 // sending a fragmented message closing the endpoint. If this
@@ -388,7 +422,7 @@ public abstract class WsRemoteEndpointIm
             outputBuffer.flip();
             SendHandler flushHandler = new OutputBufferFlushSendHandler(
                     outputBuffer, mp.getEndHandler());
-            doWrite(flushHandler, outputBuffer);
+            doWrite(flushHandler, mp.getBlockingWriteTimeoutExpiry(), outputBuffer);
             return;
         }
 
@@ -442,12 +476,14 @@ public abstract class WsRemoteEndpointIm
         if (getBatchingAllowed() || isMasked()) {
             // Need to write via output buffer
             OutputBufferSendHandler obsh = new OutputBufferSendHandler(
-                    mp.getEndHandler(), headerBuffer, mp.getPayload(), mask,
+                    mp.getEndHandler(), mp.getBlockingWriteTimeoutExpiry(),
+                    headerBuffer, mp.getPayload(), mask,
                     outputBuffer, !getBatchingAllowed(), this);
             obsh.write();
         } else {
             // Can write directly
-            doWrite(mp.getEndHandler(), headerBuffer, mp.getPayload());
+            doWrite(mp.getEndHandler(), mp.getBlockingWriteTimeoutExpiry(),
+                    headerBuffer, mp.getPayload());
         }
     }
 
@@ -639,7 +675,8 @@ public abstract class WsRemoteEndpointIm
     }
 
 
-    protected abstract void doWrite(SendHandler handler, ByteBuffer... data);
+    protected abstract void doWrite(SendHandler handler, long blockingWrieTimeoutExpiry,
+            ByteBuffer... data);
     protected abstract boolean isMasked();
     protected abstract void doClose();
 
@@ -756,6 +793,7 @@ public abstract class WsRemoteEndpointIm
     private static class OutputBufferSendHandler implements SendHandler {
 
         private final SendHandler handler;
+        private final long blockingWriteTimeoutExpiry;
         private final ByteBuffer headerBuffer;
         private final ByteBuffer payload;
         private final byte[] mask;
@@ -765,9 +803,11 @@ public abstract class WsRemoteEndpointIm
         private int maskIndex = 0;
 
         public OutputBufferSendHandler(SendHandler completion,
+                long blockingWriteTimeoutExpiry,
                 ByteBuffer headerBuffer, ByteBuffer payload, byte[] mask,
                 ByteBuffer outputBuffer, boolean flushRequired,
                 WsRemoteEndpointImplBase endpoint) {
+            this.blockingWriteTimeoutExpiry = blockingWriteTimeoutExpiry;
             this.handler = completion;
             this.headerBuffer = headerBuffer;
             this.payload = payload;
@@ -785,7 +825,7 @@ public abstract class WsRemoteEndpointIm
             if (headerBuffer.hasRemaining()) {
                 // Still more headers to write, need to flush
                 outputBuffer.flip();
-                endpoint.doWrite(this, outputBuffer);
+                endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer);
                 return;
             }
 
@@ -819,7 +859,7 @@ public abstract class WsRemoteEndpointIm
                 payload.limit(payloadLimit);
                 // Still more headers to write, need to flush
                 outputBuffer.flip();
-                endpoint.doWrite(this, outputBuffer);
+                endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer);
                 return;
             }
 
@@ -828,7 +868,7 @@ public abstract class WsRemoteEndpointIm
                 if (outputBuffer.remaining() == 0) {
                     handler.onResult(new SendResult());
                 } else {
-                    endpoint.doWrite(this, outputBuffer);
+                    endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer);
                 }
             } else {
                 handler.onResult(new SendResult());
@@ -840,7 +880,7 @@ public abstract class WsRemoteEndpointIm
         public void onResult(SendResult result) {
             if (result.isOK()) {
                 if (outputBuffer.hasRemaining()) {
-                    endpoint.doWrite(this, outputBuffer);
+                    endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer);
                 } else {
                     outputBuffer.clear();
                     write();
@@ -1169,4 +1209,19 @@ public abstract class WsRemoteEndpointIm
             handler.onResult(result);
         }
     }
+
+
+    private static class BlockingSendHandler implements SendHandler {
+
+        private SendResult sendResult = null;
+
+        @Override
+        public void onResult(SendResult result) {
+            sendResult = result;
+        }
+
+        public SendResult getSendResult() {
+            return sendResult;
+        }
+    }
 }

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java?rev=1662698&r1=1662697&r2=1662698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java Fri Feb 27 15:00:39 2015
@@ -16,10 +16,14 @@
  */
 package org.apache.tomcat.websocket;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.websocket.SendHandler;
+import javax.websocket.SendResult;
 
 public class WsRemoteEndpointImplClient extends WsRemoteEndpointImplBase {
 
@@ -37,20 +41,31 @@ public class WsRemoteEndpointImplClient
 
 
     @Override
-    protected void doWrite(SendHandler handler, ByteBuffer... data) {
-        long timeout = getSendTimeout();
-        if (timeout < 1) {
-            timeout = Long.MAX_VALUE;
-
-        }
-        SendHandlerToCompletionHandler sh2ch =
-                new SendHandlerToCompletionHandler(handler);
-        try {
-            channel.write(data, 0, data.length, timeout, TimeUnit.MILLISECONDS,
-                    null, sh2ch);
-        } catch (IllegalStateException ise) {
-            sh2ch.failed(ise, null);
+    protected void doWrite(SendHandler handler, long blockingWriteTimeoutExpiry,
+            ByteBuffer... data) {
+        long timeout;
+        for (ByteBuffer byteBuffer : data) {
+            if (blockingWriteTimeoutExpiry == -1) {
+                timeout = getSendTimeout();
+                if (timeout < 1) {
+                    timeout = Long.MAX_VALUE;
+                }
+            } else {
+                timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
+                if (timeout < 0) {
+                    SendResult sr = new SendResult(new IOException("Blocking write timeout"));
+                    handler.onResult(sr);
+                }
+            }
+
+            try {
+                channel.write(byteBuffer).get(timeout, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                handler.onResult(new SendResult(e));
+                return;
+            }
         }
+        handler.onResult(SENDRESULT_OK);
     }
 
     @Override

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java?rev=1662698&r1=1662697&r2=1662698&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java Fri Feb 27 15:00:39 2015
@@ -72,12 +72,44 @@ public class WsRemoteEndpointImplServer
 
 
     @Override
-    protected void doWrite(SendHandler handler, ByteBuffer... buffers) {
-        this.handler = handler;
-        this.buffers = buffers;
-        // This is definitely the same thread that triggered the write so a
-        // dispatch will be required.
-        onWritePossible(true);
+    protected void doWrite(SendHandler handler, long blockingWriteTimeoutExpiry,
+            ByteBuffer... buffers) {
+        if (blockingWriteTimeoutExpiry == -1) {
+            this.handler = handler;
+            this.buffers = buffers;
+            // This is definitely the same thread that triggered the write so a
+            // dispatch will be required.
+            onWritePossible(true);
+        } else {
+            // Blocking
+            for (ByteBuffer buffer : buffers) {
+                long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
+                if (timeout < 0) {
+                    // TODO i18n
+                    SendResult sr = new SendResult(new IOException("Blocking write timeout"));
+                    handler.onResult(sr);
+                    return;
+                }
+                socketWrapper.setWriteTimeout(timeout);
+                try {
+                    socketWrapper.write(true, buffer.array(), buffer.arrayOffset(),
+                                    buffer.limit());
+                    timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
+                    if (timeout < 0) {
+                        // TODO i18n
+                        SendResult sr = new SendResult(new IOException("Blocking write timeout"));
+                        handler.onResult(sr);
+                        return;
+                    }
+                    socketWrapper.setWriteTimeout(timeout);
+                    socketWrapper.flush(true);
+                    handler.onResult(SENDRESULT_OK);
+                } catch (IOException e) {
+                    SendResult sr = new SendResult(e);
+                    handler.onResult(sr);
+                }
+            }
+        }
     }
 
 

Modified: tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java?rev=1662698&r1=1662697&r2=1662698&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java (original)
+++ tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java Fri Feb 27 15:00:39 2015
@@ -343,9 +343,9 @@ public class TestWsWebSocketContainer ex
         Exception exception = null;
         try {
             while (true) {
+                lastSend = System.currentTimeMillis();
                 Future<Void> f = wsSession.getAsyncRemote().sendBinary(
                         ByteBuffer.wrap(MESSAGE_BINARY_4K));
-                lastSend = System.currentTimeMillis();
                 f.get();
             }
         } catch (Exception e) {
@@ -354,8 +354,8 @@ public class TestWsWebSocketContainer ex
 
         long timeout = System.currentTimeMillis() - lastSend;
 
-        // Clear the server side block and prevent and further blocks to allow
-        // the server to shutdown cleanly
+        // Clear the server side block and prevent further blocks to allow the
+        // server to shutdown cleanly
         BlockingPojo.clearBlock();
 
         String msg = "Time out was [" + timeout + "] ms";



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org