You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/11/15 17:10:34 UTC
svn commit: r595344 - in /activemq/camel/trunk: ./
components/camel-cxf/src/main/java/org/apache/camel/component/cxf/
components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/
components/camel-cxf/src/main/java/org/apache/camel/componen...
Author: jstrachan
Date: Thu Nov 15 08:10:32 2007
New Revision: 595344
URL: http://svn.apache.org/viewvc?rev=595344&view=rev
Log:
applied patch for http://issues.apache.org/activemq/browse/CAMEL-216
Modified:
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfMessage.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapBinding.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapConsumer.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapEndpoint.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CXFGreeterRouterTest.java
activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CXFRouterSpringTest.java
activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRouterTest.java
activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfSoapTest.java
activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/transport/CamelConduitTest.java
activemq/camel/trunk/pom.xml
Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java Thu Nov 15 08:10:32 2007
@@ -165,12 +165,15 @@
}
//System.out.println(cxfExchange.getOut().getBody());
//TODO deal with the fault message
- Object[] result;
+ Object result;
if (cxfExchange.isFailed()) {
Exception ex= (Exception)cxfExchange.getFault().getBody();
throw new Fault(ex);
} else {
- result = (Object[])cxfExchange.getOut().getBody();
+ result = cxfExchange.getOut().getBody();
+ if(result instanceof Object[]) {
+ return (Object[])result;
+ }
}
return result;
Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java Thu Nov 15 08:10:32 2007
@@ -57,15 +57,13 @@
if (endpoint.isSpringContextEndpoint()) {
CxfEndpointBean endpointBean = endpoint.getCxfEndpointBean();
svrBean = CxfEndpointUtils.getServerFactoryBean(endpointBean.getServiceClass());
- endpoint.configure(svrBean);
- //Need to set the service name and endpoint name to the ClientFactoryBean's service factory
- // to walk around the issue of setting EndpointName and ServiceName
+ endpoint.configure(svrBean);
CxfEndpointBean cxfEndpointBean = endpoint.getCxfEndpointBean();
if (cxfEndpointBean.getServiceName() != null) {
- svrBean.getServiceFactory().setServiceName(cxfEndpointBean.getServiceName());
+ svrBean.setServiceName(cxfEndpointBean.getServiceName());
}
if (cxfEndpointBean.getEndpointName() != null) {
- svrBean.getServiceFactory().setEndpointName(cxfEndpointBean.getEndpointName());
+ svrBean.setEndpointName(cxfEndpointBean.getEndpointName());
}
} else { // setup the serverFactoryBean with the URI paraments
@@ -74,10 +72,10 @@
svrBean.setAddress(endpoint.getAddress());
svrBean.setServiceClass(serviceClass);
if (endpoint.getServiceName() != null) {
- svrBean.getServiceFactory().setServiceName(CxfEndpointUtils.getServiceName(endpoint));
+ svrBean.setServiceName(CxfEndpointUtils.getServiceName(endpoint));
}
if (endpoint.getPortName() != null) {
- svrBean.getServiceFactory().setEndpointName(CxfEndpointUtils.getPortName(endpoint));
+ svrBean.setEndpointName(CxfEndpointUtils.getPortName(endpoint));
}
if (endpoint.getWsdlURL() != null) {
svrBean.setWsdlURL(endpoint.getWsdlURL());
Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfMessage.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfMessage.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfMessage.java Thu Nov 15 08:10:32 2007
@@ -38,8 +38,9 @@
public CxfMessage(Message cxfMessage) {
if (cxfMessage == null) {
this.cxfMessage = new MessageImpl();
+ } else {
+ this.cxfMessage = cxfMessage;
}
- this.cxfMessage = cxfMessage;
}
@Override
Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java Thu Nov 15 08:10:32 2007
@@ -92,14 +92,13 @@
cfb = CxfEndpointUtils.getClientFactoryBean(endpointBean.getServiceClass());
}
endpoint.configure(cfb);
- // Need to set the service name and endpoint name to the ClientFactoryBean's service factory
- // to walk around the issue of setting EndpointName and ServiceName
+
CxfEndpointBean cxfEndpointBean = endpoint.getCxfEndpointBean();
if (cxfEndpointBean.getServiceName() != null) {
- cfb.getServiceFactory().setServiceName(cxfEndpointBean.getServiceName());
+ cfb.setServiceName(cxfEndpointBean.getServiceName());
}
if (cxfEndpointBean.getEndpointName() != null) {
- cfb.getServiceFactory().setEndpointName(cxfEndpointBean.getEndpointName());
+ cfb.setEndpointName(cxfEndpointBean.getEndpointName());
}
} else { // set up the clientFactoryBean by using URI information
if (null != endpoint.getServiceClass()) {
@@ -131,10 +130,10 @@
}
}
if (endpoint.getServiceName() != null) {
- cfb.getServiceFactory().setServiceName(CxfEndpointUtils.getServiceName(endpoint));
+ cfb.setServiceName(CxfEndpointUtils.getServiceName(endpoint));
}
if (endpoint.getPortName() != null) {
- cfb.getServiceFactory().setEndpointName(CxfEndpointUtils.getPortName(endpoint));
+ cfb.setEndpointName(CxfEndpointUtils.getPortName(endpoint));
}
if (endpoint.getWsdlURL() != null) {
Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapBinding.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapBinding.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapBinding.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapBinding.java Thu Nov 15 08:10:32 2007
@@ -32,8 +32,11 @@
import org.apache.cxf.message.MessageImpl;
public class CxfSoapBinding {
+ private CxfSoapBinding() {
+
+ }
- public org.apache.cxf.message.Message getCxfInMessage(org.apache.camel.Exchange exchange, boolean isClient) {
+ public static org.apache.cxf.message.Message getCxfInMessage(org.apache.camel.Exchange exchange, boolean isClient) {
MessageImpl answer = new MessageImpl();
org.apache.cxf.message.Exchange cxfExchange = exchange.getProperty("CxfExchange",
org.apache.cxf.message.Exchange.class);
@@ -59,7 +62,7 @@
return answer;
}
- public org.apache.cxf.message.Message getCxfOutMessage(org.apache.camel.Exchange exchange, boolean isClient) {
+ public static org.apache.cxf.message.Message getCxfOutMessage(org.apache.camel.Exchange exchange, boolean isClient) {
org.apache.cxf.message.Exchange cxfExchange = exchange.getProperty("CxfExchange", org.apache.cxf.message.Exchange.class);
assert cxfExchange != null;
org.apache.cxf.endpoint.Endpoint cxfEndpoint = cxfExchange.get(org.apache.cxf.endpoint.Endpoint.class);
Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapConsumer.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapConsumer.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapConsumer.java Thu Nov 15 08:10:32 2007
@@ -115,9 +115,8 @@
}
protected void processSoapConsumerIn(Exchange exchange) throws Exception {
- LOG.info("processSoapConsumerIn: " + exchange);
- CxfSoapBinding binding = endpoint.getCxfSoapBinding();
- org.apache.cxf.message.Message inMessage = binding.getCxfInMessage(exchange, false);
+ LOG.info("processSoapConsumerIn: " + exchange);
+ org.apache.cxf.message.Message inMessage = CxfSoapBinding.getCxfInMessage(exchange, false);
org.apache.cxf.message.Exchange cxfExchange = inMessage.getExchange();
cxfExchange.put(org.apache.cxf.endpoint.Endpoint.class, server.getEndpoint());
cxfExchange.put(Bus.class, getBus());
@@ -132,10 +131,10 @@
protected void processSoapConsumerOut(Exchange exchange) throws Exception {
LOG.info("processSoapConsumerOut: " + exchange);
- CxfSoapBinding binding = endpoint.getCxfSoapBinding();
+
// TODO check if the message is oneway message
// Get the method name form the soap endpoint
- org.apache.cxf.message.Message outMessage = binding.getCxfOutMessage(exchange, false);
+ org.apache.cxf.message.Message outMessage = CxfSoapBinding.getCxfOutMessage(exchange, false);
org.apache.cxf.message.Exchange cxfExchange = outMessage.getExchange();
InterceptorChain chain = OutgoingChainInterceptor.getOutInterceptorChain(cxfExchange);
outMessage.setInterceptorChain(chain);
Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapEndpoint.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapEndpoint.java Thu Nov 15 08:10:32 2007
@@ -42,7 +42,7 @@
private QName serviceName;
private QName endpointName;
private Bus bus;
- private CxfSoapBinding cxfSoapBinding;
+
public CxfSoapEndpoint(Endpoint endpoint) {
this.endpoint = endpoint;
@@ -115,18 +115,7 @@
public QName getEndpointName() {
return endpointName;
}
-
- public CxfSoapBinding getCxfSoapBinding() {
- if (cxfSoapBinding == null) {
- cxfSoapBinding = new CxfSoapBinding();
- }
- return cxfSoapBinding;
- }
-
- public void setCxfSoapBinding(CxfSoapBinding bing) {
- cxfSoapBinding = bing;
- }
-
+
public void init() throws Exception {
Assert.notNull(wsdl, "soap.wsdl parameter must be set on the uri");
if (serviceName == null) {
Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java Thu Nov 15 08:10:32 2007
@@ -124,8 +124,8 @@
protected void processSoapProviderOut(Exchange exchange) throws Exception {
LOG.info("processSoapProviderOut: " + exchange);
- CxfSoapBinding binding = endpoint.getCxfSoapBinding();
- org.apache.cxf.message.Message inMessage = binding.getCxfInMessage(exchange, true);
+
+ org.apache.cxf.message.Message inMessage = CxfSoapBinding.getCxfInMessage(exchange, true);
client.setInInterceptors(client.getEndpoint().getService().getInInterceptors());
client.onMessage(inMessage);
@@ -145,9 +145,8 @@
cxfExchange.put(org.apache.cxf.endpoint.Endpoint.class, cxfEndpoint);
cxfExchange.put(Bus.class, getBus());
cxfExchange.setConduit(new NullConduit());
- exchange.setProperty("CxfExchange", cxfExchange);
- CxfSoapBinding binding = endpoint.getCxfSoapBinding();
- org.apache.cxf.message.Message outMessage = binding.getCxfOutMessage(exchange, true);
+ exchange.setProperty("CxfExchange", cxfExchange);
+ org.apache.cxf.message.Message outMessage = CxfSoapBinding.getCxfOutMessage(exchange, true);
outMessage.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
outMessage.put(Message.INBOUND_MESSAGE, Boolean.FALSE);
InterceptorChain chain = OutgoingChainInterceptor.getOutInterceptorChain(cxfExchange);
Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/spring/CxfEndpointBean.java Thu Nov 15 08:10:32 2007
@@ -16,13 +16,14 @@
*/
package org.apache.camel.component.cxf.spring;
-import org.apache.cxf.frontend.AbstractEndpointFactory;
+
+import org.apache.cxf.frontend.AbstractWSDLBasedEndpointFactory;
import org.apache.cxf.service.factory.ReflectionServiceFactoryBean;
/**
*
*/
-public class CxfEndpointBean extends AbstractEndpointFactory {
+public class CxfEndpointBean extends AbstractWSDLBasedEndpointFactory{
public CxfEndpointBean() {
setServiceFactory(new ReflectionServiceFactoryBean());
}
Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java Thu Nov 15 08:10:32 2007
@@ -25,7 +25,11 @@
import java.util.logging.Logger;
import org.apache.camel.CamelContext;
+import org.apache.camel.CamelTemplate;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.component.cxf.CxfSoapBinding;
+import org.apache.camel.impl.DefaultCamelContext;
import org.apache.cxf.Bus;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.configuration.Configurable;
@@ -45,42 +49,44 @@
* @version $Revision$
*/
public class CamelConduit extends AbstractConduit implements Configurable {
- protected static final String BASE_BEAN_NAME_SUFFIX = ".camel-conduit-base";
+ protected static final String BASE_BEAN_NAME_SUFFIX = ".camel-conduit";
private static final Logger LOG = LogUtils.getL7dLogger(CamelConduit.class);
- private final CamelTransportBase base;
+ private CamelContext camelContext;
+ private EndpointInfo endpointInfo;
private String targetCamelEndpointUri;
-
- /*
- * protected ClientConfig clientConfig; protected ClientBehaviorPolicyType
- * runtimePolicy; protected AddressType address; protected SessionPoolType
- * sessionPool;
- */
-
- public CamelConduit(CamelContext camelContext, Bus bus, EndpointInfo endpointInfo, EndpointReferenceType targetReference) {
+ private CamelTemplate<Exchange> camelTemplate;
+ private Bus bus;
+
+
+ public CamelConduit(CamelContext context, Bus b, EndpointInfo epInfo, EndpointReferenceType targetReference) {
super(targetReference);
AttributedURIType address = targetReference.getAddress();
if (address != null) {
this.targetCamelEndpointUri = address.getValue();
}
-
- base = new CamelTransportBase(camelContext, bus, endpointInfo, false, BASE_BEAN_NAME_SUFFIX);
-
+ endpointInfo = epInfo;
+ camelContext = context;
+ bus = b;
initConfig();
}
+
+ public void setCamelContext(CamelContext context) {
+ camelContext = context;
+ }
+
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
// prepare the message for send out , not actually send out the message
public void prepare(Message message) throws IOException {
getLogger().log(Level.FINE, "CamelConduit send message");
-
message.setContent(OutputStream.class, new CamelOutputStream(message));
}
public void close() {
getLogger().log(Level.FINE, "CamelConduit closed ");
-
- // ensure resources held by session factory are released
- //
- base.close();
+
}
protected Logger getLogger() {
@@ -88,32 +94,35 @@
}
public String getBeanName() {
- EndpointInfo info = base.endpointInfo;
- if (info == null) {
+
+ if (endpointInfo == null) {
return "default.camel-conduit";
}
- return info.getName() + ".camel-conduit";
+ return endpointInfo.getName() + ".camel-conduit";
}
private void initConfig() {
-
- /*
- * this.address = base.endpointInfo.getTraversedExtensor(new
- * AddressType(), AddressType.class); this.sessionPool =
- * base.endpointInfo.getTraversedExtensor(new SessionPoolType(),
- * SessionPoolType.class); this.clientConfig =
- * base.endpointInfo.getTraversedExtensor(new ClientConfig(),
- * ClientConfig.class); this.runtimePolicy =
- * base.endpointInfo.getTraversedExtensor(new
- * ClientBehaviorPolicyType(), ClientBehaviorPolicyType.class);
- */
-
- Configurer configurer = base.bus.getExtension(Configurer.class);
+ // we could configure the camel context here
+ Configurer configurer = bus.getExtension(Configurer.class);
if (null != configurer) {
configurer.configureBean(this);
}
}
-
+
+ public CamelTemplate getCamelTemplate() {
+ if (camelTemplate == null) {
+ if (camelContext != null) {
+ camelTemplate = new CamelTemplate<Exchange>(camelContext);
+ } else {
+ camelTemplate = new CamelTemplate<Exchange>(new DefaultCamelContext());
+ }
+ }
+ return camelTemplate;
+ }
+
+ public void setCamelTemplate(CamelTemplate template) {
+
+ }
private class CamelOutputStream extends CachedOutputStream {
private Message outMessage;
private boolean isOneWay;
@@ -129,108 +138,36 @@
protected void doClose() throws IOException {
isOneWay = outMessage.getExchange().isOneWay();
commitOutputMessage();
- if (!isOneWay) {
- handleResponse();
- }
}
protected void onWrite() throws IOException {
-
+ // do nothing here
}
+
private void commitOutputMessage() {
- base.template.send(targetCamelEndpointUri, new Processor() {
- public void process(org.apache.camel.Exchange reply) {
- Object request = null;
- if (isTextPayload()) {
- request = currentStream.toString();
- } else {
- request = ((ByteArrayOutputStream)currentStream).toByteArray();
- }
-
- getLogger().log(Level.FINE, "Conduit Request is :[" + request + "]");
- String replyTo = base.getReplyDestination();
- // TODO setting up the responseExpected
- base.marshal(request, replyTo, reply);
- base.setMessageProperties(outMessage, reply);
-
- String correlationID = null;
- if (!isOneWay) {
- // TODO create a correlationID
- String id = null;
-
- if (id != null) {
- if (correlationID != null) {
- String error = "User cannot set CamelCorrelationID when " + "making a request/reply invocation using " + "a static replyTo Queue.";
- }
- correlationID = id;
- }
- }
-
- if (correlationID != null) {
- reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID, correlationID);
- } else {
- // No message correlation id is set. Whatever comeback
- // will be accepted as responses.
- // We assume that it will only happen in case of the
- // temp. reply queue.
- }
-
- getLogger().log(Level.FINE, "template sending request: ", reply.getIn());
+ // we could wait for the rely asynconized
+ org.apache.camel.Exchange exchange = getCamelTemplate().send(targetCamelEndpointUri, new Processor() {
+ public void process(org.apache.camel.Exchange ex) throws IOException {
+ CachedOutputStream outputStream = (CachedOutputStream)outMessage.getContent(OutputStream.class);
+ // send out the request message here
+ ex.getIn().setHeaders(outMessage);
+ ex.getIn().setBody(outputStream.getInputStream());
+ // setup the out message
+ getLogger().log(Level.FINE, "template sending request: ", ex.getIn());
}
});
+ if (!isOneWay) {
+ handleResponse(exchange);
+ }
+
}
- private void handleResponse() throws IOException {
- // REVISIT distinguish decoupled case or oneway call
- Object response = null;
-
- // TODO if outMessage need to get the response
- Message inMessage = new MessageImpl();
- outMessage.getExchange().setInMessage(inMessage);
- // set the message header back to the incomeMessage
- // inMessage.put(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS,
- // outMessage.get(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS));
-
- /*
- * Object result1; Object result = null; javax.camel.Message
- * camelMessage1 = pooledSession.consumer().receive(timeout);
- * getLogger().log(Level.FINE, "template received reply: " ,
- * camelMessage1); if (camelMessage1 != null) {
- * base.populateIncomingContext(camelMessage1, outMessage,
- * CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS); String messageType =
- * camelMessage1 instanceof TextMessage ?
- * CamelConstants.TEXT_MESSAGE_TYPE :
- * CamelConstants.BINARY_MESSAGE_TYPE; result =
- * base.unmarshal((org.apache.camel.Exchange) outMessage); result1 =
- * result; } else { String error = "CamelClientTransport.receive()
- * timed out. No message available."; getLogger().log(Level.SEVERE,
- * error); //TODO: Review what exception should we throw. throw new
- * CamelException(error); } response = result1; //set the message
- * header back to the incomeMessage
- * inMessage.put(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS,
- * outMessage.get(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS));
- */
-
- getLogger().log(Level.FINE, "The Response Message is : [" + response + "]");
-
- // setup the inMessage response stream
- byte[] bytes = null;
- if (response instanceof String) {
- String requestString = (String)response;
- bytes = requestString.getBytes();
- } else {
- bytes = (byte[])response;
- }
- inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
+ private void handleResponse(org.apache.camel.Exchange exchange) {
+ org.apache.cxf.message.Message inMessage = CxfSoapBinding.getCxfInMessage(exchange, true);
getLogger().log(Level.FINE, "incoming observer is " + incomingObserver);
incomingObserver.onMessage(inMessage);
}
- }
-
- private boolean isTextPayload() {
- // TODO use runtime policy
- return true;
}
/**
Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java Thu Nov 15 08:10:32 2007
@@ -22,6 +22,7 @@
public class CamelConstants {
public static final String TEXT_MESSAGE_TYPE = "text";
public static final String BINARY_MESSAGE_TYPE = "binary";
+ public static final String CAMEL_TARGET_ENDPOINT_URI = "org.apache.cxf.camel.target.endpoint.uri";
public static final String CAMEL_SERVER_REQUEST_HEADERS = "org.apache.cxf.camel.server.request.headers";
public static final String CAMEL_SERVER_RESPONSE_HEADERS = "org.apache.cxf.camel.server.response.headers";
public static final String CAMEL_REQUEST_MESSAGE = "org.apache.cxf.camel.request.message";
Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java Thu Nov 15 08:10:32 2007
@@ -24,10 +24,14 @@
import java.util.logging.Logger;
import org.apache.camel.CamelContext;
+import org.apache.camel.CamelTemplate;
+import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.component.cxf.CxfSoapBinding;
+import org.apache.camel.impl.DefaultCamelContext;
import org.apache.cxf.Bus;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.configuration.Configurable;
@@ -40,6 +44,7 @@
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.ConduitInitiator;
import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.ws.addressing.AttributedURIType;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
@@ -47,22 +52,20 @@
* @version $Revision$
*/
public class CamelDestination extends AbstractDestination implements Configurable {
- protected static final String BASE_BEAN_NAME_SUFFIX = ".camel-destination-base";
+ protected static final String BASE_BEAN_NAME_SUFFIX = ".camel-destination";
private static final Logger LOG = LogUtils.getL7dLogger(CamelDestination.class);
+ private CamelTemplate<Exchange> camelTemplate;
CamelContext camelContext;
+ Consumer consumer;
String camelUri;
final ConduitInitiator conduitInitiator;
- private CamelTransportBase base;
- private Endpoint endpoint;
+
+ private org.apache.camel.Endpoint distinationEndpoint;
public CamelDestination(CamelContext camelContext, Bus bus, ConduitInitiator ci, EndpointInfo info) throws IOException {
super(getTargetReference(info, bus), info);
this.camelContext = camelContext;
-
- base = new CamelTransportBase(camelContext, bus, endpointInfo, true, BASE_BEAN_NAME_SUFFIX);
-
conduitInitiator = ci;
-
initConfig();
}
@@ -83,35 +86,48 @@
try {
getLogger().log(Level.FINE, "establishing Camel connection");
- endpoint = camelContext.getEndpoint(camelUri);
+ distinationEndpoint = camelContext.getEndpoint(camelUri);
+ consumer = distinationEndpoint.createConsumer(new ConsumerProcessor());
+ consumer.start();
+
} catch (Exception ex) {
getLogger().log(Level.SEVERE, "Camel connect failed with EException : ", ex);
}
}
- public void deactivate() {
- base.close();
+ public void deactivate() {
+ try {
+ consumer.stop();
+ } catch (Exception e) {
+ // TODO need to handle the exception somewhere
+ e.printStackTrace();
+ }
}
public void shutdown() {
getLogger().log(Level.FINE, "CamelDestination shutdown()");
this.deactivate();
}
+
+ public CamelTemplate getCamelTemplate() {
+ if (camelTemplate == null) {
+ if (camelContext != null) {
+ camelTemplate = new CamelTemplate<Exchange>(camelContext);
+ } else {
+ camelTemplate = new CamelTemplate<Exchange>(new DefaultCamelContext());
+ }
+ }
+ return camelTemplate;
+ }
- protected void incoming(Exchange exchange) {
+ protected void incoming(org.apache.camel.Exchange exchange) {
getLogger().log(Level.FINE, "server received request: ", exchange);
-
- byte[] bytes = base.unmarshal(exchange);
-
- // get the message to be interceptor
- MessageImpl inMessage = new MessageImpl();
- inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
- base.populateIncomingContext(exchange, inMessage, CamelConstants.CAMEL_SERVER_REQUEST_HEADERS);
- // inMessage.put(CamelConstants.CAMEL_SERVER_RESPONSE_HEADERS, new
- // CamelMessageHeadersType());
+ org.apache.cxf.message.Message inMessage =
+ CxfSoapBinding.getCxfInMessage(exchange, false);
+
inMessage.put(CamelConstants.CAMEL_REQUEST_MESSAGE, exchange);
- inMessage.setDestination(this);
+ ((MessageImpl)inMessage).setDestination(this);
// handle the incoming message
incomingObserver.onMessage(inMessage);
@@ -122,16 +138,7 @@
}
private void initConfig() {
- /*
- * this.runtimePolicy = endpointInfo.getTraversedExtensor(new
- * ServerBehaviorPolicyType(), ServerBehaviorPolicyType.class);
- * this.serverConfig = endpointInfo.getTraversedExtensor(new
- * ServerConfig(), ServerConfig.class); this.address =
- * endpointInfo.getTraversedExtensor(new AddressType(),
- * AddressType.class); this.sessionPool =
- * endpointInfo.getTraversedExtensor(new SessionPoolType(),
- * SessionPoolType.class);
- */
+ // setup the endpoint infor here
}
protected class ConsumerProcessor implements Processor {
@@ -147,9 +154,13 @@
// this should deal with the cxf message
protected class BackChannelConduit extends AbstractConduit {
protected Message inMessage;
-
+ String targetCamelEndpointUri;
BackChannelConduit(EndpointReferenceType ref, Message message) {
super(ref);
+ AttributedURIType address = ref.getAddress();
+ if (address != null) {
+ targetCamelEndpointUri = address.getValue();
+ }
inMessage = message;
}
@@ -171,6 +182,7 @@
public void prepare(Message message) throws IOException {
// setup the message to be send back
message.put(CamelConstants.CAMEL_REQUEST_MESSAGE, inMessage.get(CamelConstants.CAMEL_REQUEST_MESSAGE));
+ message.put(CamelConstants.CAMEL_TARGET_ENDPOINT_URI, targetCamelEndpointUri);
message.setContent(OutputStream.class, new CamelOutputStream(inMessage));
}
@@ -181,33 +193,27 @@
}
private class CamelOutputStream extends CachedOutputStream {
- private Message inMessage;
- private Producer<Exchange> replyTo;
- private Producer<Exchange> sender;
-
- // setup the ByteArrayStream
+ private Message outMessage;
+
public CamelOutputStream(Message m) {
super();
- inMessage = m;
+ outMessage = m;
}
// prepair the message and get the send out message
private void commitOutputMessage() throws IOException {
-
- // setup the reply message
- final String replyToUri = getReplyToDestination(inMessage);
-
- base.template.send(replyToUri, new Processor() {
- public void process(Exchange reply) {
- base.marshal(currentStream.toString(), replyToUri, reply);
-
- setReplyCorrelationID(inMessage, reply);
-
- base.setMessageProperties(inMessage, reply);
-
- getLogger().log(Level.FINE, "just server sending reply: ", reply);
+ String targetCamelEndpointUri = (String) outMessage.get(CamelConstants.CAMEL_TARGET_ENDPOINT_URI);
+ getCamelTemplate().send(targetCamelEndpointUri, new Processor() {
+ public void process(org.apache.camel.Exchange ex) throws IOException {
+ // put the output stream into the message
+ CachedOutputStream outputStream = (CachedOutputStream)outMessage.getContent(OutputStream.class);
+ // send out the request message here
+ ex.getIn().setHeaders(outMessage);
+ ex.getIn().setBody(outputStream.getInputStream());
+ // setup the out message
+ getLogger().log(Level.FINE, "template sending request: ", ex.getIn());
}
- });
+ });
}
@Override
@@ -225,19 +231,5 @@
// Do nothing here
}
}
-
- protected String getReplyToDestination(Message inMessage) {
- if (inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO) != null) {
- return (String)inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO);
- } else {
- return base.getReplyDestination();
- }
- }
-
- protected void setReplyCorrelationID(Message inMessage, Exchange reply) {
- Object value = inMessage.get(CamelConstants.CAMEL_CORRELATION_ID);
- if (value != null) {
- reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID, value);
- }
- }
+
}
Modified: activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CXFGreeterRouterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CXFGreeterRouterTest.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CXFGreeterRouterTest.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CXFGreeterRouterTest.java Thu Nov 15 08:10:32 2007
@@ -54,6 +54,8 @@
reply = greeter.sayHi();
assertNotNull("No response received from service", reply);
assertEquals("Got the wrong reply ", "Bonjour", reply);
+
+ greeter.greetMeOneWay("call greetMe OneWay !");
} catch (UndeclaredThrowableException ex) {
throw (Exception)ex.getCause();
}
Modified: activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CXFRouterSpringTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CXFRouterSpringTest.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CXFRouterSpringTest.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CXFRouterSpringTest.java Thu Nov 15 08:10:32 2007
@@ -33,17 +33,18 @@
@Override
protected void setUp() throws Exception {
applicationContext = createApplicationContext();
+ super.setUp();
assertNotNull("Should have created a valid spring context", applicationContext);
- super.setUp();
+
}
@Override
- protected void tearDown() throws Exception {
- super.tearDown();
+ protected void tearDown() throws Exception {
if (applicationContext != null) {
applicationContext.destroy();
}
+ super.tearDown();
}
protected RouteBuilder createRouteBuilder() {
Modified: activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRouterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRouterTest.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRouterTest.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRouterTest.java Thu Nov 15 08:10:32 2007
@@ -46,12 +46,13 @@
private String serviceEndpointURI = "cxf://" + SERVICE_ADDRESS + "?" + SERVICE_CLASS + "&dataFormat=POJO";
private ServerImpl server;
+ private Bus bus;
@Override
protected void setUp() throws Exception {
- super.setUp();
-
+ super.setUp();
+ bus = BusFactory.getDefaultBus();
startService();
}
@@ -62,7 +63,7 @@
svrBean.setAddress(SERVICE_ADDRESS);
svrBean.setServiceClass(HelloService.class);
svrBean.setServiceBean(new HelloServiceImpl());
- svrBean.setBus(CXFBusFactory.getDefaultBus());
+ svrBean.setBus(bus);
server = (ServerImpl)svrBean.create();
server.start();
@@ -70,9 +71,8 @@
@Override
protected void tearDown() throws Exception {
- if (server != null) {
- server.stop();
- }
+ //bus.shutdown(true);
+ BusFactory.setDefaultBus(null);
}
protected RouteBuilder createRouteBuilder() {
@@ -89,8 +89,7 @@
public void testInvokingServiceFromCXFClient() throws Exception {
- Bus bus = BusFactory.getDefaultBus();
-
+
ClientProxyFactoryBean proxyFactory = new ClientProxyFactoryBean();
ClientFactoryBean clientBean = proxyFactory.getClientFactoryBean();
clientBean.setAddress(ROUTER_ADDRESS);
Modified: activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfSoapTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfSoapTest.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfSoapTest.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfSoapTest.java Thu Nov 15 08:10:32 2007
@@ -54,7 +54,7 @@
e.getOut().setBody(result);
}
});
- from("direct:provider").to("soap:mock:provider?soap.wsdl=classpath:hello/HelloWorld-DOC.wsdl");
+ from("direct:producer").to("soap:mock:producer?soap.wsdl=classpath:hello/HelloWorld-DOC.wsdl");
}
};
}
@@ -77,14 +77,14 @@
}
- public void testSoapProvider() throws Exception {
+ public void testSoapProducer() throws Exception {
// set out the source message
URL request = this.getClass().getResource("RequestBody.xml");
File requestFile = new File(request.toURI());
FileInputStream inputStream = new FileInputStream(requestFile);
XMLStreamReader xmlReader = StaxUtils.createXMLStreamReader(inputStream);
DOMSource source = new DOMSource(StaxUtils.read(xmlReader));
- Object result = template.sendBody("direct:provider", source);
+ Object result = template.sendBody("direct:producer", source);
assertFalse("The result should not be changed", source.equals(result));
assertTrue("The result should be the instance of DOMSource", result instanceof DOMSource);
assertEquals("The DOMSource should be equal", XMLUtils.toString(source), XMLUtils.toString((Source)result));
Modified: activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/transport/CamelConduitTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/transport/CamelConduitTest.java?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/transport/CamelConduitTest.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/transport/CamelConduitTest.java Thu Nov 15 08:10:32 2007
@@ -35,14 +35,7 @@
BusFactory.setDefaultBus(bus);
setupServiceInfo("http://cxf.apache.org/camel_conf_test", "/wsdl/camel_test_no_addr.wsdl", "HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
CamelConduit conduit = setupCamelConduit(false, false);
- /*
- * assertEquals("Can't get the right ClientReceiveTimeout", 500L,
- * conduit.getClientConfig().getClientReceiveTimeout());
- * assertEquals("Can't get the right SessionPoolConfig's LowWaterMark",
- * 10, conduit.getSessionPool().getLowWaterMark()); assertEquals("Can't
- * get the right AddressPolicy's ConnectionPassword", "testPassword",
- * conduit.getCamelAddress().getConnectionPassword());
- */
+
bus.shutdown(false);
BusFactory.setDefaultBus(null);
}
Modified: activemq/camel/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/pom.xml?rev=595344&r1=595343&r2=595344&view=diff
==============================================================================
--- activemq/camel/trunk/pom.xml (original)
+++ activemq/camel/trunk/pom.xml Thu Nov 15 08:10:32 2007
@@ -36,7 +36,7 @@
<properties>
<camel-version>1.3-SNAPSHOT</camel-version>
<compiler.fork>false</compiler.fork>
- <cxf-version>2.0.2-incubator</cxf-version>
+ <cxf-version>2.0.3-incubator</cxf-version>
<spring-version>2.0.6</spring-version>
<jetty-version>6.1.5</jetty-version>
<m1-repo-url>scpexe://minotaur.apache.org/www/people.apache.org/repo/m1-snapshot-repository</m1-repo-url>