You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/22 13:19:27 UTC

svn commit: r521236 - in /activemq/camel/trunk/camel-jms/src: main/java/org/apache/camel/component/jms/ test/java/org/apache/camel/component/jms/

Author: jstrachan
Date: Thu Mar 22 05:19:26 2007
New Revision: 521236

URL: http://svn.apache.org/viewvc?view=rev&rev=521236
Log:
tidied up the JMS component, added a separate strategy JmsBinding which deals with the actual binding of Camel <-> JMS and tested that the binding uses TextMessage or ObjectMessage based on the payload

Added:
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java   (with props)
    activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java   (with props)
Modified:
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
    activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java

Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?view=auto&rev=521236
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Thu Mar 22 05:19:26 2007
@@ -0,0 +1,107 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+import javax.jms.MapMessage;
+import javax.jms.BytesMessage;
+import javax.jms.StreamMessage;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Enumeration;
+import java.io.Serializable;
+
+/**
+ * A Strategy used to convert between a Camel {@JmsExchange} and {@JmsMessage} to and from a
+ * JMS {@link Message}
+ *
+ * @version $Revision$
+ */
+public class JmsBinding {
+
+    /**
+     * Creates a JMS message from the Camel exchange and message
+     *
+     * @param session the JMS session used to create the message
+     * @return a newly created JMS Message instance containing the
+     * @throws JMSException if the message could not be created
+     */
+    public Message createJmsMessage(JmsExchange exchange, JmsMessage message, Session session) throws JMSException {
+        Object value = message.getBody();
+        if (value instanceof String) {
+            return session.createTextMessage((String) value);
+        }
+        else if (value instanceof Serializable) {
+            return session.createObjectMessage((Serializable) value);
+        }
+        else {
+            return session.createMessage();
+        }
+    }
+
+    /**
+     * Extracts the body from the JMS message
+     *
+     * @param exchange
+     * @param message
+     */
+    public Object extractBodyFromJms(JmsExchange exchange, Message message) {
+        try {
+            if (message instanceof ObjectMessage) {
+                ObjectMessage objectMessage = (ObjectMessage) message;
+                return objectMessage.getObject();
+            }
+            else if (message instanceof TextMessage) {
+                TextMessage textMessage = (TextMessage) message;
+                return textMessage.getText();
+            }
+            else if (message instanceof MapMessage) {
+                return createMapFromMapMessage((MapMessage) message);
+            }
+            else if (message instanceof BytesMessage || message instanceof StreamMessage) {
+                // TODO we need a decoder to be able to process the message
+                return message;
+            }
+            else {
+                return null;
+            }
+        }
+        catch (JMSException e) {
+            throw new RuntimeJmsException("Failed to extract body due to: " + e + ". Message: " + message, e);
+        }
+    }
+
+
+    /**
+     * Extracts a {@link Map} from a {@link MapMessage}
+     */
+    public Map<String, Object> createMapFromMapMessage(MapMessage message) throws JMSException {
+        Map<String, Object> answer = new HashMap<String, Object>();
+        Enumeration names = message.getPropertyNames();
+        while (names.hasMoreElements()) {
+            String name = names.nextElement().toString();
+            Object value = message.getObject(name);
+            answer.put(name, value);
+        }
+        return answer;
+    }
+}

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?view=diff&rev=521236&r1=521235&r2=521236
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Thu Mar 22 05:19:26 2007
@@ -35,7 +35,7 @@
  */
 public class JmsEndpoint extends DefaultEndpoint<JmsExchange> implements MessageListener {
     private static final Log log = LogFactory.getLog(JmsEndpoint.class);
-
+    private JmsBinding binding;
     private JmsOperations template;
     private AbstractMessageListenerContainer listenerContainer;
     private String destination;
@@ -56,8 +56,7 @@
         getInboundProcessor().onExchange(exchange);
     }
 
-
-    public void send(Exchange exchange) {
+    public void onExchange(Exchange exchange) {
         // lets convert to the type of an exchange
         JmsExchange jmsExchange = convertTo(JmsExchange.class, exchange);
         onExchange(jmsExchange);
@@ -66,7 +65,7 @@
     public void onExchange(final JmsExchange exchange) {
         template.send(destination, new MessageCreator() {
             public Message createMessage(Session session) throws JMSException {
-                Message message = exchange.createMessage(session);
+                Message message = getBinding().createJmsMessage(exchange, exchange.getIn(), session);
                 if (log.isDebugEnabled()) {
                     log.debug(JmsEndpoint.this + " sending JMS message: " + message);
                 }
@@ -75,20 +74,38 @@
         });
     }
 
-    public JmsOperations getTemplate() {
-        return template;
+    public JmsExchange createExchange() {
+        return new JmsExchange(getContext(), getBinding());
     }
 
-    public JmsExchange createExchange() {
-        return new JmsExchange(getContext());
+    public JmsExchange createExchange(Message message) {
+        return new JmsExchange(getContext(), getBinding(), message);
     }
 
+    // Properties
+    //-------------------------------------------------------------------------
+    public JmsBinding getBinding() {
+        if (binding == null) {
+            binding = new JmsBinding();
+        }
+        return binding;
+    }
 
-    public JmsExchange createExchange(Message message) {
-        return new JmsExchange(getContext(), message);
+    /**
+     * Sets the binding used to convert from a Camel message to and from a JMS message
+     *
+     * @param binding the binding to use
+     */
+    public void setBinding(JmsBinding binding) {
+        this.binding = binding;
     }
 
+    public JmsOperations getTemplate() {
+        return template;
+    }
 
+    // Implementation methods
+    //-------------------------------------------------------------------------
     protected void doActivate() {
         super.doActivate();
         listenerContainer.afterPropertiesSet();

Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java?view=diff&rev=521236&r1=521235&r2=521236
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java Thu Mar 22 05:19:26 2007
@@ -31,44 +31,42 @@
  */
 public class JmsExchange extends DefaultExchange {
 
-    public JmsExchange(CamelContext container) {
-        super(container);
+    private JmsBinding binding;
+
+    public JmsExchange(CamelContext context, JmsBinding binding) {
+        super(context);
+        this.binding = binding;
     }
 
-    public JmsExchange(CamelContext container, Message message) {
-        super(container);
+    public JmsExchange(CamelContext context, JmsBinding binding, Message message) {
+        this(context, binding);
         setIn(new JmsMessage(message));
     }
 
     @Override
-    public Exchange newInstance() {
-        return new JmsExchange(getContext());
+    public JmsMessage getIn() {
+        return (JmsMessage) super.getIn();
     }
 
-    public Message createMessage(Session session) throws JMSException {
-        Message request = getInMessage();
-        if (request == null) {
-            request = session.createMessage();
-
-            /** TODO
-            if (lazyHeaders != null) {
-                // lets add any lazy headers
-                for (Map.Entry<String, Object> entry : lazyHeaders.entrySet()) {
-                    request.setObjectProperty(entry.getKey(), entry.getValue());
-                }
-            }
-             */
-        }
-        return request;
-    }
-
-    public Message getInMessage() {
-        JmsMessage jmsMessage = (JmsMessage) getIn();
-        if (jmsMessage != null) {
-            return jmsMessage.getJmsMessage();
-        }
-        return null;
+    @Override
+    public JmsMessage getOut() {
+        return (JmsMessage) super.getOut();
     }
+
+    @Override
+    public JmsMessage getFault() {
+        return (JmsMessage) super.getFault();
+    }
+
+    public JmsBinding getBinding() {
+        return binding;
+    }
+
+    @Override
+    public Exchange newInstance() {
+        return new JmsExchange(getContext(), binding);
+    }
+
 
     @Override
     protected org.apache.camel.Message createInMessage() {

Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java?view=diff&rev=521236&r1=521235&r2=521236
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java Thu Mar 22 05:19:26 2007
@@ -18,17 +18,25 @@
 package org.apache.camel.component.jms;
 
 import org.apache.camel.InvalidHeaderTypeException;
+import org.apache.camel.Exchange;
 import org.apache.camel.impl.MessageSupport;
 
+import javax.jms.BytesMessage;
 import javax.jms.JMSException;
+import javax.jms.MapMessage;
 import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import java.io.Serializable;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 
 /**
  * Represents a {@link org.apache.camel.Message} for working with JMS
- * 
+ *
  * @version $Revision:520964 $
  */
 public class JmsMessage extends MessageSupport {
@@ -42,6 +50,22 @@
         this.jmsMessage = jmsMessage;
     }
 
+
+    @Override
+    public Object getBody() {
+        Object answer = super.getBody();
+        if (answer == null && jmsMessage != null) {
+            answer = getExchange().getBinding().extractBodyFromJms(getExchange(), jmsMessage);
+            setBody(answer);
+        }
+        return answer;
+    }
+
+    @Override
+    public JmsExchange getExchange() {
+        return (JmsExchange) super.getExchange();
+    }
+
     public Message getJmsMessage() {
         return jmsMessage;
     }
@@ -116,4 +140,9 @@
     public JmsMessage newInstance() {
         return new JmsMessage();
     }
+
+
+
+
 }
+

Modified: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java?view=diff&rev=521236&r1=521235&r2=521236
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java (original)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java Thu Mar 22 05:19:26 2007
@@ -17,33 +17,90 @@
  */
 package org.apache.camel.component.jms;
 
-import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.ConnectionFactory;
-
 import junit.framework.TestCase;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.component.jms.JmsExchange;
 import org.apache.camel.builder.RouteBuilder;
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.ObjectMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @version $Revision$
  */
 public class JmsRouteTest extends TestCase {
-    public void testJmsRoute() throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
+    private static final transient Log log = LogFactory.getLog(JmsRouteTest.class);
+    
+    protected JmsExchange receivedExchange;
+    protected CamelContext container = new DefaultCamelContext();
+    protected CountDownLatch latch = new CountDownLatch(1);
+    protected Endpoint<JmsExchange> endpoint;
+
+    public void testJmsRouteWithTextMessage() throws Exception {
+        String expectedBody = "Hello there!";
+
+        // now lets fire in a message
+        JmsExchange exchange = endpoint.createExchange();
+        JmsMessage in = exchange.getIn();
+        in.setBody(expectedBody);
+        in.setHeader("cheese", 123);
+        endpoint.onExchange(exchange);
+
+        // lets wait on the message being received
+        boolean received = latch.await(5, TimeUnit.SECONDS);
+        assertTrue("Did not recieve the message!", received);
+
+        assertNotNull(receivedExchange);
 
-        CamelContext container = new DefaultCamelContext();
+        Object body = receivedExchange.getIn().getBody();
+        log.debug("Received body: " + body);
+        assertEquals("body", expectedBody, body);
 
+        Message jmsMessage = receivedExchange.getIn().getJmsMessage();
+        assertTrue("Received a JMS TextMessage: " + jmsMessage, jmsMessage instanceof TextMessage);
+
+        log.debug("Received JMS message: " + jmsMessage);
+    }
+
+    public void testJmsRouteWithObjectMessage() throws Exception {
+        PurchaseOrder expectedBody = new PurchaseOrder("Beer", 10);
+
+        // now lets fire in a message
+        JmsExchange exchange = endpoint.createExchange();
+        JmsMessage in = exchange.getIn();
+        in.setBody(expectedBody);
+        in.setHeader("cheese", 123);
+        endpoint.onExchange(exchange);
+
+        // lets wait on the message being received
+        boolean received = latch.await(5, TimeUnit.SECONDS);
+        assertTrue("Did not recieve the message!", received);
+
+        assertNotNull(receivedExchange);
+
+        Object body = receivedExchange.getIn().getBody();
+        log.debug("Received body: " + body);
+
+        assertEquals("body", expectedBody, body);
+
+        Message jmsMessage = receivedExchange.getIn().getJmsMessage();
+        assertTrue("Received a JMS TextMessage: " + jmsMessage, jmsMessage instanceof ObjectMessage);
+
+        log.debug("Received JMS message: " + jmsMessage);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
         // lets configure some componnets
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
         container.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
@@ -55,26 +112,20 @@
                 from("jms:activemq:test.b").process(new Processor<JmsExchange>() {
                     public void onExchange(JmsExchange e) {
                         System.out.println("Received exchange: " + e.getIn());
+                        receivedExchange = e;
                         latch.countDown();
                     }
                 });
             }
         });
+        endpoint = container.resolveEndpoint("jms:activemq:test.a");
+        assertNotNull("No endpoint found!", endpoint);
 
-        
         container.activateEndpoints();
-        
-        // now lets fire in a message
-        Endpoint<JmsExchange> endpoint = container.resolveEndpoint("jms:activemq:test.a");
-        JmsExchange exchange = endpoint.createExchange();
-        //exchange2.setInBody("Hello there!")
-        exchange.getIn().setHeader("cheese", 123);
-        endpoint.onExchange(exchange);
-
-        // now lets sleep for a while
-        boolean received = latch.await(5, TimeUnit.SECONDS);
-        assertTrue("Did not recieve the message!", received);
+    }
 
+    @Override
+    protected void tearDown() throws Exception {
         container.deactivateEndpoints();
     }
 }

Added: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java?view=auto&rev=521236
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java (added)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java Thu Mar 22 05:19:26 2007
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import java.io.Serializable;
+
+/**
+ * A simple POJO for testing
+ *
+ * @version $Revision$
+ */
+public class PurchaseOrder implements Serializable {
+    private String product;
+    private double amount;
+
+    public PurchaseOrder(String product, double amount) {
+        this.product = product;
+        this.amount = amount;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (other == null) {
+            return false;
+        }
+        if (this.getClass() != other.getClass()) {
+            return false;
+        }
+        PurchaseOrder that = (PurchaseOrder) other;
+        return this.product.equals(that.product) && this.amount == that.amount;
+    }
+
+    @Override
+    public int hashCode() {
+        return product.hashCode() * 37 + (int) Math.round(amount);
+    }
+
+    @Override
+    public String toString() {
+        return "PurchaseOrder[" + product + " x " + amount + "]";
+    }
+
+    public double getAmount() {
+        return amount;
+    }
+
+    public String getProduct() {
+        return product;
+    }
+}

Propchange: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain