You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2013/08/02 05:35:27 UTC

[3/3] git commit: CAMEL-6597 Camel conduit should support the JAXWS Async API out of the box

CAMEL-6597 Camel conduit should support the JAXWS Async API out of the box


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f58ccf9b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f58ccf9b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f58ccf9b

Branch: refs/heads/master
Commit: f58ccf9bd5261549dad2f1c13ab35339fa715c72
Parents: 7853023
Author: Willem Jiang <ni...@apache.org>
Authored: Fri Aug 2 10:35:28 2013 +0800
Committer: Willem Jiang <ni...@apache.org>
Committed: Fri Aug 2 11:33:33 2013 +0800

----------------------------------------------------------------------
 .../cxf/transport/CamelOutputStream.java        | 94 +++++++++++++++++---
 .../cxf/transport/JaxWSCamelConduitTest.java    | 14 +++
 .../cxf/transport/JaxWSCamelTestSupport.java    | 31 ++++++-
 3 files changed, 125 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f58ccf9b/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java b/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java
index bd1cf62..6cc1bb1 100644
--- a/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java
+++ b/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java
@@ -18,16 +18,24 @@ package org.apache.camel.component.cxf.transport;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Producer;
 import org.apache.camel.component.cxf.common.header.CxfHeaderHelper;
 import org.apache.camel.component.cxf.common.message.CxfMessageHelper;
 import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.cxf.Bus;
 import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.PhaseInterceptorChain;
 import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.workqueue.AutomaticWorkQueue;
+import org.apache.cxf.workqueue.WorkQueueManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,12 +45,13 @@ class CamelOutputStream extends CachedOutputStream {
     /**
      * 
      */
-    private Message outMessage;
+    private final Message outMessage;
     private boolean isOneWay;
     private String targetCamelEndpointUri;
     private Producer producer;
     private HeaderFilterStrategy headerFilterStrategy;
     private MessageObserver observer;
+    private boolean hasLoggedAsyncWarning;
 
     public CamelOutputStream(String targetCamelEndpointUri, Producer producer, 
                              HeaderFilterStrategy headerFilterStrategy, MessageObserver observer, 
@@ -60,6 +69,7 @@ class CamelOutputStream extends CachedOutputStream {
 
     protected void doClose() throws IOException {
         isOneWay = outMessage.getExchange().isOneWay();
+        
         commitOutputMessage();
     }
 
@@ -76,7 +86,7 @@ class CamelOutputStream extends CachedOutputStream {
             pattern = ExchangePattern.InOut;
         }
         LOG.debug("send the message to endpoint {}", this.targetCamelEndpointUri);
-        org.apache.camel.Exchange exchange = this.producer.createExchange(pattern);
+        final org.apache.camel.Exchange exchange = this.producer.createExchange(pattern);
 
         exchange.setProperty(Exchange.TO_ENDPOINT, this.targetCamelEndpointUri);
         CachedOutputStream outputStream = (CachedOutputStream) outMessage.getContent(OutputStream.class);
@@ -86,31 +96,89 @@ class CamelOutputStream extends CachedOutputStream {
         // TODO support different encoding
         exchange.getIn().setBody(outputStream.getInputStream());
         LOG.debug("template sending request: ", exchange.getIn());
-        Exception exception;
+        
+        if (outMessage.getExchange().isSynchronous()) {
+            syncInvoke(exchange);
+        } else {
+            // submit the request to the work queue
+            asyncInvokeFromWorkQueue(exchange);
+        }
+
+    }
+    
+    protected void syncInvoke(org.apache.camel.Exchange exchange) throws IOException {
         try {
             this.producer.process(exchange);
         } catch (Exception ex) {
-            exception = ex;
+            exchange.setException(ex);
         }
         // Throw the exception that the template get
-        exception = exchange.getException();
+        Exception exception = exchange.getException();
         if (exception != null) {
             throw new IOException("Cannot send the request message.", exchange.getException());
         }
         exchange.setProperty(CamelTransportConstants.CXF_EXCHANGE, outMessage.getExchange());
         if (!isOneWay) {
-            handleResponse(exchange);
+            handleResponseInternal(exchange);
         }
-
+        
     }
-
-    private void handleResponse(org.apache.camel.Exchange exchange) throws IOException {
-        org.apache.cxf.message.Message inMessage = null;
+     
+    protected void asyncInvokeFromWorkQueue(final org.apache.camel.Exchange exchange) throws IOException {
+        Runnable runnable = new Runnable() {
+            public void run() {
+                try {
+                    syncInvoke(exchange);
+                } catch (Throwable e) {
+                    ((PhaseInterceptorChain)outMessage.getInterceptorChain()).abort();
+                    outMessage.setContent(Exception.class, e);
+                    ((PhaseInterceptorChain)outMessage.getInterceptorChain()).unwind(outMessage);
+                    MessageObserver mo = outMessage.getInterceptorChain().getFaultObserver();
+                    if (mo == null) {
+                        mo = outMessage.getExchange().get(MessageObserver.class);
+                    }
+                    mo.onMessage(outMessage);
+                }
+            }
+        };
+        
         try {
-            inMessage = CxfMessageHelper.getCxfInMessage(this.headerFilterStrategy, exchange, true);
-        } catch (Exception ex) {
-            throw new IOException("Cannot get the response message. ", ex);
+            Executor ex = outMessage.getExchange().get(Executor.class);
+            if (ex != null) {
+                final Executor ex2 = ex;
+                final Runnable origRunnable = runnable;
+                runnable = new Runnable() {
+                    public void run() {
+                        outMessage.getExchange().put(Executor.class.getName() 
+                                                     + ".USING_SPECIFIED", Boolean.TRUE);
+                        ex2.execute(origRunnable);
+                    }
+                };
+            } else {
+                WorkQueueManager mgr = outMessage.getExchange().get(Bus.class)
+                    .getExtension(WorkQueueManager.class);
+                AutomaticWorkQueue qu = mgr.getNamedWorkQueue("nmr-conduit");
+                if (qu == null) {
+                    qu = mgr.getAutomaticWorkQueue();
+                }
+                // need to set the time out somewhere
+                qu.execute(runnable);
+            } 
+        } catch (RejectedExecutionException rex) {
+            if (!hasLoggedAsyncWarning) {
+                LOG.warn("Executor rejected background task to retrieve the response.  Suggest increasing the workqueue settings.");
+                hasLoggedAsyncWarning = true;
+            }
+            LOG.info("Executor rejected background task to retrieve the response, running on current thread.");
+            syncInvoke(exchange);
         }
+    }
+
+    private void handleResponseInternal(org.apache.camel.Exchange exchange) {
+        org.apache.cxf.message.Message inMessage = null;
+        inMessage = CxfMessageHelper.getCxfInMessage(this.headerFilterStrategy, exchange, true);
         this.observer.onMessage(inMessage);
     }
+    
+    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/f58ccf9b/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelConduitTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelConduitTest.java b/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelConduitTest.java
index 5f3922b..788fbed 100644
--- a/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelConduitTest.java
+++ b/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelConduitTest.java
@@ -16,10 +16,14 @@
  */
 package org.apache.camel.component.cxf.transport;
 
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
+
 import static org.hamcrest.CoreMatchers.is;
 
 /**
@@ -68,4 +72,14 @@ public class JaxWSCamelConduitTest extends JaxWSCamelTestSupport {
     public void testStart3() {
         assertThat(getSampleWS("direct:start3").getSomething(), is("Something"));
     }
+    
+    @Test
+    public void testAsyncInvocation() throws InterruptedException, ExecutionException {
+        
+        Future<?> result = getSampleWSAsyncWithCXFAPI("direct:start2").getSomethingAsync();
+        // as the CXF will build the getSomethingResponse by using asm, so we cannot get the response directly.
+        assertNotNull(result.get());
+        
+       
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f58ccf9b/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelTestSupport.java b/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelTestSupport.java
index f1ac596..3fe416a 100644
--- a/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelTestSupport.java
+++ b/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelTestSupport.java
@@ -16,16 +16,22 @@
  */
 package org.apache.camel.component.cxf.transport;
 
+import java.util.concurrent.Future;
+
 import javax.jws.WebMethod;
+import javax.jws.WebParam;
 import javax.jws.WebResult;
 import javax.jws.WebService;
 import javax.xml.namespace.QName;
+import javax.xml.ws.AsyncHandler;
 import javax.xml.ws.Endpoint;
+import javax.xml.ws.Response;
 import javax.xml.ws.Service;
 
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
+import org.apache.cxf.feature.Feature;
 import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
 import org.junit.Before;
 
@@ -51,9 +57,24 @@ public class JaxWSCamelTestSupport extends CamelTestSupport {
     @WebService(targetNamespace = "urn:test", serviceName = "testService", portName = "testPort")
     public interface SampleWS {
 
-        @WebMethod
+        @WebMethod(operationName = "getSomething")
         @WebResult(name = "result", targetNamespace = "urn:test")
         String getSomething();
+       
+    }
+    
+    @WebService(targetNamespace = "urn:test", serviceName = "testService", portName = "testPort")
+    public interface SampleWSAsync {
+        @WebMethod(operationName = "getSomething")
+        @WebResult(name = "result", targetNamespace = "urn:test")
+        String getSomething();
+        
+        @WebMethod(operationName = "getSomething")
+        Response<String> getSomethingAsync();
+        
+        @WebMethod(operationName = "getSomething")
+        Future<?> getSomethingAsync(@WebParam(name = "asyncHandler", targetNamespace = "")
+            AsyncHandler<String> asyncHandler);
     }
     
     public static class SampleWSImpl implements SampleWS {
@@ -99,6 +120,14 @@ public class JaxWSCamelTestSupport extends CamelTestSupport {
         return factory.create(SampleWS.class);
     }
     
+    public SampleWSAsync getSampleWSAsyncWithCXFAPI(String camelEndpoint) {
+        JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
+        factory.setAddress("camel://" + camelEndpoint);
+        factory.setServiceClass(SampleWSAsync.class);
+        factory.setBus(bus);
+        return factory.create(SampleWSAsync.class);
+    }
+    
     /**
      * Create a SampleWS Server to a specified route
      * @param camelEndpoint


Re: [3/3] git commit: CAMEL-6597 Camel conduit should support the JAXWS Async API out of the box

Posted by Daniel Kulp <dk...@apache.org>.
On Aug 1, 2013, at 11:35 PM, ningjiang@apache.org wrote:

> +            } else {
> +                WorkQueueManager mgr = outMessage.getExchange().get(Bus.class)
> +                    .getExtension(WorkQueueManager.class);
> +                AutomaticWorkQueue qu = mgr.getNamedWorkQueue("nmr-conduit");
> +                if (qu == null) {
> +                    qu = mgr.getAutomaticWorkQueue();
> +                }
> +                // need to set the time out somewhere
> +                qu.execute(runnable);
> +            } 

Why "nmr-conduit"? Can we just use "camel-cxf-conduit" or something that doesn't have JBI-isms in it?


-- 
Daniel Kulp
dkulp@apache.org - http://dankulp.com/blog
Talend Community Coder - http://coders.talend.com