You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/11/18 09:29:41 UTC
svn commit: r1036342 -
/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
Author: ningjiang
Date: Thu Nov 18 08:29:40 2010
New Revision: 1036342
URL: http://svn.apache.org/viewvc?rev=1036342&view=rev
Log:
CAMEL-3342 Only call the continuation API when camel-cxf runs with CXF 2.3.x
Modified:
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?rev=1036342&r1=1036341&r2=1036342&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java Thu Nov 18 08:29:40 2010
@@ -38,6 +38,7 @@ import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.service.invoker.Invoker;
import org.apache.cxf.service.model.BindingOperationInfo;
+import org.apache.cxf.version.Version;
/**
* A Consumer of exchanges for a service in CXF. CxfConsumer acts a CXF
@@ -64,21 +65,36 @@ public class CxfConsumer extends Default
LOG.trace("Received CXF Request: " + cxfExchange);
}
Continuation continuation = getContinuation(cxfExchange);
- if (continuation != null && !endpoint.isSynchronous()) {
+ // Only calling the continuation API for CXF 2.3.x
+ if (continuation != null && !endpoint.isSynchronous() && Version.getCurrentVersion().startsWith("2.3")) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Calling the Camel async processors.");
+ }
return asyncInvoke(cxfExchange, continuation);
} else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Calling the Camel sync processors.");
+ }
return syncInvoke(cxfExchange);
}
}
+ // NOTE this code cannot work with CXF 2.2.x
private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) {
synchronized (continuation) {
if (continuation.isNew()) {
final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);
- // TODO we need to use the CXF 2.3.0 new Continuation API in CAMEL 3.0.0
- // The below code should work for CXF 2.2.x and CXF 2.3.x at the same time
+
+ // Now we don't set up the timeout value
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Suspending continuation of exchangeId: " + camelExchange.getExchangeId());
+ }
+ // TODO Support setting a timeout in case Camel can't send the response back in time.
+ // The continuation could be called before the suspend is called
+ continuation.suspend(0);
+
// use the asynchronous API to process the exchange
- boolean sync = getAsyncProcessor().process(camelExchange, new AsyncCallback() {
+ getAsyncProcessor().process(camelExchange, new AsyncCallback() {
public void done(boolean doneSync) {
// make sure the continuation resume will not be called before the suspend method in other thread
synchronized (continuation) {
@@ -92,26 +108,7 @@ public class CxfConsumer extends Default
}
}
});
- // just need to avoid the continuation.resume is called
- // before the continuation.suspend is called
- if (continuation.getObject() != camelExchange && !sync) {
- // Now we don't set up the timeout value
- if (LOG.isTraceEnabled()) {
- LOG.trace("Suspending continuation of exchangeId: "
- + camelExchange.getExchangeId());
- }
- // The continuation could be called before the
- // suspend is called
- continuation.suspend(0);
- } else {
- // just set the response back, as the invoking
- // thread is not changed
- if (LOG.isTraceEnabled()) {
- LOG.trace("Processed the Exchange : " + camelExchange.getExchangeId());
- }
- setResponseBack(cxfExchange, camelExchange);
- }
-
+
}
if (continuation.isResumed()) {
org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation