You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/09/01 07:43:39 UTC

svn commit: r991440 - in /camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf: CxfEndpoint.java jaxrs/CxfRsConsumer.java jaxrs/CxfRsInvoker.java

Author: ningjiang
Date: Wed Sep  1 05:43:38 2010
New Revision: 991440

URL: http://svn.apache.org/viewvc?rev=991440&view=rev
Log:
CAMEL-3095: CxfRsConsumer now supports the continuation API

Modified:
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java

Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java?rev=991440&r1=991439&r2=991440&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java Wed Sep  1 05:43:38 2010
@@ -100,7 +100,7 @@ public class CxfEndpoint extends Default
 
     public CxfEndpoint(String remaining, CxfComponent cxfComponent) {
         super(remaining, cxfComponent);
-        setAddress(remaining);
+        setAddress(remaining);        
     }
     
     public CxfEndpoint(String remaining, CamelContext context) {

Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java?rev=991440&r1=991439&r2=991440&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java Wed Sep  1 05:43:38 2010
@@ -32,7 +32,7 @@ public class CxfRsConsumer extends Defau
 
     public CxfRsConsumer(CxfRsEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
-        CxfRsInvoker cxfRsInvoker = new CxfRsInvoker(endpoint, processor);
+        CxfRsInvoker cxfRsInvoker = new CxfRsInvoker(endpoint, this);
         JAXRSServerFactoryBean svrBean = endpoint.createJAXRSServerFactoryBean();
         svrBean.setInvoker(cxfRsInvoker);
         server = svrBean.create();

Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java?rev=991440&r1=991439&r2=991440&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java Wed Sep  1 05:43:38 2010
@@ -20,21 +20,25 @@ import java.lang.reflect.Method;
 
 import javax.ws.rs.WebApplicationException;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
 import org.apache.cxf.jaxrs.JAXRSInvoker;
 import org.apache.cxf.jaxrs.model.OperationResourceInfo;
 import org.apache.cxf.message.Exchange;
 
 public class CxfRsInvoker extends JAXRSInvoker {
-    
-    private Processor processor;
+    private static final Log LOG = LogFactory.getLog(CxfRsInvoker.class);
+    private CxfRsConsumer cxfRsConsumer;
     private CxfRsEndpoint endpoint;
     
-    public CxfRsInvoker(CxfRsEndpoint endpoint, Processor processor) {
+    public CxfRsInvoker(CxfRsEndpoint endpoint, CxfRsConsumer consumer) {
         this.endpoint = endpoint;
-        this.processor = processor;
+        this.cxfRsConsumer = consumer;
     }
     
     protected Object performInvocation(Exchange cxfExchange, final Object serviceObject, Method method,
@@ -45,19 +49,92 @@ public class CxfRsInvoker extends JAXRSI
             // don't delegate the sub resource locator call to camel processor
             return method.invoke(serviceObject, paramArray);
         }
-       
+        Continuation continuation = getContinuation(cxfExchange);
+        if (continuation != null && !endpoint.isSynchronous()) {
+            return asyncInvoke(cxfExchange, serviceObject, method, paramArray, continuation);
+        } else {
+            return syncInvoke(cxfExchange, serviceObject, method, paramArray);
+        }
+    }
+    
+    private Continuation getContinuation(Exchange cxfExchange) {
+        // the in message may carry no provider; return null (caller then falls back to syncInvoke) instead of NPE
+        Object provider = cxfExchange.getInMessage().get(ContinuationProvider.class.getName());
+        return provider == null ? null : ((ContinuationProvider)provider).getContinuation();
+    }
+    
+    private Object asyncInvoke(Exchange cxfExchange, final Object serviceObject, Method method,
+                              Object[] paramArray, final Continuation continuation) throws Exception {
+        if (continuation.isNew()) {
+            ExchangePattern ep = ExchangePattern.InOut;
+            if (method.getReturnType() == Void.class) { // NOTE(review): void methods report Void.TYPE - confirm
+                ep = ExchangePattern.InOnly;
+            } 
+            final org.apache.camel.Exchange camelExchange = endpoint.createExchange(ep);
+            CxfRsBinding binding = endpoint.getBinding();
+            binding.populateExchangeFromCxfRsRequest(cxfExchange, camelExchange, method, paramArray);
+            boolean sync = cxfRsConsumer.getAsyncProcessor().process(camelExchange, new AsyncCallback() {
+                public void done(boolean doneSync) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Resuming continuation of exchangeId: " + camelExchange.getExchangeId());
+                    }
+                    // runs on sync and async completion alike: store the exchange, then resume
+                    continuation.setObject(camelExchange);
+                    continuation.resume();
+                }
+            });
+            // suspend only if the callback has not run yet; otherwise resume()
+            // would already have been called before suspend()
+            if (continuation.getObject() != camelExchange && !sync) {
+                // no timeout value is set up for now, hence suspend(0)
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Suspending continuation of exchangeId: " + camelExchange.getExchangeId());
+                }
+                // NOTE: the callback could still fire between the check above
+                // and this suspend call
+                continuation.suspend(0);
+                
+            } else {
+                // processing is already complete, so build the response here
+                // without suspending
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processed the Exchange : " + camelExchange.getExchangeId());
+                }
+                return returnResponse(cxfExchange, camelExchange);
+            }
+
+        }
+        if (continuation.isResumed()) { // the callback above stored the finished exchange on the continuation
+            org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
+                .getObject();
+            return returnResponse(cxfExchange, camelExchange);
+
+        }
+        // neither completed above nor resumed: there is no response to return on this pass
+        return null;
+    }
+    
+    private Object syncInvoke(Exchange cxfExchange, final Object serviceObject, Method method,
+                              Object[] paramArray) throws Exception {
         ExchangePattern ep = ExchangePattern.InOut;
+        // NOTE(review): a method declared void reports Void.TYPE, not Void.class - confirm this check
         if (method.getReturnType() == Void.class) {
             ep = ExchangePattern.InOnly;
         } 
         org.apache.camel.Exchange camelExchange = endpoint.createExchange(ep);
         CxfRsBinding binding = endpoint.getBinding();
         binding.populateExchangeFromCxfRsRequest(cxfExchange, camelExchange, method, paramArray);
+        // blocking call into the consumer's processor; a thrown exception is kept on the exchange for returnResponse
         try {
-            processor.process(camelExchange);
+            cxfRsConsumer.getProcessor().process(camelExchange);
         } catch (Exception exception) {
             camelExchange.setException(exception);
         }
+        return returnResponse(cxfExchange, camelExchange);
+        
+    }
+    
+    private Object returnResponse(Exchange cxfExchange, org.apache.camel.Exchange camelExchange) throws Exception {
         if (camelExchange.getException() != null) {
             Throwable exception = camelExchange.getException();
             Object result = null;
@@ -69,7 +146,7 @@ public class CxfRsInvoker extends JAXRSI
             }
             return result;
         }
-        return binding.populateCxfRsResponseFromExchange(camelExchange, cxfExchange);
+        return endpoint.getBinding().populateCxfRsResponseFromExchange(camelExchange, cxfExchange);
     }
 
 }