You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/30 12:17:12 UTC
svn commit: r524027 - in /activemq/camel/trunk/camel-cxf/src:
main/java/org/apache/camel/component/cxf/
test/java/org/apache/camel/component/cxf/
Author: jstrachan
Date: Fri Mar 30 03:17:10 2007
New Revision: 524027
URL: http://svn.apache.org/viewvc?view=rev&rev=524027
Log:
Use the TransportFactory from the Bus itself to avoid possibly using the wrong one; also added a working test case that asserts on the returned headers and body.
Modified:
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfExchange.java
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfMessage.java
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java
Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java Fri Mar 30 03:17:10 2007
@@ -21,7 +21,6 @@
import org.apache.cxf.message.MessageImpl;
import java.io.InputStream;
-import java.util.Map;
import java.util.Set;
/**
@@ -47,7 +46,7 @@
}
public MessageImpl createCxfMessage(CxfExchange exchange) {
- MessageImpl answer = new MessageImpl();
+ MessageImpl answer = (MessageImpl) exchange.getInMessage();
// TODO is InputStream the best type to give to CXF?
CxfMessage in = exchange.getIn();
@@ -57,22 +56,21 @@
}
answer.setContent(InputStream.class, body);
+ // no need to process headers as we reuse the CXF message
+ /*
// set the headers
Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet();
for (Map.Entry<String, Object> entry : entries) {
answer.put(entry.getKey(), entry.getValue());
}
+ */
return answer;
}
public void storeCxfResponse(CxfExchange exchange, Message response) {
+ // no need to process headers as we use the CXF message
CxfMessage out = exchange.getOut();
+ out.setMessage(response);
out.setBody(getBody(response));
-
- // set the headers
- Set<Map.Entry<String, Object>> entries = response.entrySet();
- for (Map.Entry<String, Object> entry : entries) {
- out.setHeader(entry.getKey(), entry.getValue());
- }
}
}
Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java Fri Mar 30 03:17:10 2007
@@ -19,7 +19,11 @@
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultComponent;
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusException;
+import org.apache.cxf.bus.CXFBusFactory;
import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.DestinationFactoryManager;
import org.apache.cxf.transport.local.LocalTransportFactory;
import org.xmlsoap.schemas.wsdl.http.AddressType;
@@ -31,7 +35,7 @@
* @version $Revision$
*/
public class CxfComponent extends DefaultComponent<CxfExchange> {
- private LocalTransportFactory localTransportFactory = new LocalTransportFactory();
+ private LocalTransportFactory localTransportFactory;
public CxfComponent() {
}
@@ -53,11 +57,23 @@
return new CxfEndpoint(uri, this, endpointInfo);
}
- public LocalTransportFactory getLocalTransportFactory() {
+ public LocalTransportFactory getLocalTransportFactory() throws BusException {
+ if (localTransportFactory == null) {
+ localTransportFactory = findLocalTransportFactory();
+ if (localTransportFactory == null) {
+ localTransportFactory = new LocalTransportFactory();
+ }
+ }
return localTransportFactory;
}
public void setLocalTransportFactory(LocalTransportFactory localTransportFactory) {
this.localTransportFactory = localTransportFactory;
+ }
+
+ protected LocalTransportFactory findLocalTransportFactory() throws BusException {
+ Bus bus = CXFBusFactory.getDefaultBus();
+ DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class);
+ return (LocalTransportFactory) dfm.getDestinationFactory(LocalTransportFactory.TRANSPORT_ID);
}
}
Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java Fri Mar 30 03:17:10 2007
@@ -17,32 +17,34 @@
*/
package org.apache.camel.component.cxf;
-import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.Processor;
-import org.apache.cxf.transport.local.LocalDestination;
-import org.apache.cxf.transport.MessageObserver;
+import org.apache.camel.impl.DefaultConsumer;
import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.transport.local.LocalTransportFactory;
/**
* A consumer of exchanges for a service in CXF
*
* @version $Revision$
-*/
+ */
public class CxfConsumer extends DefaultConsumer<CxfExchange> {
private CxfEndpoint endpoint;
- private LocalDestination destination;
+ private final LocalTransportFactory transportFactory;
+ private Destination destination;
- public CxfConsumer(CxfEndpoint endpoint, Processor<CxfExchange> processor) {
+ public CxfConsumer(CxfEndpoint endpoint, Processor<CxfExchange> processor, LocalTransportFactory transportFactory) {
super(endpoint, processor);
this.endpoint = endpoint;
+ this.transportFactory = transportFactory;
}
-
@Override
protected void doStart() throws Exception {
super.doStart();
- destination = (LocalDestination) endpoint.getLocalTransportFactory().getDestination(endpoint.getEndpointInfo());
+ destination = transportFactory.getDestination(endpoint.getEndpointInfo());
destination.setMessageObserver(new MessageObserver() {
public void onMessage(Message message) {
incomingCxfMessage(message);
Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java Fri Mar 30 03:17:10 2007
@@ -21,9 +21,10 @@
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.cxf.transport.local.LocalTransportFactory;
-import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.BusException;
import org.apache.cxf.message.Message;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.local.LocalTransportFactory;
/**
* The endpoint in the service engine
@@ -43,11 +44,11 @@
}
public Producer<CxfExchange> createProducer() throws Exception {
- return startService(new CxfProducer(this));
+ return startService(new CxfProducer(this, getLocalTransportFactory()));
}
public Consumer<CxfExchange> createConsumer(Processor<CxfExchange> processor) throws Exception {
- return startService(new CxfConsumer(this, processor));
+ return startService(new CxfConsumer(this, processor, getLocalTransportFactory()));
}
public CxfExchange createExchange() {
@@ -77,7 +78,7 @@
this.inOut = inOut;
}
- public LocalTransportFactory getLocalTransportFactory() {
+ public LocalTransportFactory getLocalTransportFactory() throws BusException {
return component.getLocalTransportFactory();
}
Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfExchange.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfExchange.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfExchange.java Fri Mar 30 03:17:10 2007
@@ -95,20 +95,20 @@
return exchange;
}
- public Message getOutFaultMessage() {
- return getExchange().getOutFaultMessage();
+ public Message getInMessage() {
+ return getIn().getMessage();
}
- public Message getInMessage() {
- return getExchange().getInMessage();
+ public Message getOutMessage() {
+ return getOut().getMessage();
}
- public Message getInFaultMessage() {
- return getExchange().getInFaultMessage();
+ public Message getOutFaultMessage() {
+ return getExchange().getOutFaultMessage();
}
- public Message getOutMessage() {
- return getExchange().getOutMessage();
+ public Message getInFaultMessage() {
+ return getExchange().getInFaultMessage();
}
public Destination getDestination() {
Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfMessage.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfMessage.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfMessage.java Fri Mar 30 03:17:10 2007
@@ -19,9 +19,9 @@
import org.apache.camel.impl.DefaultMessage;
import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
import java.util.Map;
-import java.util.Set;
/**
* An Apache CXF {@link Message} which provides access to the underlying CXF features
@@ -32,6 +32,7 @@
private Message cxfMessage;
public CxfMessage() {
+ this(new MessageImpl());
}
public CxfMessage(Message cxfMessage) {
@@ -67,38 +68,26 @@
}
public Object getHeader(String name) {
- Object answer = null;
- if (cxfMessage != null) {
- answer = cxfMessage.get(name);
- }
- if (answer == null) {
- answer = super.getHeader(name);
- }
- return answer;
+ return cxfMessage.get(name);
}
@Override
- public CxfMessage newInstance() {
- return new CxfMessage();
+ public void setHeader(String name, Object value) {
+ cxfMessage.put(name, value);
}
@Override
- protected Object createBody() {
- if (cxfMessage != null) {
- return getExchange().getBinding().extractBodyFromCxf(getExchange(), cxfMessage);
- }
- return null;
+ public Map<String, Object> getHeaders() {
+ return cxfMessage;
}
@Override
- protected void populateInitialHeaders(Map<String, Object> map) {
- if (cxfMessage != null) {
- Set<Map.Entry<String, Object>> entries = cxfMessage.entrySet();
- for (Map.Entry<String, Object> entry : entries) {
- String name = entry.getKey();
- Object value = entry.getValue();
- map.put(name, value);
- }
- }
+ public CxfMessage newInstance() {
+ return new CxfMessage();
+ }
+
+ @Override
+ protected Object createBody() {
+ return getExchange().getBinding().extractBodyFromCxf(getExchange(), cxfMessage);
}
}
Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java Fri Mar 30 03:17:10 2007
@@ -19,20 +19,17 @@
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
-import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.Destination;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.transport.local.LocalConduit;
-import org.apache.cxf.transport.local.LocalDestination;
import org.apache.cxf.transport.local.LocalTransportFactory;
-import org.xmlsoap.schemas.wsdl.http.AddressType;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
/**
@@ -42,20 +39,21 @@
*/
public class CxfProducer extends DefaultProducer<CxfExchange> {
private CxfEndpoint endpoint;
+ private final LocalTransportFactory transportFactory;
- public CxfProducer(CxfEndpoint endpoint) {
+ public CxfProducer(CxfEndpoint endpoint, LocalTransportFactory transportFactory) {
super(endpoint);
this.endpoint = endpoint;
+ this.transportFactory = transportFactory;
}
public void onExchange(CxfExchange exchange) {
try {
- LocalTransportFactory factory = endpoint.getLocalTransportFactory();
EndpointInfo endpointInfo = endpoint.getEndpointInfo();
- LocalDestination d = (LocalDestination) factory.getDestination(endpointInfo);
+ Destination d = transportFactory.getDestination(endpointInfo);
// Set up a listener for the response
- Conduit conduit = factory.getConduit(endpointInfo);
+ Conduit conduit = transportFactory.getConduit(endpointInfo);
ResultFuture future = new ResultFuture();
conduit.setMessageObserver(future);
@@ -70,6 +68,9 @@
// now lets wait for the response
if (endpoint.isInOut()) {
Message response = future.getResponse();
+
+ // TODO - why do we need to ignore the returned message and get the out message from the exchange!
+ response = e.getOutMessage();
binding.storeCxfResponse(exchange, response);
}
}
Modified: activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java (original)
+++ activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java Fri Mar 30 03:17:10 2007
@@ -24,82 +24,98 @@
import org.apache.camel.TypeConverter;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.util.CamelClient;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.cxf.Bus;
+import org.apache.cxf.bus.CXFBusFactory;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.transport.DestinationFactoryManager;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.transport.local.LocalConduit;
-import org.apache.cxf.transport.local.LocalDestination;
import org.apache.cxf.transport.local.LocalTransportFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.xmlsoap.schemas.wsdl.http.AddressType;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Map;
+import java.util.Set;
/**
* @version $Revision$
*/
public class CxfTest extends TestCase {
private static final transient Log log = LogFactory.getLog(CxfTest.class);
-
protected CamelContext camelContext = new DefaultCamelContext();
protected CamelClient client = new CamelClient(camelContext);
public void testInvokeOfServer() throws Exception {
- CxfEndpoint endpoint = (CxfEndpoint) camelContext.resolveEndpoint("cxf:http://localhost/test");
- assertNotNull(endpoint);
-
- // lets make sure we use the same factory
- LocalTransportFactory factory = endpoint.getLocalTransportFactory();
-
+ // lets register a service
EndpointInfo ei = new EndpointInfo(null, "http://schemas.xmlsoap.org/soap/http");
AddressType a = new AddressType();
a.setLocation("http://localhost/test");
ei.addExtensor(a);
- LocalDestination d = (LocalDestination) factory.getDestination(ei);
- d.setMessageObserver(new EchoObserver());
+ Bus bus = CXFBusFactory.getDefaultBus();
+ DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class);
+ DestinationFactory factory = dfm.getDestinationFactory(LocalTransportFactory.TRANSPORT_ID);
- Exchange exchange = client.send("cxf:http://localhost/test", new Processor<Exchange>() {
+ Destination destination = factory.getDestination(ei);
+ destination.setMessageObserver(new EchoObserver());
+
+ // now lets invoke it via Camel
+ CxfExchange exchange = (CxfExchange) client.send("cxf:http://localhost/test", new Processor<Exchange>() {
public void onExchange(Exchange exchange) {
+ exchange.getIn().setHeader("requestHeader", "foo");
exchange.getIn().setBody("<hello>world</hello>");
}
});
org.apache.camel.Message out = exchange.getOut();
- log.info("Received output message: " + out);
+ Message cxfOutMessage = exchange.getOutMessage();
+ log.info("Received output message: " + out + " and CXF out: " + cxfOutMessage);
+
+ assertEquals("replyHeader on CXF", "foo2", cxfOutMessage.get("replyHeader"));
+ assertEquals("replyHeader on Camel", "foo2", out.getHeader("replyHeader"));
-/*
String output = out.getBody(String.class);
- log.info("Received output text: "+ output);
-*/
+ log.info("Received output text: " + output);
}
protected class EchoObserver implements MessageObserver {
public void onMessage(Message message) {
try {
- log.info("Received message: "+ message + " with content types: " + message.getContentFormats());
-
+ log.info("Received message: " + message + " with content types: " + message.getContentFormats());
+
Conduit backChannel = message.getDestination().getBackChannel(message, null, null);
message.remove(LocalConduit.DIRECT_DISPATCH);
TypeConverter converter = camelContext.getTypeConverter();
String request = converter.convertTo(String.class, message.getContent(InputStream.class));
log.info("Request body: " + request);
-
+
org.apache.cxf.message.Exchange exchange = message.getExchange();
MessageImpl reply = new MessageImpl();
+ reply.put("foo", "bar");
+ assertEquals("foo header", "bar", reply.get("foo"));
+
+ reply.put("replyHeader", message.get("requestHeader") + "2");
+
+ Set<Map.Entry<String, Object>> entries = reply.entrySet();
+ assertEquals("entrySet.size()", 2, entries.size());
+
//reply.setContent(String.class, "<reply>true</reply>");
InputStream payload = converter.convertTo(InputStream.class, "<reply>true</reply>");
reply.setContent(InputStream.class, payload);
exchange.setOutMessage(reply);
-
- backChannel.send(reply);
+ log.info("sending reply: " + reply);
+ backChannel.send(message);
/*
backChannel.send(message);
@@ -114,7 +130,7 @@
*/
}
catch (Exception e) {
- log.error("Caught: "+ e, e);
+ log.error("Caught: " + e, e);
fail("Caught: " + e);
}
}