You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/09/01 04:32:49 UTC
svn commit: r991419 - in
/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf:
CxfConstants.java CxfConsumer.java CxfProducer.java
Author: ningjiang
Date: Wed Sep 1 02:32:49 2010
New Revision: 991419
URL: http://svn.apache.org/viewvc?rev=991419&view=rev
Log:
CAMEL-3086 Let the camel-cxf consumer leverage the CXF continuation API to call the async processor
Modified:
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConstants.java
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConstants.java?rev=991419&r1=991418&r2=991419&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConstants.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConstants.java Wed Sep 1 02:32:49 2010
@@ -40,6 +40,7 @@ public interface CxfConstants {
String OPERATION_NAMESPACE = "operationNamespace";
String SPRING_CONTEXT_ENDPOINT = "bean:";
String CAMEL_TRANSPORT_PREFIX = "camel:";
+ String JAXWS_CONTEXT = "jaxwsContext";
String CXF_EXCHANGE = "org.apache.cxf.message.exchange";
String CAMEL_EXCHANGE = "org.apache.camel.exchange";
String CAMEL_CXF_RS_USING_HTTP_API = "CamelCxfRsUsingHttpAPI";
Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?rev=991419&r1=991418&r2=991419&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java Wed Sep 1 02:32:49 2010
@@ -23,10 +23,13 @@ import javax.xml.ws.WebFault;
import org.w3c.dom.Element;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.frontend.ServerFactoryBean;
import org.apache.cxf.interceptor.Fault;
@@ -56,11 +59,89 @@ public class CxfConsumer extends Default
// we receive a CXF request when this method is called
public Object invoke(Exchange cxfExchange, Object o) {
-
if (LOG.isTraceEnabled()) {
LOG.trace("Received CXF Request: " + cxfExchange);
+ }
+ Continuation continuation = getContinuation(cxfExchange);
+ if (continuation != null) {
+ return asyncInvoke(cxfExchange, continuation);
+ } else {
+ return syncInvoke(cxfExchange);
+ }
+ }
+
+ private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) {
+ if (continuation.isNew()) {
+ final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);
+
+ // use the asynchronous API to process the exchange
+ boolean sync = getAsyncProcessor().process(camelExchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ if (log.isTraceEnabled()) {
+ log.trace("Resuming continuation of exchangeId: "
+ + camelExchange.getExchangeId());
+ }
+ // resume processing after both, sync and async callbacks
+ continuation.setObject(camelExchange);
+ continuation.resume();
+ }
+ });
+ // just need to avoid the continuation.resume is called
+ // before the continuation.suspend is called
+ if (continuation.getObject() != camelExchange && !sync) {
+ // Now we don't set up the timeout value
+ if (log.isTraceEnabled()) {
+ log.trace("Suspending continuation of exchangeId: "
+ + camelExchange.getExchangeId());
+ }
+ // The continuation could be called before the suspend
+ // is called
+ continuation.suspend(0);
+ } else {
+ // just set the response back, as the invoking thread is
+ // not changed
+ if (log.isTraceEnabled()) {
+ log.trace("Processed the Exchange : " + camelExchange.getExchangeId());
+ }
+ setResponseBack(cxfExchange, camelExchange);
+ }
+
}
-
+ if (continuation.isResumed()) {
+ org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
+ .getObject();
+ setResponseBack(cxfExchange, camelExchange);
+
+ }
+ return null;
+ }
+
+ private Continuation getContinuation(Exchange cxfExchange) {
+ ContinuationProvider provider =
+ (ContinuationProvider)cxfExchange.getInMessage().get(ContinuationProvider.class.getName());
+ return provider.getContinuation();
+ }
+
+ private Object syncInvoke(Exchange cxfExchange) {
+ org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);
+ // send Camel exchange to the target processor
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing +++ START +++");
+ }
+ try {
+ getProcessor().process(camelExchange);
+ } catch (Exception e) {
+ throw new Fault(e);
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing +++ END +++");
+ }
+ setResponseBack(cxfExchange, camelExchange);
+ // response should have been set in outMessage's content
+ return null;
+ }
+
+ private org.apache.camel.Exchange perpareCamelExchange(Exchange cxfExchange) {
// get CXF binding
CxfEndpoint endpoint = (CxfEndpoint)getEndpoint();
CxfBinding binding = endpoint.getCxfBinding();
@@ -94,24 +175,22 @@ public class CxfConsumer extends Default
// bind the CXF request into a Camel exchange
binding.populateExchangeFromCxfRequest(cxfExchange, camelExchange);
-
// extract the javax.xml.ws header
Map<String, Object> context = new HashMap<String, Object>();
- binding.extractJaxWsContext(cxfExchange, context);
- // send Camel exchange to the target processor
- if (LOG.isTraceEnabled()) {
- LOG.trace("Processing +++ START +++");
- }
- try {
- getProcessor().process(camelExchange);
- } catch (Exception e) {
- throw new Fault(e);
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Processing +++ END +++");
- }
+ binding.extractJaxWsContext(cxfExchange, context);
+ // put the context into camelExchange
+ camelExchange.setProperty(CxfConstants.JAXWS_CONTEXT, context);
+ return camelExchange;
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private void setResponseBack(Exchange cxfExchange, org.apache.camel.Exchange camelExchange) {
+ CxfEndpoint endpoint = (CxfEndpoint)getEndpoint();
+ CxfBinding binding = endpoint.getCxfBinding();
+
checkFailure(camelExchange);
-
+
// bind the Camel response into a CXF response
if (camelExchange.getPattern().isOutCapable()) {
binding.populateCxfResponseFromExchange(camelExchange, cxfExchange);
@@ -121,9 +200,7 @@ public class CxfConsumer extends Default
checkFailure(camelExchange);
// copy the headers javax.xml.ws header back
- binding.copyJaxWsContext(cxfExchange, context);
- // response should have been set in outMessage's content
- return null;
+ binding.copyJaxWsContext(cxfExchange, (Map<String, Object>)camelExchange.getProperty(CxfConstants.JAXWS_CONTEXT));
}
private void checkFailure(org.apache.camel.Exchange camelExchange) throws Fault {
Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?rev=991419&r1=991418&r2=991419&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java Wed Sep 1 02:32:49 2010
@@ -73,7 +73,7 @@ public class CxfProducer extends Default
// so we don't delegate the sync process call to the async process
public boolean process(Exchange camelExchange, AsyncCallback callback) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Process exchange: " + camelExchange);
+ LOG.trace("Process exchange: " + camelExchange + " in an async way.");
}
try {
@@ -109,7 +109,7 @@ public class CxfProducer extends Default
public void process(Exchange camelExchange) throws Exception {
if (LOG.isTraceEnabled()) {
- LOG.trace("Process exchange: " + camelExchange);
+ LOG.trace("Process exchange: " + camelExchange + "in sync way.");
}
// create CXF exchange