You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/30 12:17:12 UTC

svn commit: r524027 - in /activemq/camel/trunk/camel-cxf/src: main/java/org/apache/camel/component/cxf/ test/java/org/apache/camel/component/cxf/

Author: jstrachan
Date: Fri Mar 30 03:17:10 2007
New Revision: 524027

URL: http://svn.apache.org/viewvc?view=rev&rev=524027
Log:
use the TransportFactory from the Bus itself to avoid possibly using the wrong one; also got a working test case now that does assertions on the returned headers and body

Modified:
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfExchange.java
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfMessage.java
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
    activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java

Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java Fri Mar 30 03:17:10 2007
@@ -21,7 +21,6 @@
 import org.apache.cxf.message.MessageImpl;
 
 import java.io.InputStream;
-import java.util.Map;
 import java.util.Set;
 
 /**
@@ -47,7 +46,7 @@
     }
 
     public MessageImpl createCxfMessage(CxfExchange exchange) {
-        MessageImpl answer = new MessageImpl();
+        MessageImpl answer = (MessageImpl) exchange.getInMessage();
 
         // TODO is InputStream the best type to give to CXF?
         CxfMessage in = exchange.getIn();
@@ -57,22 +56,21 @@
         }
         answer.setContent(InputStream.class, body);
 
+        // no need to process headers as we reuse the CXF message
+        /*
         // set the headers
         Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet();
         for (Map.Entry<String, Object> entry : entries) {
             answer.put(entry.getKey(), entry.getValue());
         }
+        */
         return answer;
     }
 
     public void storeCxfResponse(CxfExchange exchange, Message response) {
+        // no need to process headers as we use the CXF message
         CxfMessage out = exchange.getOut();
+        out.setMessage(response);
         out.setBody(getBody(response));
-
-        // set the headers
-        Set<Map.Entry<String, Object>> entries = response.entrySet();
-        for (Map.Entry<String, Object> entry : entries) {
-          out.setHeader(entry.getKey(), entry.getValue());
-        }
     }
 }

Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java Fri Mar 30 03:17:10 2007
@@ -19,7 +19,11 @@
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.impl.DefaultComponent;
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusException;
+import org.apache.cxf.bus.CXFBusFactory;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.DestinationFactoryManager;
 import org.apache.cxf.transport.local.LocalTransportFactory;
 import org.xmlsoap.schemas.wsdl.http.AddressType;
 
@@ -31,7 +35,7 @@
  * @version $Revision$
  */
 public class CxfComponent extends DefaultComponent<CxfExchange> {
-    private LocalTransportFactory localTransportFactory = new LocalTransportFactory();
+    private LocalTransportFactory localTransportFactory;
 
     public CxfComponent() {
     }
@@ -53,11 +57,23 @@
         return new CxfEndpoint(uri, this, endpointInfo);
     }
 
-    public LocalTransportFactory getLocalTransportFactory() {
+    public LocalTransportFactory getLocalTransportFactory() throws BusException {
+        if (localTransportFactory == null) {
+            localTransportFactory = findLocalTransportFactory();
+            if (localTransportFactory == null) {
+                localTransportFactory = new LocalTransportFactory();
+            }
+        }
         return localTransportFactory;
     }
 
     public void setLocalTransportFactory(LocalTransportFactory localTransportFactory) {
         this.localTransportFactory = localTransportFactory;
+    }
+
+    protected LocalTransportFactory findLocalTransportFactory() throws BusException {
+        Bus bus = CXFBusFactory.getDefaultBus();
+        DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class);
+        return (LocalTransportFactory) dfm.getDestinationFactory(LocalTransportFactory.TRANSPORT_ID);
     }
 }

Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java Fri Mar 30 03:17:10 2007
@@ -17,32 +17,34 @@
  */
 package org.apache.camel.component.cxf;
 
-import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.Processor;
-import org.apache.cxf.transport.local.LocalDestination;
-import org.apache.cxf.transport.MessageObserver;
+import org.apache.camel.impl.DefaultConsumer;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.transport.local.LocalTransportFactory;
 
 /**
  * A consumer of exchanges for a service in CXF
  *
  * @version $Revision$
-*/
+ */
 public class CxfConsumer extends DefaultConsumer<CxfExchange> {
     private CxfEndpoint endpoint;
-    private LocalDestination destination;
+    private final LocalTransportFactory transportFactory;
+    private Destination destination;
 
-    public CxfConsumer(CxfEndpoint endpoint, Processor<CxfExchange> processor) {
+    public CxfConsumer(CxfEndpoint endpoint, Processor<CxfExchange> processor, LocalTransportFactory transportFactory) {
         super(endpoint, processor);
         this.endpoint = endpoint;
+        this.transportFactory = transportFactory;
     }
 
-
     @Override
     protected void doStart() throws Exception {
         super.doStart();
 
-        destination = (LocalDestination) endpoint.getLocalTransportFactory().getDestination(endpoint.getEndpointInfo());
+        destination = transportFactory.getDestination(endpoint.getEndpointInfo());
         destination.setMessageObserver(new MessageObserver() {
             public void onMessage(Message message) {
                 incomingCxfMessage(message);

Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java Fri Mar 30 03:17:10 2007
@@ -21,9 +21,10 @@
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.cxf.transport.local.LocalTransportFactory;
-import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.BusException;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.local.LocalTransportFactory;
 
 /**
  * The endpoint in the service engine
@@ -43,11 +44,11 @@
     }
 
     public Producer<CxfExchange> createProducer() throws Exception {
-        return startService(new CxfProducer(this));
+        return startService(new CxfProducer(this, getLocalTransportFactory()));
     }
 
     public Consumer<CxfExchange> createConsumer(Processor<CxfExchange> processor) throws Exception {
-        return startService(new CxfConsumer(this, processor));
+        return startService(new CxfConsumer(this, processor, getLocalTransportFactory()));
     }
 
     public CxfExchange createExchange() {
@@ -77,7 +78,7 @@
         this.inOut = inOut;
     }
 
-    public LocalTransportFactory getLocalTransportFactory() {
+    public LocalTransportFactory getLocalTransportFactory() throws BusException {
         return component.getLocalTransportFactory();
     }
 

Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfExchange.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfExchange.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfExchange.java Fri Mar 30 03:17:10 2007
@@ -95,20 +95,20 @@
         return exchange;
     }
 
-    public Message getOutFaultMessage() {
-        return getExchange().getOutFaultMessage();
+    public Message getInMessage() {
+        return getIn().getMessage();
     }
 
-    public Message getInMessage() {
-        return getExchange().getInMessage();
+    public Message getOutMessage() {
+        return getOut().getMessage();
     }
 
-    public Message getInFaultMessage() {
-        return getExchange().getInFaultMessage();
+    public Message getOutFaultMessage() {
+        return getExchange().getOutFaultMessage();
     }
 
-    public Message getOutMessage() {
-        return getExchange().getOutMessage();
+    public Message getInFaultMessage() {
+        return getExchange().getInFaultMessage();
     }
 
     public Destination getDestination() {

Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfMessage.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfMessage.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfMessage.java Fri Mar 30 03:17:10 2007
@@ -19,9 +19,9 @@
 
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
 
 import java.util.Map;
-import java.util.Set;
 
 /**
  * An Apache CXF {@link Message} which provides access to the underlying CXF features
@@ -32,6 +32,7 @@
     private Message cxfMessage;
 
     public CxfMessage() {
+        this(new MessageImpl());
     }
 
     public CxfMessage(Message cxfMessage) {
@@ -67,38 +68,26 @@
     }
 
     public Object getHeader(String name) {
-        Object answer = null;
-        if (cxfMessage != null) {
-            answer = cxfMessage.get(name);
-        }
-        if (answer == null) {
-            answer = super.getHeader(name);
-        }
-        return answer;
+        return cxfMessage.get(name);
     }
 
     @Override
-    public CxfMessage newInstance() {
-        return new CxfMessage();
+    public void setHeader(String name, Object value) {
+        cxfMessage.put(name, value);
     }
 
     @Override
-    protected Object createBody() {
-        if (cxfMessage != null) {
-            return getExchange().getBinding().extractBodyFromCxf(getExchange(), cxfMessage);
-        }
-        return null;
+    public Map<String, Object> getHeaders() {
+        return cxfMessage;
     }
 
     @Override
-    protected void populateInitialHeaders(Map<String, Object> map) {
-        if (cxfMessage != null) {
-            Set<Map.Entry<String, Object>> entries = cxfMessage.entrySet();
-            for (Map.Entry<String, Object> entry : entries) {
-                String name = entry.getKey();
-                Object value = entry.getValue();
-                map.put(name, value);
-            }
-        }
+    public CxfMessage newInstance() {
+        return new CxfMessage();
+    }
+
+    @Override
+    protected Object createBody() {
+        return getExchange().getBinding().extractBodyFromCxf(getExchange(), cxfMessage);
     }
 }

Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java Fri Mar 30 03:17:10 2007
@@ -19,20 +19,17 @@
 
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
-import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.Destination;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.local.LocalConduit;
-import org.apache.cxf.transport.local.LocalDestination;
 import org.apache.cxf.transport.local.LocalTransportFactory;
-import org.xmlsoap.schemas.wsdl.http.AddressType;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.concurrent.CountDownLatch;
 
 /**
@@ -42,20 +39,21 @@
  */
 public class CxfProducer extends DefaultProducer<CxfExchange> {
     private CxfEndpoint endpoint;
+    private final LocalTransportFactory transportFactory;
 
-    public CxfProducer(CxfEndpoint endpoint) {
+    public CxfProducer(CxfEndpoint endpoint, LocalTransportFactory transportFactory) {
         super(endpoint);
         this.endpoint = endpoint;
+        this.transportFactory = transportFactory;
     }
 
     public void onExchange(CxfExchange exchange) {
         try {
-            LocalTransportFactory factory = endpoint.getLocalTransportFactory();
             EndpointInfo endpointInfo = endpoint.getEndpointInfo();
-            LocalDestination d = (LocalDestination) factory.getDestination(endpointInfo);
+            Destination d = transportFactory.getDestination(endpointInfo);
 
             // Set up a listener for the response
-            Conduit conduit = factory.getConduit(endpointInfo);
+            Conduit conduit = transportFactory.getConduit(endpointInfo);
             ResultFuture future = new ResultFuture();
             conduit.setMessageObserver(future);
 
@@ -70,6 +68,9 @@
             // now lets wait for the response
             if (endpoint.isInOut()) {
                 Message response = future.getResponse();
+
+                // TODO - why do we need to ignore the returned message and get the out message from the exchange!
+                response = e.getOutMessage();
                 binding.storeCxfResponse(exchange, response);
             }
         }

Modified: activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java?view=diff&rev=524027&r1=524026&r2=524027
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java (original)
+++ activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java Fri Mar 30 03:17:10 2007
@@ -24,82 +24,98 @@
 import org.apache.camel.TypeConverter;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.util.CamelClient;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.cxf.Bus;
+import org.apache.cxf.bus.CXFBusFactory;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.transport.DestinationFactoryManager;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.local.LocalConduit;
-import org.apache.cxf.transport.local.LocalDestination;
 import org.apache.cxf.transport.local.LocalTransportFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.xmlsoap.schemas.wsdl.http.AddressType;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * @version $Revision$
  */
 public class CxfTest extends TestCase {
     private static final transient Log log = LogFactory.getLog(CxfTest.class);
-
     protected CamelContext camelContext = new DefaultCamelContext();
     protected CamelClient client = new CamelClient(camelContext);
 
     public void testInvokeOfServer() throws Exception {
-        CxfEndpoint endpoint = (CxfEndpoint) camelContext.resolveEndpoint("cxf:http://localhost/test");
-        assertNotNull(endpoint);
-
-        // lets make sure we use the same factory
-        LocalTransportFactory factory = endpoint.getLocalTransportFactory();
-
+        // lets register a service
         EndpointInfo ei = new EndpointInfo(null, "http://schemas.xmlsoap.org/soap/http");
         AddressType a = new AddressType();
         a.setLocation("http://localhost/test");
         ei.addExtensor(a);
 
-        LocalDestination d = (LocalDestination) factory.getDestination(ei);
-        d.setMessageObserver(new EchoObserver());
+        Bus bus = CXFBusFactory.getDefaultBus();
+        DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class);
+        DestinationFactory factory = dfm.getDestinationFactory(LocalTransportFactory.TRANSPORT_ID);
 
-        Exchange exchange = client.send("cxf:http://localhost/test", new Processor<Exchange>() {
+        Destination destination = factory.getDestination(ei);
+        destination.setMessageObserver(new EchoObserver());
+
+        // now lets invoke it via Camel
+        CxfExchange exchange = (CxfExchange) client.send("cxf:http://localhost/test", new Processor<Exchange>() {
             public void onExchange(Exchange exchange) {
+                exchange.getIn().setHeader("requestHeader", "foo");
                 exchange.getIn().setBody("<hello>world</hello>");
             }
         });
 
         org.apache.camel.Message out = exchange.getOut();
-        log.info("Received output message: " + out);
+        Message cxfOutMessage = exchange.getOutMessage();
+        log.info("Received output message: " + out + " and CXF out: " + cxfOutMessage);
+
+        assertEquals("replyHeader on CXF", "foo2", cxfOutMessage.get("replyHeader"));
+        assertEquals("replyHeader on Camel", "foo2", out.getHeader("replyHeader"));
 
-/*
         String output = out.getBody(String.class);
-        log.info("Received output text: "+ output);
-*/
+        log.info("Received output text: " + output);
     }
 
     protected class EchoObserver implements MessageObserver {
         public void onMessage(Message message) {
             try {
-                log.info("Received message: "+ message + " with content types: " + message.getContentFormats());
-                
+                log.info("Received message: " + message + " with content types: " + message.getContentFormats());
+
                 Conduit backChannel = message.getDestination().getBackChannel(message, null, null);
                 message.remove(LocalConduit.DIRECT_DISPATCH);
 
                 TypeConverter converter = camelContext.getTypeConverter();
                 String request = converter.convertTo(String.class, message.getContent(InputStream.class));
                 log.info("Request body: " + request);
-                
+
                 org.apache.cxf.message.Exchange exchange = message.getExchange();
                 MessageImpl reply = new MessageImpl();
+                reply.put("foo", "bar");
+                assertEquals("foo header", "bar", reply.get("foo"));
+
+                reply.put("replyHeader", message.get("requestHeader") + "2");
+
+                Set<Map.Entry<String, Object>> entries = reply.entrySet();
+                assertEquals("entrySet.size()", 2, entries.size());
+
                 //reply.setContent(String.class, "<reply>true</reply>");
                 InputStream payload = converter.convertTo(InputStream.class, "<reply>true</reply>");
                 reply.setContent(InputStream.class, payload);
                 exchange.setOutMessage(reply);
 
-
-                backChannel.send(reply);
+                log.info("sending reply: " + reply);
+                backChannel.send(message);
 
 /*
                 backChannel.send(message);
@@ -114,7 +130,7 @@
 */
             }
             catch (Exception e) {
-                log.error("Caught: "+ e, e);
+                log.error("Caught: " + e, e);
                 fail("Caught: " + e);
             }
         }