You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/01/30 20:51:48 UTC
svn commit: r1440622 - in /tomcat/trunk: java/javax/websocket/
java/org/apache/catalina/loader/ java/org/apache/tomcat/websocket/
java/org/apache/tomcat/websocket/server/ test/org/apache/tomcat/websocket/
Author: markt
Date: Wed Jan 30 19:51:47 2013
New Revision: 1440622
URL: http://svn.apache.org/viewvc?rev=1440622&view=rev
Log:
Implement client and server async timeout
Note this doesn't work for BIO as it always blocks and only pretends to do async writes.
Added:
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java (with props)
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java (with props)
Modified:
tomcat/trunk/java/javax/websocket/RemoteEndpoint.java
tomcat/trunk/java/javax/websocket/WebSocketContainer.java
tomcat/trunk/java/org/apache/catalina/loader/WebappClassLoader.java
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointClient.java
tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java
tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java
Modified: tomcat/trunk/java/javax/websocket/RemoteEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/javax/websocket/RemoteEndpoint.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/javax/websocket/RemoteEndpoint.java (original)
+++ tomcat/trunk/java/javax/websocket/RemoteEndpoint.java Wed Jan 30 19:51:47 2013
@@ -28,7 +28,20 @@ public interface RemoteEndpoint {
void setBatchingAllowed(boolean batchingAllowed);
boolean getBatchingAllowed();
void flushBatch();
+
+ /**
+ * Obtain the timeout (in milliseconds) for sending a message
+ * asynchronously. A non-positive value means an infinite timeout. The
+ * default value is determined by
+ * {@link WebSocketContainer#getDefaultAsyncSendTimeout()}.
+ */
long getAsyncSendTimeout();
+
+ /**
+ * Set the timeout (in milliseconds) for sending a message asynchronously. A
+ * non-positive value means an infinite timeout. The default value is
+ * determined by {@link WebSocketContainer#getDefaultAsyncSendTimeout()}.
+ */
void setAsyncSendTimeout(long timeout);
/**
Modified: tomcat/trunk/java/javax/websocket/WebSocketContainer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/javax/websocket/WebSocketContainer.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/javax/websocket/WebSocketContainer.java (original)
+++ tomcat/trunk/java/javax/websocket/WebSocketContainer.java Wed Jan 30 19:51:47 2013
@@ -21,8 +21,16 @@ import java.util.Set;
public interface WebSocketContainer {
+ /**
+ * Obtain the default timeout (in milliseconds) for sending a message
+ * asynchronously. A non-positive value means an infinite timeout.
+ */
long getDefaultAsyncSendTimeout();
+ /**
+ * Set the default timeout (in milliseconds) for sending a message
+ * asynchronously. A non-positive value means an infinite timeout.
+ */
void setAsyncSendTimeout(long timeout);
Session connectToServer(Class<?> annotatedEndpointClass, URI path)
Modified: tomcat/trunk/java/org/apache/catalina/loader/WebappClassLoader.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/loader/WebappClassLoader.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/loader/WebappClassLoader.java (original)
+++ tomcat/trunk/java/org/apache/catalina/loader/WebappClassLoader.java Wed Jan 30 19:51:47 2013
@@ -2233,18 +2233,19 @@ public class WebappClassLoader
Object[] table = (Object[]) internalTableField.get(map);
if (table != null) {
for (int j =0; j < table.length; j++) {
- if (table[j] != null) {
+ Object obj = table[j];
+ if (obj != null) {
boolean potentialLeak = false;
// Check the key
- Object key = ((Reference<?>) table[j]).get();
+ Object key = ((Reference<?>) obj).get();
if (this.equals(key) || loadedByThisOrChild(key)) {
potentialLeak = true;
}
// Check the value
Field valueField =
- table[j].getClass().getDeclaredField("value");
+ obj.getClass().getDeclaredField("value");
valueField.setAccessible(true);
- Object value = valueField.get(table[j]);
+ Object value = valueField.get(obj);
if (this.equals(value) || loadedByThisOrChild(value)) {
potentialLeak = true;
}
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java Wed Jan 30 19:51:47 2013
@@ -49,41 +49,42 @@ public abstract class WsRemoteEndpointBa
private final AtomicBoolean toBytesInProgress = new AtomicBoolean(false);
private final CharsetEncoder encoder = Charset.forName("UTF8").newEncoder();
private final MessageSendStateMachine state = new MessageSendStateMachine();
+
+ private volatile long asyncSendTimeout = -1;
+
// Max length for WebSocket frame header is 14 bytes
protected final ByteBuffer header = ByteBuffer.allocate(14);
protected ByteBuffer payload = null;
@Override
- public void setBatchingAllowed(boolean batchingAllowed) {
- // TODO Auto-generated method stub
-
+ public long getAsyncSendTimeout() {
+ return asyncSendTimeout;
}
@Override
- public boolean getBatchingAllowed() {
- // TODO Auto-generated method stub
- return false;
+ public void setAsyncSendTimeout(long timeout) {
+ this.asyncSendTimeout = timeout;
}
@Override
- public void flushBatch() {
+ public void setBatchingAllowed(boolean batchingAllowed) {
// TODO Auto-generated method stub
}
@Override
- public long getAsyncSendTimeout() {
+ public boolean getBatchingAllowed() {
// TODO Auto-generated method stub
- return 0;
+ return false;
}
@Override
- public void setAsyncSendTimeout(long timeout) {
+ public void flushBatch() {
// TODO Auto-generated method stub
}
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointClient.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointClient.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointClient.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointClient.java Wed Jan 30 19:51:47 2013
@@ -38,8 +38,13 @@ public class WsRemoteEndpointClient exte
@Override
protected void sendMessage(WsCompletionHandler handler) {
- channel.write(new ByteBuffer[] {header, payload}, 0, 2, Long.MAX_VALUE,
- TimeUnit.DAYS, null, handler);
+ long timeout = getAsyncSendTimeout();
+ if (timeout < 1) {
+ timeout = Long.MAX_VALUE;
+
+ }
+ channel.write(new ByteBuffer[] {header, payload}, 0, 2,
+ getAsyncSendTimeout(), TimeUnit.MILLISECONDS, null, handler);
}
@Override
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Wed Jan 30 19:51:47 2013
@@ -76,6 +76,8 @@ public class WsSession implements Sessio
this.wsRemoteEndpoint = wsRemoteEndpoint;
this.webSocketContainer = webSocketContainer;
applicationClassLoader = Thread.currentThread().getContextClassLoader();
+ wsRemoteEndpoint.setAsyncSendTimeout(
+ webSocketContainer.getDefaultAsyncSendTimeout());
}
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java Wed Jan 30 19:51:47 2013
@@ -54,6 +54,7 @@ public class WsWebSocketContainer implem
private static final byte[] crlf = new byte[] {13, 10};
private static final int defaultBufferSize = 8 * 1024;
+ private long defaultAsyncTimeout = -1;
private int binaryBufferSize = defaultBufferSize;
private int textBufferSize = defaultBufferSize;
@@ -399,16 +400,25 @@ public class WsWebSocketContainer implem
}
+ /**
+ * {@inheritDoc}
+ *
+ * The default value for this implementation is -1.
+ */
@Override
public long getDefaultAsyncSendTimeout() {
- // TODO Auto-generated method stub
- return 0;
+ return defaultAsyncTimeout;
}
+ /**
+ * {@inheritDoc}
+ *
+ * The default value for this implementation is -1.
+ */
@Override
public void setAsyncSendTimeout(long timeout) {
- // TODO Auto-generated method stub
+ this.defaultAsyncTimeout = timeout;
}
private static class WsHandshakeResponse implements HandshakeResponse {
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java Wed Jan 30 19:51:47 2013
@@ -70,6 +70,8 @@ public class ServerContainerImpl extends
return result;
}
+ private final WsTimeout wsTimeout;
+ private final Thread timeoutThread;
private volatile ServletContext servletContext = null;
private Map<String,ServerEndpointConfiguration> configMap =
@@ -80,11 +82,18 @@ public class ServerContainerImpl extends
private ServerContainerImpl() {
- // Hide default constructor
+ wsTimeout = new WsTimeout();
+ timeoutThread = new Thread(wsTimeout);
+ timeoutThread.setName(WsTimeout.THREAD_NAME_PREFIX + this);
+ timeoutThread.start();
}
public void setServletContext(ServletContext servletContext) {
+ if (this.servletContext == servletContext) {
+ return;
+ }
+
this.servletContext = servletContext;
// Configure servlet context wide defaults
@@ -99,6 +108,10 @@ public class ServerContainerImpl extends
if (value != null) {
setMaxTextMessageBufferSize(Long.parseLong(value));
}
+
+ // Update the timeout thread name
+ timeoutThread.setName(
+ WsTimeout.THREAD_NAME_PREFIX + servletContext.getContextPath());
}
@@ -212,6 +225,25 @@ public class ServerContainerImpl extends
}
+ protected WsTimeout getTimeout() {
+ return wsTimeout;
+ }
+
+
+ protected void stop() {
+ wsTimeout.stop();
+ int count = 0;
+ while (count < 50 && timeoutThread.isAlive()) {
+ count ++;
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+
+
/**
* Converts a path defined for a WebSocket endpoint into a path that can be
* used as a servlet mapping.
Added: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java?rev=1440622&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java Wed Jan 30 19:51:47 2013
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.websocket.server;
+
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
+
+/**
+ * This will be added automatically to a {@link javax.servlet.ServletContext} by
+ * the {@link WsSci}. If the {@link WsSci} is disabled, this listener must be
+ * added manually to every {@link javax.servlet.ServletContext} that uses
+ * WebSocket.
+ */
+public class WsListener implements ServletContextListener {
+
+ @Override
+ public void contextInitialized(ServletContextEvent sce) {
+ ServerContainerImpl sc = ServerContainerImpl.getServerContainer();
+ sc.setServletContext(sce.getServletContext());
+ }
+
+ @Override
+ public void contextDestroyed(ServletContextEvent sce) {
+ ServerContainerImpl sc = ServerContainerImpl.getServerContainer();
+ sc.stop();
+ }
+}
Propchange: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java Wed Jan 30 19:51:47 2013
@@ -29,7 +29,6 @@ import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfiguration;
-import javax.websocket.WebSocketContainer;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
@@ -50,14 +49,14 @@ public class WsProtocolHandler implement
private final Endpoint ep;
private final EndpointConfiguration endpointConfig;
private final ClassLoader applicationClassLoader;
- private final WebSocketContainer webSocketContainer;
+ private final ServerContainerImpl webSocketContainer;
private WsSession wsSession;
public WsProtocolHandler(Endpoint ep,
EndpointConfiguration endpointConfig,
- WebSocketContainer wsc) {
+ ServerContainerImpl wsc) {
this.ep = ep;
this.endpointConfig = endpointConfig;
this.webSocketContainer = wsc;
@@ -84,7 +83,7 @@ public class WsProtocolHandler implement
t.setContextClassLoader(applicationClassLoader);
try {
WsRemoteEndpointServer wsRemoteEndpointServer =
- new WsRemoteEndpointServer(sos);
+ new WsRemoteEndpointServer(sos, webSocketContainer);
wsSession = new WsSession(
ep, wsRemoteEndpointServer, webSocketContainer);
WsFrameServer wsFrame = new WsFrameServer(
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java Wed Jan 30 19:51:47 2013
@@ -17,6 +17,7 @@
package org.apache.tomcat.websocket.server;
import java.io.IOException;
+import java.net.SocketTimeoutException;
import javax.servlet.ServletOutputStream;
@@ -38,15 +39,19 @@ public class WsRemoteEndpointServer exte
LogFactory.getLog(WsProtocolHandler.class);
private final ServletOutputStream sos;
+ private final WsTimeout wsTimeout;
private volatile WsCompletionHandler handler = null;
+ private volatile long timeoutExpiry = -1;
private volatile boolean close;
private volatile Long size = null;
private volatile boolean headerWritten = false;
private volatile boolean payloadWritten = false;
- public WsRemoteEndpointServer(ServletOutputStream sos) {
+ public WsRemoteEndpointServer(ServletOutputStream sos,
+ ServerContainerImpl serverContainer) {
this.sos = sos;
+ this.wsTimeout = serverContainer.getTimeout();
}
@@ -79,19 +84,29 @@ public class WsRemoteEndpointServer exte
sos.write(payload.array(), payload.arrayOffset(),
payload.limit());
} else {
+ wsTimeout.unregister(this);
if (close) {
- sos.close();
+ close();
}
handler.completed(size, null);
- size = null;
- handler = null;
- headerWritten = false;
- payloadWritten = false;
+ nextWrite();
break;
}
}
} catch (IOException ioe) {
+ wsTimeout.unregister(this);
+ close();
handler.failed(ioe, null);
+ nextWrite();
+ }
+ if (handler != null) {
+ // Async write is in progress
+
+ timeoutExpiry = getAsyncSendTimeout() + System.currentTimeMillis();
+ if (timeoutExpiry > 0) {
+ // Register with timeout thread
+ wsTimeout.register(this);
+ }
}
}
@@ -105,5 +120,26 @@ public class WsRemoteEndpointServer exte
log.info(sm.getString("wsRemoteEndpointServer.closeFailed"), e);
}
}
+ wsTimeout.unregister(this);
+ }
+
+
+ protected long getTimeoutExpiry() {
+ return timeoutExpiry;
+ }
+
+
+ protected void onTimeout() {
+ close();
+ handler.failed(new SocketTimeoutException(), null);
+ nextWrite();
+ }
+
+
+ private void nextWrite() {
+ handler = null;
+ size = null;
+ headerWritten = false;
+ payloadWritten = false;
}
}
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java Wed Jan 30 19:51:47 2013
@@ -35,12 +35,14 @@ public class WsSci implements ServletCon
@Override
public void onStartup(Set<Class<?>> clazzes, ServletContext ctx)
throws ServletException {
- // Need to configure the ServletContext in all cases
- ServerContainerImpl sc = ServerContainerImpl.getServerContainer();
- sc.setServletContext(ctx);
+
+ ctx.addListener(WsListener.class);
+
if (clazzes == null || clazzes.size() == 0) {
return;
}
+
+ ServerContainerImpl sc = ServerContainerImpl.getServerContainer();
for (Class<?> clazz : clazzes) {
WebSocketEndpoint annotation =
clazz.getAnnotation(WebSocketEndpoint.class);
Added: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java?rev=1440622&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java Wed Jan 30 19:51:47 2013
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.websocket.server;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ * Provides timeouts for asynchronous web socket writes. On the server side we
+ * only have access to {@link javax.servlet.ServletOutputStream} and
+ * {@link javax.servlet.ServletInputStream} so there is no way to set a timeout
+ * for writes to the client. Hence the separate thread.
+ */
+public class WsTimeout implements Runnable {
+
+ public static final String THREAD_NAME_PREFIX = "Websocket Timeout - ";
+
+ private final Set<WsRemoteEndpointServer> endpoints =
+ new ConcurrentSkipListSet<>(new EndpointComparator());
+ private volatile boolean running = true;
+
+ public void stop() {
+ running = false;
+ synchronized (this) {
+ this.notify();
+ }
+ }
+
+
+ @Override
+ public void run() {
+ while (running) {
+ // Wait for one second - no need for timeouts more frequently than
+ // that
+ synchronized (this) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+
+ long now = System.currentTimeMillis();
+ Iterator<WsRemoteEndpointServer> iter = endpoints.iterator();
+ while (iter.hasNext()) {
+ WsRemoteEndpointServer endpoint = iter.next();
+ if (endpoint.getTimeoutExpiry() < now) {
+ System.out.println(now);
+ endpoint.onTimeout();
+ } else {
+ // Endpoints are ordered by timeout expiry so we reach this
+ // point there is no need to check the remaining endpoints
+ break;
+ }
+ }
+ }
+ }
+
+
+ public void register(WsRemoteEndpointServer endpoint) {
+ endpoints.add(endpoint);
+ }
+
+
+ public void unregister(WsRemoteEndpointServer endpoint) {
+ endpoints.remove(endpoint);
+ }
+
+
+ /**
+ * Note: this comparator imposes orderings that are inconsistent with equals
+ */
+ private static class EndpointComparator implements
+ Comparator<WsRemoteEndpointServer> {
+
+ @Override
+ public int compare(WsRemoteEndpointServer o1,
+ WsRemoteEndpointServer o2) {
+
+ long t1 = o1.getTimeoutExpiry();
+ long t2 = o2.getTimeoutExpiry();
+
+ if (t1 < t2) {
+ return -1;
+ } else if (t1 == t2) {
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+ }
+}
Propchange: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java?rev=1440622&r1=1440621&r2=1440622&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java (original)
+++ tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java Wed Jan 30 19:51:47 2013
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletContextEvent;
@@ -29,12 +31,15 @@ import javax.servlet.ServletContextListe
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.DefaultClientConfiguration;
+import javax.websocket.DeploymentException;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfiguration;
import javax.websocket.MessageHandler;
+import javax.websocket.SendResult;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import javax.websocket.WebSocketMessage;
+import javax.websocket.server.DefaultServerConfiguration;
import org.junit.Assert;
import org.junit.Test;
@@ -42,7 +47,9 @@ import org.junit.Test;
import org.apache.catalina.Context;
import org.apache.catalina.startup.Tomcat;
import org.apache.catalina.startup.TomcatBaseTest;
+import org.apache.coyote.http11.Http11Protocol;
import org.apache.tomcat.websocket.server.ServerContainerImpl;
+import org.apache.tomcat.websocket.server.WsListener;
public class TestWsWebSocketContainer extends TomcatBaseTest {
@@ -50,6 +57,8 @@ public class TestWsWebSocketContainer ex
private static final String MESSAGE_TEXT_4K;
private static final byte[] MESSAGE_BINARY_4K = new byte[4096];
+ private static final long TIMEOUT_MS = 5 * 1000;
+
static {
StringBuilder sb = new StringBuilder(4096);
for (int i = 0; i < 4096; i++) {
@@ -246,6 +255,161 @@ public class TestWsWebSocketContainer ex
}
}
+
+ @Test
+ public void testTimeoutClientContainer() throws Exception {
+ doTestTimeoutClient(true);
+ }
+
+
+ @Test
+ public void testTimeoutClientEndpoint() throws Exception {
+ doTestTimeoutClient(false);
+ }
+
+
+ private void doTestTimeoutClient(boolean setTimeoutOnContainer)
+ throws Exception {
+
+ Tomcat tomcat = getTomcatInstance();
+ // Must have a real docBase - just use temp
+ Context ctx =
+ tomcat.addContext("", System.getProperty("java.io.tmpdir"));
+ ctx.addApplicationListener(BlockingConfig.class.getName());
+
+ WebSocketContainer wsContainer = ContainerProvider.getClientContainer();
+
+ // Reset client buffer size as client container is retained between
+ // tests
+ wsContainer.setMaxBinaryMessageBufferSize(8192);
+ wsContainer.setMaxTextMessageBufferSize(8192);
+
+
+ // Set the async timeout
+ if (setTimeoutOnContainer) {
+ wsContainer.setAsyncSendTimeout(TIMEOUT_MS);
+ }
+
+ tomcat.start();
+
+ Session wsSession = wsContainer.connectToServer(TesterEndpoint.class,
+ new DefaultClientConfiguration(), new URI("http://localhost:" +
+ getPort() + BlockingConfig.PATH));
+
+ if (!setTimeoutOnContainer) {
+ wsSession.getRemote().setAsyncSendTimeout(TIMEOUT_MS);
+ }
+
+ long lastSend = 0;
+ boolean isOK = true;
+ SendResult sr = null;
+
+ // Should send quickly until the network buffers fill up and then block
+ // until the timeout kicks in
+ while (isOK) {
+ Future<SendResult> f = wsSession.getRemote().sendBytesByFuture(
+ ByteBuffer.wrap(MESSAGE_BINARY_4K));
+ lastSend = System.currentTimeMillis();
+ sr = f.get();
+ isOK = sr.isOK();
+ }
+
+ long timeout = System.currentTimeMillis() - lastSend;
+
+ // Check correct time passed
+ Assert.assertTrue(timeout >= TIMEOUT_MS);
+
+ // Check the timeout wasn't too long
+ Assert.assertTrue(timeout < TIMEOUT_MS*2);
+
+ if (sr == null) {
+ Assert.fail();
+ } else {
+ Assert.assertNotNull(sr.getException());
+ }
+ }
+
+
+ @Test
+ public void testTimeoutServerContainer() throws Exception {
+ doTestTimeoutServer(true);
+ }
+
+
+ @Test
+ public void testTimeoutServerEndpoint() throws Exception {
+ doTestTimeoutServer(false);
+ }
+
+
+ private static volatile boolean timoutOnContainer = false;
+
+ private void doTestTimeoutServer(boolean setTimeoutOnContainer)
+ throws Exception {
+
+ /*
+ * Note: There are all sorts of horrible uses of statics in this test
+ * because the API uses classes and the tests really need access
+ * to the instances which simply isn't possible.
+ */
+ timoutOnContainer = setTimeoutOnContainer;
+
+ Tomcat tomcat = getTomcatInstance();
+
+ if (getProtocol().equals(Http11Protocol.class.getName())) {
+ // This will never work for BIO
+ return;
+ }
+
+ // Must have a real docBase - just use temp
+ Context ctx =
+ tomcat.addContext("", System.getProperty("java.io.tmpdir"));
+ ctx.addApplicationListener(WsListener.class.getName());
+ ctx.addApplicationListener(ConstantTxConfig.class.getName());
+
+ WebSocketContainer wsContainer = ContainerProvider.getClientContainer();
+
+ // Reset client buffer size as client container is retained between
+ // tests
+ wsContainer.setMaxBinaryMessageBufferSize(8192);
+ wsContainer.setMaxTextMessageBufferSize(8192);
+
+ tomcat.start();
+
+ Session wsSession = wsContainer.connectToServer(TesterEndpoint.class,
+ new DefaultClientConfiguration(), new URI("http://localhost:" +
+ getPort() + ConstantTxConfig.PATH));
+
+ wsSession.addMessageHandler(new BlockingBinaryHandler());
+
+ int loops = 0;
+ while (loops < 60) {
+ Thread.sleep(1000);
+ if (!ConstantTxEndpoint.getRunning()) {
+ break;
+ }
+ }
+
+ // Check nothing really bad happened
+ Assert.assertNull(ConstantTxEndpoint.getException());
+
+ System.out.println(ConstantTxEndpoint.getTimeout());
+ // Check correct time passed
+ Assert.assertTrue(ConstantTxEndpoint.getTimeout() >= TIMEOUT_MS);
+
+ // Check the timeout wasn't too long
+ Assert.assertTrue(ConstantTxEndpoint.getTimeout() < TIMEOUT_MS*2);
+
+ if (ConstantTxEndpoint.getSendResult() == null) {
+ Assert.fail();
+ } else {
+ Assert.assertNotNull(
+ ConstantTxEndpoint.getSendResult().getException());
+ }
+
+ }
+
+
private abstract static class TesterMessageHandler<T>
implements MessageHandler.Basic<T> {
@@ -411,4 +575,151 @@ public class TestWsWebSocketContainer ex
}
}
}
+
+
+ public static class BlockingConfig implements ServletContextListener {
+
+ public static final String PATH = "/block";
+
+ @Override
+ public void contextInitialized(ServletContextEvent sce) {
+ ServerContainerImpl sc = ServerContainerImpl.getServerContainer();
+ sc.publishServer(BlockingPojo.class, sce.getServletContext(), PATH);
+ }
+
+ @Override
+ public void contextDestroyed(ServletContextEvent sce) {
+ // NO-OP
+ }
+ }
+
+
+ public static class BlockingPojo {
+ @SuppressWarnings("unused")
+ @WebSocketMessage
+ public void echoTextMessage(Session session, String msg, boolean last) {
+ try {
+ Thread.sleep(60000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+
+
+ @SuppressWarnings("unused")
+ @WebSocketMessage
+ public void echoBinaryMessage(Session session, ByteBuffer msg,
+ boolean last) {
+ try {
+ Thread.sleep(TIMEOUT_MS * 10);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+
+
+ public static class BlockingBinaryHandler
+ implements MessageHandler.Async<ByteBuffer> {
+
+ @Override
+ public void onMessage(ByteBuffer messagePart, boolean last) {
+ try {
+ Thread.sleep(TIMEOUT_MS * 10);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+
+
+ public static class ConstantTxEndpoint extends Endpoint {
+
+ // Have to be static to be able to retrieve results from test case
+ private static volatile long timeout = -1;
+ private static volatile boolean ok = true;
+ private static volatile SendResult sr = null;
+ private static volatile Exception exception = null;
+ private static volatile boolean running = true;
+
+
+ @Override
+ public void onOpen(Session session, EndpointConfiguration config) {
+
+ // Reset everything
+ timeout = -1;
+ ok = true;
+ sr = null;
+ exception = null;
+ running = true;
+
+ if (!TestWsWebSocketContainer.timoutOnContainer) {
+ session.getRemote().setAsyncSendTimeout(TIMEOUT_MS);
+ }
+
+ long lastSend = 0;
+
+ // Should send quickly until the network buffers fill up and then
+ // block until the timeout kicks in
+ try {
+ while (ok) {
+ lastSend = System.currentTimeMillis();
+ Future<SendResult> f = session.getRemote().sendBytesByFuture(
+ ByteBuffer.wrap(MESSAGE_BINARY_4K));
+ sr = f.get();
+ ok = sr.isOK();
+ }
+ } catch (ExecutionException | InterruptedException e) {
+ exception = e;
+ }
+ timeout = System.currentTimeMillis() - lastSend;
+ running = false;
+ }
+
+ public static long getTimeout() {
+ return timeout;
+ }
+
+ public static boolean isOK() {
+ return ok;
+ }
+
+ public static SendResult getSendResult() {
+ return sr;
+ }
+
+ public static Exception getException() {
+ return exception;
+ }
+
+ public static boolean getRunning() {
+ return running;
+ }
+ }
+
+
+ public static class ConstantTxConfig implements ServletContextListener {
+
+ private static final String PATH = "/test";
+
+ @Override
+ public void contextInitialized(ServletContextEvent sce) {
+ ServerContainerImpl sc = ServerContainerImpl.getServerContainer();
+ sc.setServletContext(sce.getServletContext());
+ try {
+ sc.publishServer(ConstantTxEndpoint.class, PATH,
+ DefaultServerConfiguration.class);
+ if (TestWsWebSocketContainer.timoutOnContainer) {
+ sc.setAsyncSendTimeout(TIMEOUT_MS);
+ }
+ } catch (DeploymentException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public void contextDestroyed(ServletContextEvent sce) {
+ // NO-OP
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org