You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/09/01 07:43:39 UTC
svn commit: r991440 - in
/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf:
CxfEndpoint.java jaxrs/CxfRsConsumer.java jaxrs/CxfRsInvoker.java
Author: ningjiang
Date: Wed Sep 1 05:43:38 2010
New Revision: 991440
URL: http://svn.apache.org/viewvc?rev=991440&view=rev
Log:
CAMEL-3095 now CxfRSConsumer supports the continuation API
Modified:
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java?rev=991440&r1=991439&r2=991440&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java Wed Sep 1 05:43:38 2010
@@ -100,7 +100,7 @@ public class CxfEndpoint extends Default
public CxfEndpoint(String remaining, CxfComponent cxfComponent) {
super(remaining, cxfComponent);
- setAddress(remaining);
+ setAddress(remaining);
}
public CxfEndpoint(String remaining, CamelContext context) {
Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java?rev=991440&r1=991439&r2=991440&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumer.java Wed Sep 1 05:43:38 2010
@@ -32,7 +32,7 @@ public class CxfRsConsumer extends Defau
public CxfRsConsumer(CxfRsEndpoint endpoint, Processor processor) {
super(endpoint, processor);
- CxfRsInvoker cxfRsInvoker = new CxfRsInvoker(endpoint, processor);
+ CxfRsInvoker cxfRsInvoker = new CxfRsInvoker(endpoint, this);
JAXRSServerFactoryBean svrBean = endpoint.createJAXRSServerFactoryBean();
svrBean.setInvoker(cxfRsInvoker);
server = svrBean.create();
Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java?rev=991440&r1=991439&r2=991440&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java Wed Sep 1 05:43:38 2010
@@ -20,21 +20,25 @@ import java.lang.reflect.Method;
import javax.ws.rs.WebApplicationException;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
import org.apache.cxf.jaxrs.JAXRSInvoker;
import org.apache.cxf.jaxrs.model.OperationResourceInfo;
import org.apache.cxf.message.Exchange;
public class CxfRsInvoker extends JAXRSInvoker {
-
- private Processor processor;
+ private static final Log LOG = LogFactory.getLog(CxfRsInvoker.class);
+ private CxfRsConsumer cxfRsConsumer;
private CxfRsEndpoint endpoint;
- public CxfRsInvoker(CxfRsEndpoint endpoint, Processor processor) {
+ public CxfRsInvoker(CxfRsEndpoint endpoint, CxfRsConsumer consumer) {
this.endpoint = endpoint;
- this.processor = processor;
+ this.cxfRsConsumer = consumer;
}
protected Object performInvocation(Exchange cxfExchange, final Object serviceObject, Method method,
@@ -45,19 +49,92 @@ public class CxfRsInvoker extends JAXRSI
// don't delegate the sub resource locator call to camel processor
return method.invoke(serviceObject, paramArray);
}
-
+ Continuation continuation = getContinuation(cxfExchange);
+ if (continuation != null && !endpoint.isSynchronous()) {
+ return asyncInvoke(cxfExchange, serviceObject, method, paramArray, continuation);
+ } else {
+ return syncInvoke(cxfExchange, serviceObject, method, paramArray);
+ }
+ }
+
+ private Continuation getContinuation(Exchange cxfExchange) {
+ ContinuationProvider provider =
+ (ContinuationProvider)cxfExchange.getInMessage().get(ContinuationProvider.class.getName());
+ return provider.getContinuation();
+ }
+
+ private Object asyncInvoke(Exchange cxfExchange, final Object serviceObject, Method method,
+ Object[] paramArray, final Continuation continuation) throws Exception {
+ if (continuation.isNew()) {
+ ExchangePattern ep = ExchangePattern.InOut;
+ if (method.getReturnType() == Void.class) {
+ ep = ExchangePattern.InOnly;
+ }
+ final org.apache.camel.Exchange camelExchange = endpoint.createExchange(ep);
+ CxfRsBinding binding = endpoint.getBinding();
+ binding.populateExchangeFromCxfRsRequest(cxfExchange, camelExchange, method, paramArray);
+ boolean sync = cxfRsConsumer.getAsyncProcessor().process(camelExchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Resuming continuation of exchangeId: " + camelExchange.getExchangeId());
+ }
+ // resume processing after both, sync and async callbacks
+ continuation.setObject(camelExchange);
+ continuation.resume();
+ }
+ });
+ // just need to avoid the continuation.resume is called
+ // before the continuation.suspend is called
+ if (continuation.getObject() != camelExchange && !sync) {
+ // Now we don't set up the timeout value
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Suspending continuation of exchangeId: " + camelExchange.getExchangeId());
+ }
+ // The continuation could be called before the suspend
+ // is called
+ continuation.suspend(0);
+
+ } else {
+ // just set the response back, as the invoking thread is
+ // not changed
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processed the Exchange : " + camelExchange.getExchangeId());
+ }
+ return returnResponse(cxfExchange, camelExchange);
+ }
+
+ }
+ if (continuation.isResumed()) {
+ org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
+ .getObject();
+ return returnResponse(cxfExchange, camelExchange);
+
+ }
+
+ return null;
+ }
+
+ private Object syncInvoke(Exchange cxfExchange, final Object serviceObject, Method method,
+ Object[] paramArray) throws Exception {
ExchangePattern ep = ExchangePattern.InOut;
+
if (method.getReturnType() == Void.class) {
ep = ExchangePattern.InOnly;
}
org.apache.camel.Exchange camelExchange = endpoint.createExchange(ep);
CxfRsBinding binding = endpoint.getBinding();
binding.populateExchangeFromCxfRsRequest(cxfExchange, camelExchange, method, paramArray);
+
try {
- processor.process(camelExchange);
+ cxfRsConsumer.getProcessor().process(camelExchange);
} catch (Exception exception) {
camelExchange.setException(exception);
}
+ return returnResponse(cxfExchange, camelExchange);
+
+ }
+
+ private Object returnResponse(Exchange cxfExchange, org.apache.camel.Exchange camelExchange) throws Exception {
if (camelExchange.getException() != null) {
Throwable exception = camelExchange.getException();
Object result = null;
@@ -69,7 +146,7 @@ public class CxfRsInvoker extends JAXRSI
}
return result;
}
- return binding.populateCxfRsResponseFromExchange(camelExchange, cxfExchange);
+ return endpoint.getBinding().populateCxfRsResponseFromExchange(camelExchange, cxfExchange);
}
}