You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2014/06/17 11:22:39 UTC
git commit: [CXF-5809] WebSocket transport supporting concurrent
asynchronous calls
Repository: cxf
Updated Branches:
refs/heads/master 99295330d -> 86a0bb61d
[CXF-5809] WebSocket transport supporting concurrent asynchronous calls
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/86a0bb61
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/86a0bb61
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/86a0bb61
Branch: refs/heads/master
Commit: 86a0bb61dc3442f28bb6b036bfa8f518c8576c7e
Parents: 9929533
Author: Akitoshi Yoshida <ay...@apache.org>
Authored: Tue Jun 17 11:12:02 2014 +0200
Committer: Akitoshi Yoshida <ay...@apache.org>
Committed: Tue Jun 17 11:21:52 2014 +0200
----------------------------------------------------------------------
.../websocket/ahc/AhcWebSocketConduit.java | 2 +-
.../websocket/jetty/JettyWebSocket.java | 40 +++---
.../websocket/jetty/JettyWebSocketManager.java | 14 +-
.../websocket/ClientServerWebSocketTest.java | 136 +++++++++++++++++++
4 files changed, 173 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/86a0bb61/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
index 4275d20..612d986 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
@@ -213,7 +213,7 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
@Override
protected void handleResponseAsync() throws IOException {
- // TODO Auto-generated method stub
+ // Replaces the former empty stub: asynchronous responses are now processed via handleResponseOnWorkqueue.
+ // NOTE(review): the meaning of the (true, false) arguments is defined by that method, which is not shown here - confirm.
+ handleResponseOnWorkqueue(true, false);
}
@Override
http://git-wip-us.apache.org/repos/asf/cxf/blob/86a0bb61/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
index 0086095..e78f876 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
@@ -102,25 +102,31 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa
invokeService(data, offset, length);
}
- private void invokeService(byte[] data, int offset, int length) {
- HttpServletRequest request = null;
- HttpServletResponse response = null;
- try {
- response = createServletResponse();
- request = createServletRequest(data, offset, length);
- if (manager != null) {
- String reqid = request.getHeader(requestIdKey);
- if (reqid != null) {
- response.setHeader(responseIdKey, reqid);
+ private void invokeService(final byte[] data, final int offset, final int length) {
+ // need to invoke the service asynchronously if the websocket's onMessage is synchronously blocked
+ manager.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ HttpServletRequest request = null;
+ HttpServletResponse response = null;
+ try {
+ response = createServletResponse();
+ request = createServletRequest(data, offset, length);
+ if (manager != null) {
+ String reqid = request.getHeader(requestIdKey);
+ if (reqid != null) {
+ response.setHeader(responseIdKey, reqid);
+ }
+ manager.service(request, response);
+ }
+ } catch (InvalidPathException ex) {
+ reportErrorStatus(response, 400);
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Failed to invoke service", e);
+ reportErrorStatus(response, 500);
}
- manager.service(request, response);
}
- } catch (InvalidPathException ex) {
- reportErrorStatus(response, 400);
- } catch (Exception e) {
- LOG.log(Level.WARNING, "Failed to invoke service", e);
- reportErrorStatus(response, 500);
- }
+ });
}
// may want to move this error reporting code to WebSocketServletHolder
http://git-wip-us.apache.org/repos/asf/cxf/blob/86a0bb61/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
index f6f8b7f..5e7f4f7 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
@@ -20,6 +20,7 @@
package org.apache.cxf.transport.websocket.jetty;
import java.io.IOException;
+import java.util.concurrent.Executor;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
@@ -30,6 +31,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.cxf.transport.http.AbstractHTTPDestination;
import org.apache.cxf.transport.websocket.WebSocketDestinationService;
+import org.apache.cxf.workqueue.OneShotAsyncExecutor;
import org.eclipse.jetty.websocket.WebSocketFactory;
import org.eclipse.jetty.websocket.WebSocketFactory.Acceptor;
@@ -40,12 +42,18 @@ public class JettyWebSocketManager {
private WebSocketFactory webSocketFactory;
private AbstractHTTPDestination destination;
private ServletContext servletContext;
+ private Executor executor;
public void init(AbstractHTTPDestination dest) {
this.destination = dest;
//TODO customize websocket factory configuration options when using the destination.
webSocketFactory = new WebSocketFactory((Acceptor)dest, 8192);
+
+ //FIXME use the bus's executor here instead of OneShotAsyncExecutor. The executor is
+ // needed to run the service invocation asynchronously, decoupled from the websocket's
+ // onMessage call, which is synchronously blocked.
+ executor = OneShotAsyncExecutor.getInstance();
}
void setServletContext(ServletContext servletContext) {
@@ -73,9 +81,13 @@ public class JettyWebSocketManager {
return false;
}
- public void service(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
+ // Forwards the request/response pair to the destination's invokeInternal together with the
+ // servlet context; does nothing when no destination has been set.
+ void service(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
if (destination != null) {
((WebSocketDestinationService)destination).invokeInternal(null, servletContext, request, response);
}
}
+
+ // Returns the executor assigned in init(); null if init() has not been called yet.
+ Executor getExecutor() {
+ return executor;
+ }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/86a0bb61/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/websocket/ClientServerWebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/websocket/ClientServerWebSocketTest.java b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/websocket/ClientServerWebSocketTest.java
index 336fbe9..e1114bb 100644
--- a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/websocket/ClientServerWebSocketTest.java
+++ b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/websocket/ClientServerWebSocketTest.java
@@ -22,12 +22,15 @@ package org.apache.cxf.systest.jaxws.websocket;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URL;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;
import javax.xml.namespace.QName;
+import javax.xml.ws.AsyncHandler;
import javax.xml.ws.BindingProvider;
+import javax.xml.ws.Response;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.configuration.security.AuthorizationPolicy;
@@ -40,6 +43,7 @@ import org.apache.hello_world_soap_http.BadRecordLitFault;
import org.apache.hello_world_soap_http.Greeter;
import org.apache.hello_world_soap_http.NoSuchCodeLitFault;
import org.apache.hello_world_soap_http.SOAPService;
+import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -249,6 +253,138 @@ public class ClientServerWebSocketTest extends AbstractBusClientServerTestBase {
}
}
+    /**
+     * Starts two polling-style asynchronous greetMeLater calls back to back and verifies that
+     * issuing them does not block and that both responses complete within the expected time.
+     */
+    @Test
+    public void testAsyncPollingCall() throws Exception {
+        URL wsdl = getClass().getResource("/wsdl/hello_world.wsdl");
+        assertNotNull(wsdl);
+
+        SOAPService service = new SOAPService(wsdl, serviceName);
+        Greeter greeter = service.getPort(portName, Greeter.class);
+        updateGreeterAddress(greeter, PORT);
+
+        long before = System.currentTimeMillis();
+
+        long delay = 3000;
+        Response<GreetMeLaterResponse> r1 = greeter.greetMeLaterAsync(delay);
+        Response<GreetMeLaterResponse> r2 = greeter.greetMeLaterAsync(delay);
+
+        long after = System.currentTimeMillis();
+
+        // had the two calls been processed one after the other, issuing them would take 2 * delay
+        assertTrue("Duration of calls exceeded " + (2 * delay) + " ms", after - before < (2 * delay));
+
+        // first time round, responses should not be available yet
+        assertFalse("Response already available.", r1.isDone());
+        assertFalse("Response already available.", r2.isDone());
+
+        // poll every 500 ms until both responses are done, for at most delay + 1000 ms
+        long waited = 0;
+        while (waited < (delay + 1000) && !(r1.isDone() && r2.isDone())) {
+            // an interrupt is not swallowed: it propagates and fails the test
+            Thread.sleep(500);
+            waited += 500;
+        }
+        assertTrue("Response is not available.", r1.isDone());
+        assertTrue("Response is not available.", r2.isDone());
+    }
+
+
+ @Test
+ public void testAsyncSynchronousPolling() throws Exception {
+ URL wsdl = getClass().getResource("/wsdl/hello_world.wsdl");
+ assertNotNull(wsdl);
+
+ SOAPService service = new SOAPService(wsdl, serviceName);
+ assertNotNull(service);
+
+ final String expectedString = new String("Hello, finally!");
+
+ class Poller extends Thread {
+ Response<GreetMeLaterResponse> response;
+ int tid;
+
+ Poller(Response<GreetMeLaterResponse> r, int t) {
+ response = r;
+ tid = t;
+ }
+ public void run() {
+ if (tid % 2 > 0) {
+ while (!response.isDone()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ex) {
+ // ignore
+ }
+ }
+ }
+ GreetMeLaterResponse reply = null;
+ try {
+ reply = response.get();
+ } catch (Exception ex) {
+ fail("Poller " + tid + " failed with " + ex);
+ }
+ assertNotNull("Poller " + tid + ": no response received from service", reply);
+ String s = reply.getResponseType();
+ assertEquals(expectedString, s);
+ }
+ }
+
+ Greeter greeter = service.getPort(portName, Greeter.class);
+ updateGreeterAddress(greeter, PORT);
+ long before = System.currentTimeMillis();
+
+
+ long delay = 3000;
+
+ Response<GreetMeLaterResponse> response = greeter.greetMeLaterAsync(delay);
+ long after = System.currentTimeMillis();
+
+ assertTrue("Duration of calls exceeded " + delay + " ms", after - before < delay);
+
+ // first time round, responses should not be available yet
+ assertFalse("Response already available.", response.isDone());
+
+
+ Poller[] pollers = new Poller[4];
+ for (int i = 0; i < pollers.length; i++) {
+ pollers[i] = new Poller(response, i);
+ }
+ for (Poller p : pollers) {
+ p.start();
+ }
+
+ for (Poller p : pollers) {
+ p.join();
+ }
+
+
+ }
+
+    /**
+     * Callback handler for asynchronous greetMeLater calls: counts invocations and keeps
+     * the response text of the most recent successful reply.
+     */
+    static class MyHandler implements AsyncHandler<GreetMeLaterResponse> {
+        // NOTE(review): plain static counter shared by all instances; the increment is not atomic,
+        // so the count is only reliable if callbacks never run concurrently - confirm.
+        static int invocationCount;
+        private String replyBuffer;
+
+        public void handleResponse(Response<GreetMeLaterResponse> response) {
+            invocationCount++;
+            try {
+                GreetMeLaterResponse reply = response.get();
+                replyBuffer = reply.getResponseType();
+            } catch (InterruptedException ex) {
+                // keep the interrupt visible to the thread that invoked this callback
+                Thread.currentThread().interrupt();
+                ex.printStackTrace();
+            } catch (ExecutionException ex) {
+                ex.printStackTrace();
+            }
+        }
+
+        /** Returns the response text stored by the last successful callback, or null if none. */
+        String getReplyBuffer() {
+            return replyBuffer;
+        }
+    }
+
private void updateGreeterAddress(Greeter greeter, String port) {
((BindingProvider)greeter).getRequestContext()
.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY,