You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/09/28 12:13:58 UTC
svn commit: r1002095 -
/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
Author: ningjiang
Date: Tue Sep 28 10:13:57 2010
New Revision: 1002095
URL: http://svn.apache.org/viewvc?rev=1002095&view=rev
Log:
CAMEL-3169 Add a synchronized block on the continuation so that it cannot be resumed before it has been suspended by the other thread
Modified:
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?rev=1002095&r1=1002094&r2=1002095&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java Tue Sep 28 10:13:57 2010
@@ -71,47 +71,54 @@ public class CxfConsumer extends Default
}
private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) {
- if (continuation.isNew()) {
- final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);
-
- // use the asynchronous API to process the exchange
- boolean sync = getAsyncProcessor().process(camelExchange, new AsyncCallback() {
- public void done(boolean doneSync) {
+ synchronized (continuation) {
+ if (continuation.isNew()) {
+ final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);
+
+ // use the asynchronous API to process the exchange
+ boolean sync = getAsyncProcessor().process(camelExchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ // make sure the continuation resume will not be called before the suspend method in other thread
+ synchronized (continuation) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Resuming continuation of exchangeId: "
+ + camelExchange.getExchangeId());
+ }
+ // resume processing after both, sync and async callbacks
+ continuation.setObject(camelExchange);
+ continuation.resume();
+ }
+ }
+ });
+ // just need to avoid the continuation.resume is called
+ // before the continuation.suspend is called
+ if (continuation.getObject() != camelExchange && !sync) {
+ // Now we don't set up the timeout value
if (LOG.isTraceEnabled()) {
- LOG.trace("Resuming continuation of exchangeId: "
+ LOG.trace("Suspending continuation of exchangeId: "
+ camelExchange.getExchangeId());
}
- // resume processing after both, sync and async callbacks
- continuation.setObject(camelExchange);
- continuation.resume();
- }
- });
- // just need to avoid the continuation.resume is called
- // before the continuation.suspend is called
- if (continuation.getObject() != camelExchange && !sync) {
- // Now we don't set up the timeout value
- if (LOG.isTraceEnabled()) {
- LOG.trace("Suspending continuation of exchangeId: "
- + camelExchange.getExchangeId());
- }
- // The continuation could be called before the suspend
- // is called
- continuation.suspend(0);
- } else {
- // just set the response back, as the invoking thread is
- // not changed
- if (LOG.isTraceEnabled()) {
- LOG.trace("Processed the Exchange : " + camelExchange.getExchangeId());
+ // The continuation could be called before the
+ // suspend
+ // is called
+ continuation.suspend(0);
+ } else {
+ // just set the response back, as the invoking
+ // thread is
+ // not changed
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processed the Exchange : " + camelExchange.getExchangeId());
+ }
+ setResponseBack(cxfExchange, camelExchange);
}
- setResponseBack(cxfExchange, camelExchange);
- }
- }
- if (continuation.isResumed()) {
- org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
- .getObject();
- setResponseBack(cxfExchange, camelExchange);
+ }
+ if (continuation.isResumed()) {
+ org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
+ .getObject();
+ setResponseBack(cxfExchange, camelExchange);
+ }
}
return null;
}