You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/11/18 09:29:41 UTC

svn commit: r1036342 - /camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java

Author: ningjiang
Date: Thu Nov 18 08:29:40 2010
New Revision: 1036342

URL: http://svn.apache.org/viewvc?rev=1036342&view=rev
Log:
CAMEL-3342 Only call the continuation API when camel-cxf runs with CXF 2.3.x

Modified:
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java

Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?rev=1036342&r1=1036341&r2=1036342&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java Thu Nov 18 08:29:40 2010
@@ -38,6 +38,7 @@ import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.service.invoker.Invoker;
 import org.apache.cxf.service.model.BindingOperationInfo;
+import org.apache.cxf.version.Version;
 
 /**
  * A Consumer of exchanges for a service in CXF.  CxfConsumer acts a CXF
@@ -64,21 +65,36 @@ public class CxfConsumer extends Default
                     LOG.trace("Received CXF Request: " + cxfExchange);
                 }                
                 Continuation continuation = getContinuation(cxfExchange);
-                if (continuation != null && !endpoint.isSynchronous()) {
+                // Only calling the continuation API for CXF 2.3.x 
+                if (continuation != null && !endpoint.isSynchronous() && Version.getCurrentVersion().startsWith("2.3")) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Calling the Camel async processors.");
+                    }
                     return asyncInvoke(cxfExchange, continuation);
                 } else {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Calling the Camel sync processors.");
+                    }
                     return syncInvoke(cxfExchange);
                 }
             }            
             
+            // NOTE this code cannot work with CXF 2.2.x
             private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) {
                 synchronized (continuation) {
                     if (continuation.isNew()) {
                         final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);
-                        // TODO we need to use the CXF 2.3.0 new Continuation API in CAMEL 3.0.0
-                        // The below code should work for CXF 2.2.x and CXF 2.3.x at the same time
+                        
+                        // Now we don't set up the timeout value
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Suspending continuation of exchangeId: " + camelExchange.getExchangeId());
+                        }
+                        // TODO Support setting a timeout in case Camel can't send the response back in time.
+                        // The continuation could be called before the suspend is called
+                        continuation.suspend(0);
+
                         // use the asynchronous API to process the exchange
-                        boolean sync = getAsyncProcessor().process(camelExchange, new AsyncCallback() {
+                        getAsyncProcessor().process(camelExchange, new AsyncCallback() {
                             public void done(boolean doneSync) {
                                 // make sure the continuation resume will not be called before the suspend method in other thread
                                 synchronized (continuation) {
@@ -92,26 +108,7 @@ public class CxfConsumer extends Default
                                 }
                             }
                         });
-                        // just need to avoid the continuation.resume is called
-                        // before the continuation.suspend is called
-                        if (continuation.getObject() != camelExchange && !sync) {
-                            // Now we don't set up the timeout value
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Suspending continuation of exchangeId: "
-                                          + camelExchange.getExchangeId());
-                            }
-                            // The continuation could be called before the
-                            // suspend is called
-                            continuation.suspend(0);
-                        } else {
-                            // just set the response back, as the invoking
-                            // thread is not changed
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Processed the Exchange : " + camelExchange.getExchangeId());
-                            }
-                            setResponseBack(cxfExchange, camelExchange);
-                        }
-
+                        
                     }
                     if (continuation.isResumed()) {
                         org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation