You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/01/30 20:51:48 UTC

svn commit: r1440622 - in /tomcat/trunk: java/javax/websocket/ java/org/apache/catalina/loader/ java/org/apache/tomcat/websocket/ java/org/apache/tomcat/websocket/server/ test/org/apache/tomcat/websocket/

Author: markt
Date: Wed Jan 30 19:51:47 2013
New Revision: 1440622

URL: http://svn.apache.org/viewvc?rev=1440622&view=rev
Log:
Implement client and server async timeout
Note this doesn't work for BIO as it always blocks and only pretends to do async writes.

Added:
    tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java   (with props)
    tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java   (with props)
Modified:
    tomcat/trunk/java/javax/websocket/RemoteEndpoint.java
    tomcat/trunk/java/javax/websocket/WebSocketContainer.java
    tomcat/trunk/java/org/apache/catalina/loader/WebappClassLoader.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointClient.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
    tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java
    tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java
    tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java
    tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java
    tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java

Modified: tomcat/trunk/java/javax/websocket/RemoteEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/javax/websocket/RemoteEndpoint.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/javax/websocket/RemoteEndpoint.java (original)
+++ tomcat/trunk/java/javax/websocket/RemoteEndpoint.java Wed Jan 30 19:51:47 2013
@@ -28,7 +28,20 @@ public interface RemoteEndpoint {
     void setBatchingAllowed(boolean batchingAllowed);
     boolean getBatchingAllowed();
     void flushBatch();
+
+    /**
+     * Obtain the timeout (in milliseconds) for sending a message
+     * asynchronously. A non-positive value means an infinite timeout. The
+     * default value is determined by
+     * {@link WebSocketContainer#getDefaultAsyncSendTimeout()}.
+     */
     long getAsyncSendTimeout();
+
+    /**
+     * Set the timeout (in milliseconds) for sending a message asynchronously. A
+     * non-positive value means an infinite timeout. The default value is
+     * determined by {@link WebSocketContainer#getDefaultAsyncSendTimeout()}.
+     */
     void setAsyncSendTimeout(long timeout);
 
     /**

Modified: tomcat/trunk/java/javax/websocket/WebSocketContainer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/javax/websocket/WebSocketContainer.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/javax/websocket/WebSocketContainer.java (original)
+++ tomcat/trunk/java/javax/websocket/WebSocketContainer.java Wed Jan 30 19:51:47 2013
@@ -21,8 +21,16 @@ import java.util.Set;
 
 public interface WebSocketContainer {
 
+    /**
+     * Obtain the default timeout (in milliseconds) for sending a message
+     * asynchronously. A non-positive value means an infinite timeout.
+     */
     long getDefaultAsyncSendTimeout();
 
+    /**
+     * Set the default timeout (in milliseconds) for sending a message
+     * asynchronously. A non-positive value means an infinite timeout.
+     */
     void setAsyncSendTimeout(long timeout);
 
     Session connectToServer(Class<?> annotatedEndpointClass, URI path)

Modified: tomcat/trunk/java/org/apache/catalina/loader/WebappClassLoader.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/loader/WebappClassLoader.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/loader/WebappClassLoader.java (original)
+++ tomcat/trunk/java/org/apache/catalina/loader/WebappClassLoader.java Wed Jan 30 19:51:47 2013
@@ -2233,18 +2233,19 @@ public class WebappClassLoader
             Object[] table = (Object[]) internalTableField.get(map);
             if (table != null) {
                 for (int j =0; j < table.length; j++) {
-                    if (table[j] != null) {
+                    Object obj = table[j];
+                    if (obj != null) {
                         boolean potentialLeak = false;
                         // Check the key
-                        Object key = ((Reference<?>) table[j]).get();
+                        Object key = ((Reference<?>) obj).get();
                         if (this.equals(key) || loadedByThisOrChild(key)) {
                             potentialLeak = true;
                         }
                         // Check the value
                         Field valueField =
-                            table[j].getClass().getDeclaredField("value");
+                                obj.getClass().getDeclaredField("value");
                         valueField.setAccessible(true);
-                        Object value = valueField.get(table[j]);
+                        Object value = valueField.get(obj);
                         if (this.equals(value) || loadedByThisOrChild(value)) {
                             potentialLeak = true;
                         }

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java Wed Jan 30 19:51:47 2013
@@ -49,41 +49,42 @@ public abstract class WsRemoteEndpointBa
     private final AtomicBoolean toBytesInProgress = new AtomicBoolean(false);
     private final CharsetEncoder encoder = Charset.forName("UTF8").newEncoder();
     private final MessageSendStateMachine state = new MessageSendStateMachine();
+
+    private volatile long asyncSendTimeout = -1;
+
     // Max length for WebSocket frame header is 14 bytes
     protected final ByteBuffer header = ByteBuffer.allocate(14);
     protected ByteBuffer payload = null;
 
 
     @Override
-    public void setBatchingAllowed(boolean batchingAllowed) {
-        // TODO Auto-generated method stub
-
+    public long getAsyncSendTimeout() {
+        return asyncSendTimeout;
     }
 
 
     @Override
-    public boolean getBatchingAllowed() {
-        // TODO Auto-generated method stub
-        return false;
+    public void setAsyncSendTimeout(long timeout) {
+        this.asyncSendTimeout = timeout;
     }
 
 
     @Override
-    public void flushBatch() {
+    public void setBatchingAllowed(boolean batchingAllowed) {
         // TODO Auto-generated method stub
 
     }
 
 
     @Override
-    public long getAsyncSendTimeout() {
+    public boolean getBatchingAllowed() {
         // TODO Auto-generated method stub
-        return 0;
+        return false;
     }
 
 
     @Override
-    public void setAsyncSendTimeout(long timeout) {
+    public void flushBatch() {
         // TODO Auto-generated method stub
 
     }

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointClient.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointClient.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointClient.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointClient.java Wed Jan 30 19:51:47 2013
@@ -38,8 +38,13 @@ public class WsRemoteEndpointClient exte
 
     @Override
     protected void sendMessage(WsCompletionHandler handler) {
-        channel.write(new ByteBuffer[] {header, payload}, 0, 2, Long.MAX_VALUE,
-                TimeUnit.DAYS, null, handler);
+        long timeout = getAsyncSendTimeout();
+        if (timeout < 1) {
+            timeout = Long.MAX_VALUE;
+
+        }
+        channel.write(new ByteBuffer[] {header, payload}, 0, 2,
+                getAsyncSendTimeout(), TimeUnit.MILLISECONDS, null, handler);
     }
 
     @Override

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Wed Jan 30 19:51:47 2013
@@ -76,6 +76,8 @@ public class WsSession implements Sessio
         this.wsRemoteEndpoint = wsRemoteEndpoint;
         this.webSocketContainer = webSocketContainer;
         applicationClassLoader = Thread.currentThread().getContextClassLoader();
+        wsRemoteEndpoint.setAsyncSendTimeout(
+                webSocketContainer.getDefaultAsyncSendTimeout());
     }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java Wed Jan 30 19:51:47 2013
@@ -54,6 +54,7 @@ public class WsWebSocketContainer implem
     private static final byte[] crlf = new byte[] {13, 10};
     private static final int defaultBufferSize = 8 * 1024;
 
+    private long defaultAsyncTimeout = -1;
     private int binaryBufferSize = defaultBufferSize;
     private int textBufferSize = defaultBufferSize;
 
@@ -399,16 +400,25 @@ public class WsWebSocketContainer implem
     }
 
 
+    /**
+     * {@inheritDoc}
+     *
+     * The default value for this implementation is -1.
+     */
     @Override
     public long getDefaultAsyncSendTimeout() {
-        // TODO Auto-generated method stub
-        return 0;
+        return defaultAsyncTimeout;
     }
 
 
+    /**
+     * {@inheritDoc}
+     *
+     * The default value for this implementation is -1.
+     */
     @Override
     public void setAsyncSendTimeout(long timeout) {
-        // TODO Auto-generated method stub
+        this.defaultAsyncTimeout = timeout;
     }
 
     private static class WsHandshakeResponse implements HandshakeResponse {

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java Wed Jan 30 19:51:47 2013
@@ -70,6 +70,8 @@ public class ServerContainerImpl extends
         return result;
     }
 
+    private final WsTimeout wsTimeout;
+    private final Thread timeoutThread;
 
     private volatile ServletContext servletContext = null;
     private Map<String,ServerEndpointConfiguration> configMap =
@@ -80,11 +82,18 @@ public class ServerContainerImpl extends
 
 
     private ServerContainerImpl() {
-        // Hide default constructor
+        wsTimeout = new WsTimeout();
+        timeoutThread = new Thread(wsTimeout);
+        timeoutThread.setName(WsTimeout.THREAD_NAME_PREFIX + this);
+        timeoutThread.start();
     }
 
 
     public void setServletContext(ServletContext servletContext) {
+        if (this.servletContext == servletContext) {
+            return;
+        }
+
         this.servletContext = servletContext;
 
         // Configure servlet context wide defaults
@@ -99,6 +108,10 @@ public class ServerContainerImpl extends
         if (value != null) {
             setMaxTextMessageBufferSize(Long.parseLong(value));
         }
+
+        // Update the timeout thread name
+        timeoutThread.setName(
+                WsTimeout.THREAD_NAME_PREFIX + servletContext.getContextPath());
     }
 
 
@@ -212,6 +225,25 @@ public class ServerContainerImpl extends
     }
 
 
+    protected WsTimeout getTimeout() {
+        return wsTimeout;
+    }
+
+
+    protected void stop() {
+        wsTimeout.stop();
+        int count = 0;
+        while (count < 50 && timeoutThread.isAlive()) {
+            count ++;
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                // Ignore
+            }
+        }
+    }
+
+
     /**
      * Converts a path defined for a WebSocket endpoint into a path that can be
      * used as a servlet mapping.

Added: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java?rev=1440622&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java Wed Jan 30 19:51:47 2013
@@ -0,0 +1,41 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.websocket.server;
+
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
+
+/**
+ * This will be added automatically to a {@link javax.servlet.ServletContext} by
+ * the {@link WsSci}. If the {@link WsSci} is disabled, this listener must be
+ * added manually to every {@link javax.servlet.ServletContext} that uses
+ * WebSocket.
+ */
+public class WsListener implements ServletContextListener {
+
+    @Override
+    public void contextInitialized(ServletContextEvent sce) {
+        ServerContainerImpl sc = ServerContainerImpl.getServerContainer();
+        sc.setServletContext(sce.getServletContext());
+    }
+
+    @Override
+    public void contextDestroyed(ServletContextEvent sce) {
+        ServerContainerImpl sc = ServerContainerImpl.getServerContainer();
+        sc.stop();
+    }
+}

Propchange: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java Wed Jan 30 19:51:47 2013
@@ -29,7 +29,6 @@ import javax.websocket.CloseReason;
 import javax.websocket.CloseReason.CloseCodes;
 import javax.websocket.Endpoint;
 import javax.websocket.EndpointConfiguration;
-import javax.websocket.WebSocketContainer;
 
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
@@ -50,14 +49,14 @@ public class WsProtocolHandler implement
     private final Endpoint ep;
     private final EndpointConfiguration endpointConfig;
     private final ClassLoader applicationClassLoader;
-    private final WebSocketContainer webSocketContainer;
+    private final ServerContainerImpl webSocketContainer;
 
     private WsSession wsSession;
 
 
     public WsProtocolHandler(Endpoint ep,
             EndpointConfiguration endpointConfig,
-            WebSocketContainer wsc) {
+            ServerContainerImpl wsc) {
         this.ep = ep;
         this.endpointConfig = endpointConfig;
         this.webSocketContainer = wsc;
@@ -84,7 +83,7 @@ public class WsProtocolHandler implement
         t.setContextClassLoader(applicationClassLoader);
         try {
             WsRemoteEndpointServer wsRemoteEndpointServer =
-                    new WsRemoteEndpointServer(sos);
+                    new WsRemoteEndpointServer(sos, webSocketContainer);
             wsSession = new WsSession(
                     ep, wsRemoteEndpointServer, webSocketContainer);
             WsFrameServer wsFrame = new WsFrameServer(

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java Wed Jan 30 19:51:47 2013
@@ -17,6 +17,7 @@
 package org.apache.tomcat.websocket.server;
 
 import java.io.IOException;
+import java.net.SocketTimeoutException;
 
 import javax.servlet.ServletOutputStream;
 
@@ -38,15 +39,19 @@ public class WsRemoteEndpointServer exte
             LogFactory.getLog(WsProtocolHandler.class);
 
     private final ServletOutputStream sos;
+    private final WsTimeout wsTimeout;
     private volatile WsCompletionHandler handler = null;
+    private volatile long timeoutExpiry = -1;
     private volatile boolean close;
     private volatile Long size = null;
     private volatile boolean headerWritten = false;
     private volatile boolean payloadWritten = false;
 
 
-    public WsRemoteEndpointServer(ServletOutputStream sos) {
+    public WsRemoteEndpointServer(ServletOutputStream sos,
+            ServerContainerImpl serverContainer) {
         this.sos = sos;
+        this.wsTimeout = serverContainer.getTimeout();
     }
 
 
@@ -79,19 +84,29 @@ public class WsRemoteEndpointServer exte
                     sos.write(payload.array(), payload.arrayOffset(),
                             payload.limit());
                 } else {
+                    wsTimeout.unregister(this);
                     if (close) {
-                        sos.close();
+                        close();
                     }
                     handler.completed(size, null);
-                    size = null;
-                    handler = null;
-                    headerWritten = false;
-                    payloadWritten = false;
+                    nextWrite();
                     break;
                 }
             }
         } catch (IOException ioe) {
+            wsTimeout.unregister(this);
+            close();
             handler.failed(ioe, null);
+            nextWrite();
+        }
+        if (handler != null) {
+            // Async write is in progress
+
+            timeoutExpiry = getAsyncSendTimeout() + System.currentTimeMillis();
+            if (timeoutExpiry > 0) {
+                // Register with timeout thread
+                wsTimeout.register(this);
+            }
         }
     }
 
@@ -105,5 +120,26 @@ public class WsRemoteEndpointServer exte
                 log.info(sm.getString("wsRemoteEndpointServer.closeFailed"), e);
             }
         }
+        wsTimeout.unregister(this);
+    }
+
+
+    protected long getTimeoutExpiry() {
+        return timeoutExpiry;
+    }
+
+
+    protected void onTimeout() {
+        close();
+        handler.failed(new SocketTimeoutException(), null);
+        nextWrite();
+    }
+
+
+    private void nextWrite() {
+        handler = null;
+        size = null;
+        headerWritten = false;
+        payloadWritten = false;
     }
 }

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java Wed Jan 30 19:51:47 2013
@@ -35,12 +35,14 @@ public class WsSci implements ServletCon
     @Override
     public void onStartup(Set<Class<?>> clazzes, ServletContext ctx)
             throws ServletException {
-        // Need to configure the ServletContext in all cases
-        ServerContainerImpl sc = ServerContainerImpl.getServerContainer();
-        sc.setServletContext(ctx);
+
+        ctx.addListener(WsListener.class);
+
         if (clazzes == null || clazzes.size() == 0) {
             return;
         }
+
+        ServerContainerImpl sc = ServerContainerImpl.getServerContainer();
         for (Class<?> clazz : clazzes) {
             WebSocketEndpoint annotation =
                     clazz.getAnnotation(WebSocketEndpoint.class);

Added: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java?rev=1440622&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java Wed Jan 30 19:51:47 2013
@@ -0,0 +1,108 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.websocket.server;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ * Provides timeouts for asynchronous web socket writes. On the server side we
+ * only have access to {@link javax.servlet.ServletOutputStream} and
+ * {@link javax.servlet.ServletInputStream} so there is no way to set a timeout
+ * for writes to the client. Hence the separate thread.
+ */
+public class WsTimeout implements Runnable {
+
+    public static final String THREAD_NAME_PREFIX = "Websocket Timeout - ";
+
+    private final Set<WsRemoteEndpointServer> endpoints =
+            new ConcurrentSkipListSet<>(new EndpointComparator());
+    private volatile boolean running = true;
+
+    public void stop() {
+        running = false;
+        synchronized (this) {
+            this.notify();
+        }
+    }
+
+
+    @Override
+    public void run() {
+        while (running) {
+            // Wait for one second - no need for timeouts more frequently than
+            // that
+            synchronized (this) {
+                try {
+                    wait(1000);
+                } catch (InterruptedException e) {
+                    // Ignore
+                }
+            }
+
+            long now = System.currentTimeMillis();
+            Iterator<WsRemoteEndpointServer> iter = endpoints.iterator();
+            while (iter.hasNext()) {
+                WsRemoteEndpointServer endpoint = iter.next();
+                if (endpoint.getTimeoutExpiry() < now) {
+                    System.out.println(now);
+                    endpoint.onTimeout();
+                } else {
+                    // Endpoints are ordered by timeout expiry, so once we get
+                    // here there is no need to check the remaining endpoints
+                    break;
+                }
+            }
+        }
+    }
+
+
+    public void register(WsRemoteEndpointServer endpoint) {
+        endpoints.add(endpoint);
+    }
+
+
+    public void unregister(WsRemoteEndpointServer endpoint) {
+        endpoints.remove(endpoint);
+    }
+
+
+    /**
+     * Note: this comparator imposes orderings that are inconsistent with equals
+     */
+    private static class EndpointComparator implements
+            Comparator<WsRemoteEndpointServer> {
+
+        @Override
+        public int compare(WsRemoteEndpointServer o1,
+                WsRemoteEndpointServer o2) {
+
+            long t1 = o1.getTimeoutExpiry();
+            long t2 = o2.getTimeoutExpiry();
+
+            if (t1 < t2) {
+                return -1;
+            } else if (t1 == t2) {
+                return 0;
+            } else {
+                return 1;
+            }
+        }
+    }
+}

Propchange: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java (original)
+++ tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java Wed Jan 30 19:51:47 2013
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import javax.servlet.ServletContextEvent;
@@ -29,12 +31,15 @@ import javax.servlet.ServletContextListe
 import javax.websocket.CloseReason;
 import javax.websocket.ContainerProvider;
 import javax.websocket.DefaultClientConfiguration;
+import javax.websocket.DeploymentException;
 import javax.websocket.Endpoint;
 import javax.websocket.EndpointConfiguration;
 import javax.websocket.MessageHandler;
+import javax.websocket.SendResult;
 import javax.websocket.Session;
 import javax.websocket.WebSocketContainer;
 import javax.websocket.WebSocketMessage;
+import javax.websocket.server.DefaultServerConfiguration;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -42,7 +47,9 @@ import org.junit.Test;
 import org.apache.catalina.Context;
 import org.apache.catalina.startup.Tomcat;
 import org.apache.catalina.startup.TomcatBaseTest;
+import org.apache.coyote.http11.Http11Protocol;
 import org.apache.tomcat.websocket.server.ServerContainerImpl;
+import org.apache.tomcat.websocket.server.WsListener;
 
 public class TestWsWebSocketContainer extends TomcatBaseTest {
 
@@ -50,6 +57,8 @@ public class TestWsWebSocketContainer ex
     private static final String MESSAGE_TEXT_4K;
     private static final byte[] MESSAGE_BINARY_4K = new byte[4096];
 
+    private static final long TIMEOUT_MS = 5 * 1000;
+
     static {
         StringBuilder sb = new StringBuilder(4096);
         for (int i = 0; i < 4096; i++) {
@@ -246,6 +255,161 @@ public class TestWsWebSocketContainer ex
         }
     }
 
+
+    @Test
+    public void testTimeoutClientContainer() throws Exception {
+        doTestTimeoutClient(true);
+    }
+
+
+    @Test
+    public void testTimeoutClientEndpoint() throws Exception {
+        doTestTimeoutClient(false);
+    }
+
+
+    private void doTestTimeoutClient(boolean setTimeoutOnContainer)
+            throws Exception {
+
+        Tomcat tomcat = getTomcatInstance();
+        // Must have a real docBase - just use temp
+        Context ctx =
+            tomcat.addContext("", System.getProperty("java.io.tmpdir"));
+        ctx.addApplicationListener(BlockingConfig.class.getName());
+
+        WebSocketContainer wsContainer = ContainerProvider.getClientContainer();
+
+        // Reset client buffer size as client container is retained between
+        // tests
+        wsContainer.setMaxBinaryMessageBufferSize(8192);
+        wsContainer.setMaxTextMessageBufferSize(8192);
+
+
+        // Set the async timeout
+        if (setTimeoutOnContainer) {
+            wsContainer.setAsyncSendTimeout(TIMEOUT_MS);
+        }
+
+        tomcat.start();
+
+        Session wsSession = wsContainer.connectToServer(TesterEndpoint.class,
+                new DefaultClientConfiguration(), new URI("http://localhost:" +
+                        getPort() + BlockingConfig.PATH));
+
+        if (!setTimeoutOnContainer) {
+            wsSession.getRemote().setAsyncSendTimeout(TIMEOUT_MS);
+        }
+
+        long lastSend = 0;
+        boolean isOK = true;
+        SendResult sr = null;
+
+        // Should send quickly until the network buffers fill up and then block
+        // until the timeout kicks in
+        while (isOK) {
+            Future<SendResult> f = wsSession.getRemote().sendBytesByFuture(
+                    ByteBuffer.wrap(MESSAGE_BINARY_4K));
+            lastSend = System.currentTimeMillis();
+            sr = f.get();
+            isOK = sr.isOK();
+        }
+
+        long timeout = System.currentTimeMillis() - lastSend;
+
+        // Check correct time passed
+        Assert.assertTrue(timeout >= TIMEOUT_MS);
+
+        // Check the timeout wasn't too long
+        Assert.assertTrue(timeout < TIMEOUT_MS*2);
+
+        if (sr == null) {
+            Assert.fail();
+        } else {
+            Assert.assertNotNull(sr.getException());
+        }
+    }
+
+
+    @Test
+    public void testTimeoutServerContainer() throws Exception {
+        doTestTimeoutServer(true);
+    }
+
+
+    @Test
+    public void testTimeoutServerEndpoint() throws Exception {
+        doTestTimeoutServer(false);
+    }
+
+
+    private static volatile boolean timoutOnContainer = false;
+
+    private void doTestTimeoutServer(boolean setTimeoutOnContainer)
+            throws Exception {
+
+        /*
+         * Note: There are all sorts of horrible uses of statics in this test
+         *       because the API uses classes and the tests really need access
+         *       to the instances which simply isn't possible.
+         */
+        timoutOnContainer = setTimeoutOnContainer;
+
+        Tomcat tomcat = getTomcatInstance();
+
+        if (getProtocol().equals(Http11Protocol.class.getName())) {
+            // This will never work for BIO
+            return;
+        }
+
+        // Must have a real docBase - just use temp
+        Context ctx =
+            tomcat.addContext("", System.getProperty("java.io.tmpdir"));
+        ctx.addApplicationListener(WsListener.class.getName());
+        ctx.addApplicationListener(ConstantTxConfig.class.getName());
+
+        WebSocketContainer wsContainer = ContainerProvider.getClientContainer();
+
+        // Reset client buffer size as client container is retained between
+        // tests
+        wsContainer.setMaxBinaryMessageBufferSize(8192);
+        wsContainer.setMaxTextMessageBufferSize(8192);
+
+        tomcat.start();
+
+        Session wsSession = wsContainer.connectToServer(TesterEndpoint.class,
+                new DefaultClientConfiguration(), new URI("http://localhost:" +
+                        getPort() + ConstantTxConfig.PATH));
+
+        wsSession.addMessageHandler(new BlockingBinaryHandler());
+
+        int loops = 0;
+        while (loops < 60) {
+            Thread.sleep(1000);
+            if (!ConstantTxEndpoint.getRunning()) {
+                break;
+            }
+        }
+
+        // Check nothing really bad happened
+        Assert.assertNull(ConstantTxEndpoint.getException());
+
+        System.out.println(ConstantTxEndpoint.getTimeout());
+        // Check correct time passed
+        Assert.assertTrue(ConstantTxEndpoint.getTimeout() >= TIMEOUT_MS);
+
+        // Check the timeout wasn't too long
+        Assert.assertTrue(ConstantTxEndpoint.getTimeout() < TIMEOUT_MS*2);
+
+        if (ConstantTxEndpoint.getSendResult() == null) {
+            Assert.fail();
+        } else {
+            Assert.assertNotNull(
+                    ConstantTxEndpoint.getSendResult().getException());
+        }
+
+    }
+
+
     private abstract static class TesterMessageHandler<T>
             implements MessageHandler.Basic<T> {
 
@@ -411,4 +575,151 @@ public class TestWsWebSocketContainer ex
             }
         }
     }
+
+
+    /**
+     * Servlet context listener that publishes {@link BlockingPojo} on the
+     * server container under {@link #PATH} when the context is initialised.
+     */
+    public static class BlockingConfig implements ServletContextListener {
+
+        public static final String PATH = "/block";
+
+        /**
+         * Publishes the blocking POJO endpoint for the starting context.
+         *
+         * @param sce event that supplies the servlet context to publish to
+         */
+        @Override
+        public void contextInitialized(ServletContextEvent sce) {
+            ServerContainerImpl.getServerContainer().publishServer(
+                    BlockingPojo.class, sce.getServletContext(), PATH);
+        }
+
+        /**
+         * Does nothing; there is no state to release on shutdown.
+         *
+         * @param sce ignored
+         */
+        @Override
+        public void contextDestroyed(ServletContextEvent sce) {
+            // Nothing to clean up
+        }
+    }
+
+
+    /**
+     * POJO endpoint whose message methods do nothing except sleep, so that
+     * whichever thread delivers a message to it is held up.
+     */
+    public static class BlockingPojo {
+
+        /**
+         * Sleeps for 60 seconds on receipt of a text message part.
+         *
+         * @param session unused
+         * @param msg     unused
+         * @param last    unused
+         */
+        @SuppressWarnings("unused")
+        @WebSocketMessage
+        public void echoTextMessage(Session session, String msg, boolean last) {
+            pause(60000);
+        }
+
+
+        /**
+         * Sleeps for ten times {@code TIMEOUT_MS} on receipt of a binary
+         * message part.
+         *
+         * @param session unused
+         * @param msg     unused
+         * @param last    unused
+         */
+        @SuppressWarnings("unused")
+        @WebSocketMessage
+        public void echoBinaryMessage(Session session, ByteBuffer msg,
+                boolean last) {
+            pause(TIMEOUT_MS * 10);
+        }
+
+
+        // Sleeps for the given number of milliseconds; an interrupt simply
+        // ends the sleep early and is otherwise ignored.
+        private static void pause(long millis) {
+            try {
+                Thread.sleep(millis);
+            } catch (InterruptedException ignored) {
+                // Ignore
+            }
+        }
+    }
+
+
+    /**
+     * Binary message handler that never processes what it receives; each
+     * call just sleeps for ten times {@code TIMEOUT_MS}.
+     */
+    public static class BlockingBinaryHandler
+            implements MessageHandler.Async<ByteBuffer> {
+
+        /**
+         * Sleeps instead of consuming the message part.
+         *
+         * @param messagePart unused
+         * @param last        unused
+         */
+        @Override
+        public void onMessage(ByteBuffer messagePart, boolean last) {
+            long pauseMillis = TIMEOUT_MS * 10;
+            try {
+                Thread.sleep(pauseMillis);
+            } catch (InterruptedException ignored) {
+                // Ignore
+            }
+        }
+    }
+
+
+    /**
+     * Endpoint that, as soon as a session opens, sends
+     * {@code MESSAGE_BINARY_4K} repeatedly, waiting for each send to
+     * complete, until a send reports that it was not OK. The outcome is
+     * stored in static fields that the test case reads through the static
+     * getters.
+     */
+    public static class ConstantTxEndpoint extends Endpoint {
+
+        // Have to be static to be able to retrieve results from test case
+        // Milliseconds between starting the final send and onOpen finishing
+        private static volatile long timeout = -1;
+        // Result of the most recent send; the send loop stops once false
+        private static volatile boolean ok = true;
+        // SendResult of the most recent completed send
+        private static volatile SendResult sr = null;
+        // Exception thrown while waiting on a send future, if any
+        private static volatile Exception exception = null;
+        // True from the start of onOpen until its results are published
+        private static volatile boolean running = true;
+
+
+        /**
+         * Resets the recorded state, optionally sets the async send timeout
+         * on the remote endpoint, and then sends until a send fails.
+         * <p>
+         * The results are published in a {@code finally} block so that
+         * {@link #getRunning()} turns {@code false} even when an unchecked
+         * exception escapes; such an exception still propagates to the
+         * caller.
+         *
+         * @param session the newly opened session used for sending
+         * @param config  unused
+         */
+        @Override
+        public void onOpen(Session session, EndpointConfiguration config) {
+
+            // Reset everything
+            timeout = -1;
+            ok = true;
+            sr = null;
+            exception = null;
+            running = true;
+
+            long lastSend = 0;
+
+            try {
+                // The timeout is set either here or on the container,
+                // depending on the flag chosen by the test
+                if (!TestWsWebSocketContainer.timoutOnContainer) {
+                    session.getRemote().setAsyncSendTimeout(TIMEOUT_MS);
+                }
+
+                // Should send quickly until the network buffers fill up and
+                // then block until the timeout kicks in
+                while (ok) {
+                    lastSend = System.currentTimeMillis();
+                    Future<SendResult> f = session.getRemote().sendBytesByFuture(
+                            ByteBuffer.wrap(MESSAGE_BINARY_4K));
+                    sr = f.get();
+                    ok = sr.isOK();
+                }
+            } catch (ExecutionException | InterruptedException e) {
+                exception = e;
+            } finally {
+                // Publish on every exit path; code polling getRunning()
+                // would otherwise never see false after an unchecked failure
+                timeout = System.currentTimeMillis() - lastSend;
+                running = false;
+            }
+        }
+
+        /**
+         * @return milliseconds between the start of the last send and the
+         *         end of {@code onOpen}, or -1 if not yet recorded
+         */
+        public static long getTimeout() {
+            return timeout;
+        }
+
+        /**
+         * @return whether the most recent send reported OK
+         */
+        public static boolean isOK() {
+            return ok;
+        }
+
+        /**
+         * @return the result of the most recent completed send, or
+         *         {@code null} if none completed
+         */
+        public static SendResult getSendResult() {
+            return sr;
+        }
+
+        /**
+         * @return the exception caught while waiting on a send, or
+         *         {@code null} if none was caught
+         */
+        public static Exception getException() {
+            return exception;
+        }
+
+        /**
+         * @return {@code true} while {@code onOpen} has not yet published
+         *         its results
+         */
+        public static boolean getRunning() {
+            return running;
+        }
+    }
+
+
+    /**
+     * Servlet context listener that publishes {@link ConstantTxEndpoint} on
+     * the server container under {@code PATH} and, when the test has asked
+     * for it, sets the async send timeout on the container.
+     */
+    public static class ConstantTxConfig implements ServletContextListener {
+
+        private static final String PATH = "/test";
+
+        /**
+         * Hands the servlet context to the server container and publishes
+         * the endpoint.
+         *
+         * @param sce event that supplies the servlet context
+         * @throws IllegalStateException if publishing the endpoint fails
+         *         with a {@code DeploymentException}
+         */
+        @Override
+        public void contextInitialized(ServletContextEvent sce) {
+            ServerContainerImpl container =
+                    ServerContainerImpl.getServerContainer();
+            container.setServletContext(sce.getServletContext());
+            try {
+                container.publishServer(ConstantTxEndpoint.class, PATH,
+                        DefaultServerConfiguration.class);
+                // Container-level timeout is only wanted when the test has
+                // set the flag
+                if (TestWsWebSocketContainer.timoutOnContainer) {
+                    container.setAsyncSendTimeout(TIMEOUT_MS);
+                }
+            } catch (DeploymentException deploymentFailure) {
+                throw new IllegalStateException(deploymentFailure);
+            }
+        }
+
+        /**
+         * Does nothing; there is no state to release on shutdown.
+         *
+         * @param sce ignored
+         */
+        @Override
+        public void contextDestroyed(ServletContextEvent sce) {
+            // Nothing to clean up
+        }
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org