You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/09/28 12:13:58 UTC

svn commit: r1002095 - /camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java

Author: ningjiang
Date: Tue Sep 28 10:13:57 2010
New Revision: 1002095

URL: http://svn.apache.org/viewvc?rev=1002095&view=rev
Log:
CAMEL-3169 Add a synchronized block on the continuation so that it cannot be resumed before it has been suspended by the other thread

Modified:
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java

Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?rev=1002095&r1=1002094&r2=1002095&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java Tue Sep 28 10:13:57 2010
@@ -71,47 +71,54 @@ public class CxfConsumer extends Default
             }            
             
             private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) {
-                if (continuation.isNew()) {
-                    final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);
-
-                    // use the asynchronous API to process the exchange
-                    boolean sync = getAsyncProcessor().process(camelExchange, new AsyncCallback() {
-                        public void done(boolean doneSync) {
+                synchronized (continuation) {
+                    if (continuation.isNew()) {
+                        final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);
+
+                        // use the asynchronous API to process the exchange
+                        boolean sync = getAsyncProcessor().process(camelExchange, new AsyncCallback() {
+                            public void done(boolean doneSync) {
+                                // make sure the continuation resume will not be called before the suspend method in other thread
+                                synchronized (continuation) {
+                                    if (LOG.isTraceEnabled()) {
+                                        LOG.trace("Resuming continuation of exchangeId: "
+                                                  + camelExchange.getExchangeId());
+                                    }
+                                    // resume processing after both, sync and async callbacks
+                                    continuation.setObject(camelExchange);
+                                    continuation.resume();
+                                }
+                            }
+                        });
+                        // just need to avoid the continuation.resume is called
+                        // before the continuation.suspend is called
+                        if (continuation.getObject() != camelExchange && !sync) {
+                            // Now we don't set up the timeout value
                             if (LOG.isTraceEnabled()) {
-                                LOG.trace("Resuming continuation of exchangeId: "
+                                LOG.trace("Suspending continuation of exchangeId: "
                                           + camelExchange.getExchangeId());
                             }
-                            // resume processing after both, sync and async callbacks
-                            continuation.setObject(camelExchange);
-                            continuation.resume();
-                        }
-                    });
-                    // just need to avoid the continuation.resume is called
-                    // before the continuation.suspend is called
-                    if (continuation.getObject() != camelExchange && !sync) {
-                        // Now we don't set up the timeout value
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Suspending continuation of exchangeId: "
-                                      + camelExchange.getExchangeId());
-                        }
-                        // The continuation could be called before the suspend
-                        // is called
-                        continuation.suspend(0);
-                    } else {
-                        // just set the response back, as the invoking thread is
-                        // not changed
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Processed the Exchange : " + camelExchange.getExchangeId());
+                            // The continuation could be called before the
+                            // suspend
+                            // is called
+                            continuation.suspend(0);
+                        } else {
+                            // just set the response back, as the invoking
+                            // thread is
+                            // not changed
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("Processed the Exchange : " + camelExchange.getExchangeId());
+                            }
+                            setResponseBack(cxfExchange, camelExchange);
                         }
-                        setResponseBack(cxfExchange, camelExchange);
-                    }
 
-                }
-                if (continuation.isResumed()) {
-                    org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
-                        .getObject();
-                    setResponseBack(cxfExchange, camelExchange);
+                    }
+                    if (continuation.isResumed()) {
+                        org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
+                            .getObject();
+                        setResponseBack(cxfExchange, camelExchange);
 
+                    }
                 }
                 return null;
             }