You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/30 01:48:41 UTC

svn commit: r523882 - in /activemq/camel/trunk/camel-cxf: ./ src/main/java/org/apache/camel/component/cxf/ src/test/java/org/apache/camel/component/cxf/

Author: jstrachan
Date: Thu Mar 29 16:48:41 2007
New Revision: 523882

URL: http://svn.apache.org/viewvc?view=rev&rev=523882
Log:
added a basic working producer for communicating with a CXF service using the local transport

Added:
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java   (with props)
    activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java   (with props)
Modified:
    activemq/camel/trunk/camel-cxf/pom.xml
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java

Modified: activemq/camel/trunk/camel-cxf/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/pom.xml?view=diff&rev=523882&r1=523881&r2=523882
==============================================================================
--- activemq/camel/trunk/camel-cxf/pom.xml (original)
+++ activemq/camel/trunk/camel-cxf/pom.xml Thu Mar 29 16:48:41 2007
@@ -48,6 +48,11 @@
       <groupId>org.apache.cxf</groupId>
       <artifactId>cxf-rt-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.cxf</groupId>
+      <artifactId>cxf-rt-transports-local</artifactId>
+      <version>2.0-incubator-RC-SNAPSHOT</version>
+    </dependency>
 
 
     <dependency>

Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java?view=diff&rev=523882&r1=523881&r2=523882
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfBinding.java Thu Mar 29 16:48:41 2007
@@ -18,7 +18,10 @@
 package org.apache.camel.component.cxf;
 
 import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
 
+import java.io.InputStream;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -29,13 +32,47 @@
 public class CxfBinding {
     public Object extractBodyFromCxf(CxfExchange exchange, Message message) {
         //  TODO how do we choose a format?
+        return getBody(message);
+    }
+
+    protected Object getBody(Message message) {
         Set<Class<?>> contentFormats = message.getContentFormats();
         for (Class<?> contentFormat : contentFormats) {
-            Object answer = message.get(contentFormat);
+            Object answer = message.getContent(contentFormat);
             if (answer != null) {
                 return answer;
             }
         }
         return null;
+    }
+
+    public MessageImpl createCxfMessage(CxfExchange exchange) {
+        MessageImpl answer = new MessageImpl();
+
+        // TODO is InputStream the best type to give to CXF?
+        CxfMessage in = exchange.getIn();
+        Object body = in.getBody(InputStream.class);
+        if (body == null) {
+            body = in.getBody();
+        }
+        answer.setContent(InputStream.class, body);
+
+        // set the headers
+        Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet();
+        for (Map.Entry<String, Object> entry : entries) {
+            answer.put(entry.getKey(), entry.getValue());
+        }
+        return answer;
+    }
+
+    public void storeCxfResponse(CxfExchange exchange, Message response) {
+        CxfMessage out = exchange.getOut();
+        out.setBody(getBody(response));
+
+        // set the headers
+        Set<Map.Entry<String, Object>> entries = response.entrySet();
+        for (Map.Entry<String, Object> entry : entries) {
+          out.setHeader(entry.getKey(), entry.getValue());
+        }
     }
 }

Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java?view=diff&rev=523882&r1=523881&r2=523882
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java Thu Mar 29 16:48:41 2007
@@ -19,18 +19,19 @@
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.impl.DefaultComponent;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.local.LocalTransportFactory;
+import org.xmlsoap.schemas.wsdl.http.AddressType;
 
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * @version $Revision$
  */
 public class CxfComponent extends DefaultComponent<CxfExchange> {
-    private Map<String, CxfEndpoint> map = new HashMap<String, CxfEndpoint>();
+    private LocalTransportFactory localTransportFactory = new LocalTransportFactory();
 
     public CxfComponent() {
     }
@@ -40,33 +41,23 @@
     }
 
     public synchronized CxfEndpoint createEndpoint(String uri, String[] urlParts) throws IOException, URISyntaxException {
-        CxfEndpoint endpoint = map.get(uri);
-        if (endpoint == null) {
-            String remainingUrl = uri.substring("cxf:".length());
-            URI u = new URI(remainingUrl);
-
-            String protocol = u.getScheme();
-
-            map.put(uri, endpoint);
-        }
-        return endpoint;
+        String remainingUrl = uri.substring("cxf:".length());
+        URI u = new URI(remainingUrl);
+
+        // TODO this is a hack!!!
+        EndpointInfo endpointInfo = new EndpointInfo(null, "http://schemas.xmlsoap.org/soap/http");
+        AddressType a = new AddressType();
+        a.setLocation(remainingUrl);
+        endpointInfo.addExtensor(a);
+
+        return new CxfEndpoint(uri, this, endpointInfo);
+    }
+
+    public LocalTransportFactory getLocalTransportFactory() {
+        return localTransportFactory;
     }
 
-    /*
-    protected void foo() {
-       Bus bus = CXFBusFactory.getDefaultBus();
-       ServerRegistry serverRegistry = bus.getExtension(ServerRegistry.class);
-       List<Server> servers = serverRegistry.getServers();
-
-       Server targetServer = null;
-       for (Server server : servers) {
-           targetServer = server;
-           EndpointInfo info = server.getEndpoint().getEndpointInfo();
-           String address = info.getAddress();
-
-           Message message = new MessageImpl();
-           server.getMessageObserver().onMessage(message);
-       }
+    public void setLocalTransportFactory(LocalTransportFactory localTransportFactory) {
+        this.localTransportFactory = localTransportFactory;
     }
-    */
 }

Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java?view=diff&rev=523882&r1=523881&r2=523882
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java Thu Mar 29 16:48:41 2007
@@ -17,13 +17,14 @@
  */
 package org.apache.camel.component.cxf;
 
-import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.transport.local.LocalTransportFactory;
+import org.apache.cxf.service.model.EndpointInfo;
 
 /**
  * The endpoint in the service engine
@@ -32,17 +33,18 @@
  */
 public class CxfEndpoint extends DefaultEndpoint<CxfExchange> {
     private CxfBinding binding;
-
-    protected CxfEndpoint(String uri, CamelContext camelContext) {
-        super(uri, camelContext);
+    private final CxfComponent component;
+    private final EndpointInfo endpointInfo;
+    private boolean inOut = true;
+
+    public CxfEndpoint(String uri, CxfComponent component, EndpointInfo endpointInfo) {
+        super(uri, component.getContext());
+        this.component = component;
+        this.endpointInfo = endpointInfo;
     }
 
     public Producer<CxfExchange> createProducer() throws Exception {
-        return startService(new DefaultProducer<CxfExchange>(this) {
-            public void onExchange(CxfExchange exchange) {
-                // TODO send into CXF
-            }
-        });
+        return startService(new CxfProducer(this));
     }
 
     public Consumer<CxfExchange> createConsumer(Processor<CxfExchange> processor) throws Exception {
@@ -65,15 +67,23 @@
         this.binding = binding;
     }
 
-    @Override
-    protected void doActivate() throws Exception {
-        super.doActivate();
+    public boolean isInOut() {
+        return inOut;
+    }
+
+    public void setInOut(boolean inOut) {
+        this.inOut = inOut;
+    }
 
-        // TODO process any inbound messages from CXF
+    public LocalTransportFactory getLocalTransportFactory() {
+        return component.getLocalTransportFactory();
+    }
 
-        Processor<CxfExchange> processor = getInboundProcessor();
-        if (processor != null) {
+    public EndpointInfo getEndpointInfo() {
+        return endpointInfo;
+    }
 
-        }
+    public CxfComponent getComponent() {
+        return component;
     }
 }

Added: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=auto&rev=523882
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java (added)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java Thu Mar 29 16:48:41 2007
@@ -0,0 +1,107 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.message.ExchangeImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.transport.local.LocalConduit;
+import org.apache.cxf.transport.local.LocalDestination;
+import org.apache.cxf.transport.local.LocalTransportFactory;
+import org.xmlsoap.schemas.wsdl.http.AddressType;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Sends messages from Camel into the CXF endpoint
+ *
+ * @version $Revision$
+ */
+public class CxfProducer extends DefaultProducer<CxfExchange> {
+    private CxfEndpoint endpoint;
+
+    public CxfProducer(CxfEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    public void onExchange(CxfExchange exchange) {
+        try {
+            LocalTransportFactory factory = endpoint.getLocalTransportFactory();
+            EndpointInfo endpointInfo = endpoint.getEndpointInfo();
+            LocalDestination d = (LocalDestination) factory.getDestination(endpointInfo);
+
+            // Set up a listener for the response
+            Conduit conduit = factory.getConduit(endpointInfo);
+            ResultFuture future = new ResultFuture();
+            conduit.setMessageObserver(future);
+
+            CxfBinding binding = endpoint.getBinding();
+            MessageImpl m = binding.createCxfMessage(exchange);
+            ExchangeImpl e = new ExchangeImpl();
+            e.setInMessage(m);
+            m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
+            m.setDestination(d);
+            conduit.send(m);
+
+            // now lets wait for the response
+            if (endpoint.isInOut()) {
+                Message response = future.getResponse();
+                binding.storeCxfResponse(exchange, response);
+            }
+        }
+        catch (IOException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+
+    protected class ResultFuture implements MessageObserver {
+        Message response;
+        CountDownLatch latch = new CountDownLatch(1);
+
+        public Message getResponse() {
+            while (response == null) {
+                try {
+                    latch.await();
+                }
+                catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+            return response;
+        }
+
+        public synchronized void onMessage(Message message) {
+            try {
+                message.remove(LocalConduit.DIRECT_DISPATCH);
+                this.response = message;
+            }
+            finally {
+                latch.countDown();
+            }
+        }
+    }
+}

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java?view=auto&rev=523882
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java (added)
+++ activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java Thu Mar 29 16:48:41 2007
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf;
+
+import junit.framework.TestCase;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.util.CamelClient;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.transport.local.LocalConduit;
+import org.apache.cxf.transport.local.LocalDestination;
+import org.apache.cxf.transport.local.LocalTransportFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.xmlsoap.schemas.wsdl.http.AddressType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * @version $Revision$
+ */
+public class CxfTest extends TestCase {
+    private static final transient Log log = LogFactory.getLog(CxfTest.class);
+
+    protected CamelContext camelContext = new DefaultCamelContext();
+    protected CamelClient client = new CamelClient(camelContext);
+
+    public void testInvokeOfServer() throws Exception {
+        CxfEndpoint endpoint = (CxfEndpoint) camelContext.resolveEndpoint("cxf:http://localhost/test");
+        assertNotNull(endpoint);
+
+        // lets make sure we use the same factory
+        LocalTransportFactory factory = endpoint.getLocalTransportFactory();
+
+        EndpointInfo ei = new EndpointInfo(null, "http://schemas.xmlsoap.org/soap/http");
+        AddressType a = new AddressType();
+        a.setLocation("http://localhost/test");
+        ei.addExtensor(a);
+
+        LocalDestination d = (LocalDestination) factory.getDestination(ei);
+        d.setMessageObserver(new EchoObserver());
+
+        Exchange exchange = client.send("cxf:http://localhost/test", new Processor<Exchange>() {
+            public void onExchange(Exchange exchange) {
+                exchange.getIn().setBody("<hello>world</hello>");
+            }
+        });
+
+        org.apache.camel.Message out = exchange.getOut();
+        log.info("Received output message: " + out);
+
+/*
+        String output = out.getBody(String.class);
+        log.info("Received output text: "+ output);
+*/
+    }
+
+    protected class EchoObserver implements MessageObserver {
+        public void onMessage(Message message) {
+            try {
+                log.info("Received message: "+ message + " with content types: " + message.getContentFormats());
+                
+                Conduit backChannel = message.getDestination().getBackChannel(message, null, null);
+                message.remove(LocalConduit.DIRECT_DISPATCH);
+
+                TypeConverter converter = camelContext.getTypeConverter();
+                String request = converter.convertTo(String.class, message.getContent(InputStream.class));
+                log.info("Request body: " + request);
+                
+                org.apache.cxf.message.Exchange exchange = message.getExchange();
+                MessageImpl reply = new MessageImpl();
+                //reply.setContent(String.class, "<reply>true</reply>");
+                InputStream payload = converter.convertTo(InputStream.class, "<reply>true</reply>");
+                reply.setContent(InputStream.class, payload);
+                exchange.setOutMessage(reply);
+
+
+                backChannel.send(reply);
+
+/*
+                backChannel.send(message);
+
+                OutputStream out = message.getContent(OutputStream.class);
+                InputStream in = message.getContent(InputStream.class);
+
+                copy(in, out, 1024);
+
+                out.close();
+                in.close();
+*/
+            }
+            catch (Exception e) {
+                log.error("Caught: "+ e, e);
+                fail("Caught: " + e);
+            }
+        }
+    }
+
+    private static void copy(final InputStream input, final OutputStream output, final int bufferSize)
+            throws IOException {
+        try {
+            final byte[] buffer = new byte[bufferSize];
+
+            int n = input.read(buffer);
+            while (-1 != n) {
+                output.write(buffer, 0, n);
+                n = input.read(buffer);
+            }
+        }
+        finally {
+            input.close();
+            output.close();
+        }
+    }
+}

Propchange: activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain