You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/09/01 04:32:49 UTC

svn commit: r991419 - in /camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf: CxfConstants.java CxfConsumer.java CxfProducer.java

Author: ningjiang
Date: Wed Sep  1 02:32:49 2010
New Revision: 991419

URL: http://svn.apache.org/viewvc?rev=991419&view=rev
Log:
CAMEL-3086 Let the camel-cxf consumer to leverage the CXF continuation API to call the async processor

Modified:
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConstants.java
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java

Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConstants.java?rev=991419&r1=991418&r2=991419&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConstants.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConstants.java Wed Sep  1 02:32:49 2010
@@ -40,6 +40,7 @@ public interface CxfConstants {
     String OPERATION_NAMESPACE = "operationNamespace";
     String SPRING_CONTEXT_ENDPOINT = "bean:";
     String CAMEL_TRANSPORT_PREFIX = "camel:";
+    String JAXWS_CONTEXT = "jaxwsContext";
     String CXF_EXCHANGE = "org.apache.cxf.message.exchange";
     String CAMEL_EXCHANGE = "org.apache.camel.exchange";
     String CAMEL_CXF_RS_USING_HTTP_API = "CamelCxfRsUsingHttpAPI";

Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?rev=991419&r1=991418&r2=991419&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java Wed Sep  1 02:32:49 2010
@@ -23,10 +23,13 @@ import javax.xml.ws.WebFault;
 
 import org.w3c.dom.Element;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
 import org.apache.cxf.endpoint.Server;
 import org.apache.cxf.frontend.ServerFactoryBean;
 import org.apache.cxf.interceptor.Fault;
@@ -56,11 +59,89 @@ public class CxfConsumer extends Default
 
             // we receive a CXF request when this method is called
             public Object invoke(Exchange cxfExchange, Object o) {
-
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Received CXF Request: " + cxfExchange);
+                }                
+                Continuation continuation = getContinuation(cxfExchange);
+                if (continuation != null) {
+                    return asyncInvoke(cxfExchange, continuation);
+                } else {
+                    return syncInvoke(cxfExchange);
+                }
+            }            
+            
+            private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) {
+                if (continuation.isNew()) {
+                    final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);
+
+                    // use the asynchronous API to process the exchange
+                    boolean sync = getAsyncProcessor().process(camelExchange, new AsyncCallback() {
+                        public void done(boolean doneSync) {
+                            if (log.isTraceEnabled()) {
+                                log.trace("Resuming continuation of exchangeId: "
+                                          + camelExchange.getExchangeId());
+                            }
+                            // resume processing after both, sync and async callbacks
+                            continuation.setObject(camelExchange);
+                            continuation.resume();
+                        }
+                    });
+                    // just need to avoid the continuation.resume is called
+                    // before the continuation.suspend is called
+                    if (continuation.getObject() != camelExchange && !sync) {
+                        // Now we don't set up the timeout value
+                        if (log.isTraceEnabled()) {
+                            log.trace("Suspending continuation of exchangeId: "
+                                      + camelExchange.getExchangeId());
+                        }
+                        // The continuation could be called before the suspend
+                        // is called
+                        continuation.suspend(0);
+                    } else {
+                        // just set the response back, as the invoking thread is
+                        // not changed
+                        if (log.isTraceEnabled()) {
+                            log.trace("Processed the Exchange : " + camelExchange.getExchangeId());
+                        }
+                        setResponseBack(cxfExchange, camelExchange);
+                    }
+
                 }
-                
+                if (continuation.isResumed()) {
+                    org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
+                        .getObject();
+                    setResponseBack(cxfExchange, camelExchange);
+
+                }
+                return null;
+            }
+
+            private Continuation getContinuation(Exchange cxfExchange) {
+                ContinuationProvider provider = 
+                    (ContinuationProvider)cxfExchange.getInMessage().get(ContinuationProvider.class.getName());
+                return provider.getContinuation();
+            }
+            
+            private Object syncInvoke(Exchange cxfExchange) {
+                org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);               
+                // send Camel exchange to the target processor
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing +++ START +++");
+                }
+                try {
+                    getProcessor().process(camelExchange);
+                } catch (Exception e) {
+                    throw new Fault(e);
+                }
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing +++ END +++");
+                }
+                setResponseBack(cxfExchange, camelExchange);
+                // response should have been set in outMessage's content
+                return null;
+            }
+            
+            private org.apache.camel.Exchange perpareCamelExchange(Exchange cxfExchange) {
                 // get CXF binding
                 CxfEndpoint endpoint = (CxfEndpoint)getEndpoint();
                 CxfBinding binding = endpoint.getCxfBinding();
@@ -94,24 +175,22 @@ public class CxfConsumer extends Default
                 
                 // bind the CXF request into a Camel exchange
                 binding.populateExchangeFromCxfRequest(cxfExchange, camelExchange);
-                
                 // extract the javax.xml.ws header
                 Map<String, Object> context = new HashMap<String, Object>();
-                binding.extractJaxWsContext(cxfExchange, context);                 
-                // send Camel exchange to the target processor
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Processing +++ START +++");
-                }
-                try {
-                    getProcessor().process(camelExchange);
-                } catch (Exception e) {
-                    throw new Fault(e);
-                }
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Processing +++ END +++");
-                }
+                binding.extractJaxWsContext(cxfExchange, context);
+                // put the context into camelExchange
+                camelExchange.setProperty(CxfConstants.JAXWS_CONTEXT, context);
+                return camelExchange;
+                
+            }
+            
+            @SuppressWarnings("unchecked")
+            private void setResponseBack(Exchange cxfExchange, org.apache.camel.Exchange camelExchange) {
+                CxfEndpoint endpoint = (CxfEndpoint)getEndpoint();
+                CxfBinding binding = endpoint.getCxfBinding();                
+                
                 checkFailure(camelExchange);
-              
+                
                 // bind the Camel response into a CXF response
                 if (camelExchange.getPattern().isOutCapable()) {
                     binding.populateCxfResponseFromExchange(camelExchange, cxfExchange);
@@ -121,9 +200,7 @@ public class CxfConsumer extends Default
                 checkFailure(camelExchange);
 
                 // copy the headers javax.xml.ws header back
-                binding.copyJaxWsContext(cxfExchange, context);
-                // response should have been set in outMessage's content
-                return null;
+                binding.copyJaxWsContext(cxfExchange, (Map<String, Object>)camelExchange.getProperty(CxfConstants.JAXWS_CONTEXT));
             }
 
             private void checkFailure(org.apache.camel.Exchange camelExchange) throws Fault {

Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?rev=991419&r1=991418&r2=991419&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java Wed Sep  1 02:32:49 2010
@@ -73,7 +73,7 @@ public class CxfProducer extends Default
     // so we don't delegate the sync process call to the async process 
     public boolean process(Exchange camelExchange, AsyncCallback callback) {
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Process exchange: " + camelExchange);
+            LOG.trace("Process exchange: " + camelExchange + " in an async way.");
         }
         
         try {
@@ -109,7 +109,7 @@ public class CxfProducer extends Default
     public void process(Exchange camelExchange) throws Exception {
         
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Process exchange: " + camelExchange);
+            LOG.trace("Process exchange: " + camelExchange + "in sync way.");
         }
         
         // create CXF exchange