You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2014/06/17 11:22:39 UTC

git commit: [CXF-5809] WebSocket transport supporting concurrent asynchronous calls

Repository: cxf
Updated Branches:
  refs/heads/master 99295330d -> 86a0bb61d


[CXF-5809] WebSocket transport supporting concurrent asynchronous calls


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/86a0bb61
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/86a0bb61
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/86a0bb61

Branch: refs/heads/master
Commit: 86a0bb61dc3442f28bb6b036bfa8f518c8576c7e
Parents: 9929533
Author: Akitoshi Yoshida <ay...@apache.org>
Authored: Tue Jun 17 11:12:02 2014 +0200
Committer: Akitoshi Yoshida <ay...@apache.org>
Committed: Tue Jun 17 11:21:52 2014 +0200

----------------------------------------------------------------------
 .../websocket/ahc/AhcWebSocketConduit.java      |   2 +-
 .../websocket/jetty/JettyWebSocket.java         |  40 +++---
 .../websocket/jetty/JettyWebSocketManager.java  |  14 +-
 .../websocket/ClientServerWebSocketTest.java    | 136 +++++++++++++++++++
 4 files changed, 173 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/86a0bb61/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
index 4275d20..612d986 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java
@@ -213,7 +213,7 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit {
 
         @Override
         protected void handleResponseAsync() throws IOException {
-            // TODO Auto-generated method stub
+            handleResponseOnWorkqueue(true, false);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cxf/blob/86a0bb61/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
index 0086095..e78f876 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
@@ -102,25 +102,31 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa
         invokeService(data, offset, length);
     }
     
-    private void invokeService(byte[] data, int offset, int length) {
-        HttpServletRequest request = null;
-        HttpServletResponse response = null;
-        try {
-            response = createServletResponse();
-            request = createServletRequest(data, offset, length);
-            if (manager != null) {
-                String reqid = request.getHeader(requestIdKey);
-                if (reqid != null) {
-                    response.setHeader(responseIdKey, reqid);
+    private void invokeService(final byte[] data, final int offset, final int length) {
+        // need to invoke the service asynchronously if the websocket's onMessage is synchronously blocked
+        manager.getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                HttpServletRequest request = null;
+                HttpServletResponse response = null;
+                try {
+                    response = createServletResponse();
+                    request = createServletRequest(data, offset, length);
+                    if (manager != null) {
+                        String reqid = request.getHeader(requestIdKey);
+                        if (reqid != null) {
+                            response.setHeader(responseIdKey, reqid);
+                        }
+                        manager.service(request, response);
+                    }
+                } catch (InvalidPathException ex) { 
+                    reportErrorStatus(response, 400);
+                } catch (Exception e) {
+                    LOG.log(Level.WARNING, "Failed to invoke service", e);
+                    reportErrorStatus(response, 500);
                 }
-                manager.service(request, response);
             }
-        } catch (InvalidPathException ex) { 
-            reportErrorStatus(response, 400);
-        } catch (Exception e) {
-            LOG.log(Level.WARNING, "Failed to invoke service", e);
-            reportErrorStatus(response, 500);
-        }
+        });
     }
     
     // may want to move this error reporting code to WebSocketServletHolder

http://git-wip-us.apache.org/repos/asf/cxf/blob/86a0bb61/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
index f6f8b7f..5e7f4f7 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocketManager.java
@@ -20,6 +20,7 @@
 package org.apache.cxf.transport.websocket.jetty;
 
 import java.io.IOException;
+import java.util.concurrent.Executor;
 
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
@@ -30,6 +31,7 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.cxf.transport.http.AbstractHTTPDestination;
 import org.apache.cxf.transport.websocket.WebSocketDestinationService;
+import org.apache.cxf.workqueue.OneShotAsyncExecutor;
 import org.eclipse.jetty.websocket.WebSocketFactory;
 import org.eclipse.jetty.websocket.WebSocketFactory.Acceptor;
 
@@ -40,12 +42,18 @@ public class JettyWebSocketManager {
     private WebSocketFactory webSocketFactory;
     private AbstractHTTPDestination destination;
     private ServletContext servletContext;
+    private Executor executor;
 
     public void init(AbstractHTTPDestination dest) {
         this.destination = dest;
 
         //TODO customize websocket factory configuration options when using the destination.
         webSocketFactory = new WebSocketFactory((Acceptor)dest, 8192);
+
+        //FIXME get the bus's executor for async service invocation to decouple
+        // the service invocation from websocket's onMessage call which is synchronously
+        // blocked.
+        executor = OneShotAsyncExecutor.getInstance();
     }
 
     void setServletContext(ServletContext servletContext) {
@@ -73,9 +81,13 @@ public class JettyWebSocketManager {
         return false;
     }
 
-    public void service(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
+    void service(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
         if (destination != null) {
             ((WebSocketDestinationService)destination).invokeInternal(null, servletContext, request, response);
         }
     }
+    
+    Executor getExecutor() {
+        return executor;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/86a0bb61/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/websocket/ClientServerWebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/websocket/ClientServerWebSocketTest.java b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/websocket/ClientServerWebSocketTest.java
index 336fbe9..e1114bb 100644
--- a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/websocket/ClientServerWebSocketTest.java
+++ b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/websocket/ClientServerWebSocketTest.java
@@ -22,12 +22,15 @@ package org.apache.cxf.systest.jaxws.websocket;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.URL;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.logging.Logger;
 
 import javax.xml.namespace.QName;
+import javax.xml.ws.AsyncHandler;
 import javax.xml.ws.BindingProvider;
+import javax.xml.ws.Response;
 
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.configuration.security.AuthorizationPolicy;
@@ -40,6 +43,7 @@ import org.apache.hello_world_soap_http.BadRecordLitFault;
 import org.apache.hello_world_soap_http.Greeter;
 import org.apache.hello_world_soap_http.NoSuchCodeLitFault;
 import org.apache.hello_world_soap_http.SOAPService;
+import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -249,6 +253,138 @@ public class ClientServerWebSocketTest extends AbstractBusClientServerTestBase {
         }
     }
 
+    @Test
+    public void testAsyncPollingCall() throws Exception {
+        URL wsdl = getClass().getResource("/wsdl/hello_world.wsdl");
+        assertNotNull(wsdl);
+        
+        SOAPService service = new SOAPService(wsdl, serviceName);
+        Greeter greeter = service.getPort(portName, Greeter.class);
+        updateGreeterAddress(greeter, PORT);
+
+        long before = System.currentTimeMillis();
+
+        long delay = 3000;
+        Response<GreetMeLaterResponse> r1 = greeter.greetMeLaterAsync(delay);
+        Response<GreetMeLaterResponse> r2 = greeter.greetMeLaterAsync(delay);
+
+        long after = System.currentTimeMillis();
+
+        assertTrue("Duration of calls exceeded " + (2 * delay) + " ms", after - before < (2 * delay));
+
+        // first time round, responses should not be available yet
+        assertFalse("Response already available.", r1.isDone());
+        assertFalse("Response already available.", r2.isDone());
+
+        // after three seconds responses should be available
+        long waited = 0;
+        while (waited < (delay + 1000)) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException ex) {
+               // ignore
+            }
+            if (r1.isDone() && r2.isDone()) {
+                break;
+            }
+            waited += 500;
+        }
+        assertTrue("Response is  not available.", r1.isDone());
+        assertTrue("Response is  not available.", r2.isDone());
+    }
+
+    @Test
+    public void testAsyncSynchronousPolling() throws Exception {
+        URL wsdl = getClass().getResource("/wsdl/hello_world.wsdl");
+        assertNotNull(wsdl);
+        
+        SOAPService service = new SOAPService(wsdl, serviceName);
+        assertNotNull(service);
+        
+        final String expectedString = new String("Hello, finally!");
+          
+        class Poller extends Thread {
+            Response<GreetMeLaterResponse> response;
+            int tid;
+            
+            Poller(Response<GreetMeLaterResponse> r, int t) {
+                response = r;
+                tid = t;
+            }
+            public void run() {
+                if (tid % 2 > 0) {
+                    while (!response.isDone()) {
+                        try {
+                            Thread.sleep(100);
+                        } catch (InterruptedException ex) {
+                            // ignore
+                        }
+                    }
+                }
+                GreetMeLaterResponse reply = null;
+                try {
+                    reply = response.get();
+                } catch (Exception ex) {
+                    fail("Poller " + tid + " failed with " + ex);
+                }
+                assertNotNull("Poller " + tid + ": no response received from service", reply);
+                String s = reply.getResponseType();
+                assertEquals(expectedString, s);
+            }
+        }
+        
+        Greeter greeter = service.getPort(portName, Greeter.class);
+        updateGreeterAddress(greeter, PORT);
+        long before = System.currentTimeMillis();
+
+        
+        long delay = 3000;
+        
+        Response<GreetMeLaterResponse> response = greeter.greetMeLaterAsync(delay);
+        long after = System.currentTimeMillis();
+
+        assertTrue("Duration of calls exceeded " + delay + " ms", after - before < delay);
+
+        // first time round, responses should not be available yet
+        assertFalse("Response already available.", response.isDone());
+
+        
+        Poller[] pollers = new Poller[4];
+        for (int i = 0; i < pollers.length; i++) {
+            pollers[i] = new Poller(response, i);
+        }
+        for (Poller p : pollers) {
+            p.start();
+        }
+        
+        for (Poller p : pollers) {
+            p.join();
+        }
+        
+           
+    }
+
+    static class MyHandler implements AsyncHandler<GreetMeLaterResponse> {
+        static int invocationCount;
+        private String replyBuffer;
+        
+        public void handleResponse(Response<GreetMeLaterResponse> response) {
+            invocationCount++;
+            try {
+                GreetMeLaterResponse reply = response.get();
+                replyBuffer = reply.getResponseType();
+            } catch (InterruptedException ex) {
+                ex.printStackTrace();
+            } catch (ExecutionException ex) {
+                ex.printStackTrace();
+            }
+        }
+        
+        String getReplyBuffer() {
+            return replyBuffer;
+        }
+    }
+
     private void updateGreeterAddress(Greeter greeter, String port) {
         ((BindingProvider)greeter).getRequestContext()
             .put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY,