You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/22 13:19:27 UTC
svn commit: r521236 - in /activemq/camel/trunk/camel-jms/src:
main/java/org/apache/camel/component/jms/
test/java/org/apache/camel/component/jms/
Author: jstrachan
Date: Thu Mar 22 05:19:26 2007
New Revision: 521236
URL: http://svn.apache.org/viewvc?view=rev&rev=521236
Log:
tidied up the JMS component, added a separate strategy JmsBinding which deals with the actual binding of Camel <-> JMS and tested that the binding uses TextMessage or ObjectMessage based on the payload
Added:
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (with props)
activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java (with props)
Modified:
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java
Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?view=auto&rev=521236
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Thu Mar 22 05:19:26 2007
@@ -0,0 +1,107 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+import javax.jms.MapMessage;
+import javax.jms.BytesMessage;
+import javax.jms.StreamMessage;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Enumeration;
+import java.io.Serializable;
+
+/**
+ * A Strategy used to convert between a Camel {@JmsExchange} and {@JmsMessage} to and from a
+ * JMS {@link Message}
+ *
+ * @version $Revision$
+ */
+public class JmsBinding {
+
+ /**
+ * Creates a JMS message from the Camel exchange and message
+ *
+ * @param session the JMS session used to create the message
+ * @return a newly created JMS Message instance containing the
+ * @throws JMSException if the message could not be created
+ */
+ public Message createJmsMessage(JmsExchange exchange, JmsMessage message, Session session) throws JMSException {
+ Object value = message.getBody();
+ if (value instanceof String) {
+ return session.createTextMessage((String) value);
+ }
+ else if (value instanceof Serializable) {
+ return session.createObjectMessage((Serializable) value);
+ }
+ else {
+ return session.createMessage();
+ }
+ }
+
+ /**
+ * Extracts the body from the JMS message
+ *
+ * @param exchange
+ * @param message
+ */
+ public Object extractBodyFromJms(JmsExchange exchange, Message message) {
+ try {
+ if (message instanceof ObjectMessage) {
+ ObjectMessage objectMessage = (ObjectMessage) message;
+ return objectMessage.getObject();
+ }
+ else if (message instanceof TextMessage) {
+ TextMessage textMessage = (TextMessage) message;
+ return textMessage.getText();
+ }
+ else if (message instanceof MapMessage) {
+ return createMapFromMapMessage((MapMessage) message);
+ }
+ else if (message instanceof BytesMessage || message instanceof StreamMessage) {
+ // TODO we need a decoder to be able to process the message
+ return message;
+ }
+ else {
+ return null;
+ }
+ }
+ catch (JMSException e) {
+ throw new RuntimeJmsException("Failed to extract body due to: " + e + ". Message: " + message, e);
+ }
+ }
+
+
+ /**
+ * Extracts a {@link Map} from a {@link MapMessage}
+ */
+ public Map<String, Object> createMapFromMapMessage(MapMessage message) throws JMSException {
+ Map<String, Object> answer = new HashMap<String, Object>();
+ Enumeration names = message.getPropertyNames();
+ while (names.hasMoreElements()) {
+ String name = names.nextElement().toString();
+ Object value = message.getObject(name);
+ answer.put(name, value);
+ }
+ return answer;
+ }
+}
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?view=diff&rev=521236&r1=521235&r2=521236
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Thu Mar 22 05:19:26 2007
@@ -35,7 +35,7 @@
*/
public class JmsEndpoint extends DefaultEndpoint<JmsExchange> implements MessageListener {
private static final Log log = LogFactory.getLog(JmsEndpoint.class);
-
+ private JmsBinding binding;
private JmsOperations template;
private AbstractMessageListenerContainer listenerContainer;
private String destination;
@@ -56,8 +56,7 @@
getInboundProcessor().onExchange(exchange);
}
-
- public void send(Exchange exchange) {
+ public void onExchange(Exchange exchange) {
// lets convert to the type of an exchange
JmsExchange jmsExchange = convertTo(JmsExchange.class, exchange);
onExchange(jmsExchange);
@@ -66,7 +65,7 @@
public void onExchange(final JmsExchange exchange) {
template.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
- Message message = exchange.createMessage(session);
+ Message message = getBinding().createJmsMessage(exchange, exchange.getIn(), session);
if (log.isDebugEnabled()) {
log.debug(JmsEndpoint.this + " sending JMS message: " + message);
}
@@ -75,20 +74,38 @@
});
}
- public JmsOperations getTemplate() {
- return template;
+ public JmsExchange createExchange() {
+ return new JmsExchange(getContext(), getBinding());
}
- public JmsExchange createExchange() {
- return new JmsExchange(getContext());
+ public JmsExchange createExchange(Message message) {
+ return new JmsExchange(getContext(), getBinding(), message);
}
+ // Properties
+ //-------------------------------------------------------------------------
+ public JmsBinding getBinding() {
+ if (binding == null) {
+ binding = new JmsBinding();
+ }
+ return binding;
+ }
- public JmsExchange createExchange(Message message) {
- return new JmsExchange(getContext(), message);
+ /**
+ * Sets the binding used to convert from a Camel message to and from a JMS message
+ *
+ * @param binding the binding to use
+ */
+ public void setBinding(JmsBinding binding) {
+ this.binding = binding;
}
+ public JmsOperations getTemplate() {
+ return template;
+ }
+ // Implementation methods
+ //-------------------------------------------------------------------------
protected void doActivate() {
super.doActivate();
listenerContainer.afterPropertiesSet();
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java?view=diff&rev=521236&r1=521235&r2=521236
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java Thu Mar 22 05:19:26 2007
@@ -31,44 +31,42 @@
*/
public class JmsExchange extends DefaultExchange {
- public JmsExchange(CamelContext container) {
- super(container);
+ private JmsBinding binding;
+
+ public JmsExchange(CamelContext context, JmsBinding binding) {
+ super(context);
+ this.binding = binding;
}
- public JmsExchange(CamelContext container, Message message) {
- super(container);
+ public JmsExchange(CamelContext context, JmsBinding binding, Message message) {
+ this(context, binding);
setIn(new JmsMessage(message));
}
@Override
- public Exchange newInstance() {
- return new JmsExchange(getContext());
+ public JmsMessage getIn() {
+ return (JmsMessage) super.getIn();
}
- public Message createMessage(Session session) throws JMSException {
- Message request = getInMessage();
- if (request == null) {
- request = session.createMessage();
-
- /** TODO
- if (lazyHeaders != null) {
- // lets add any lazy headers
- for (Map.Entry<String, Object> entry : lazyHeaders.entrySet()) {
- request.setObjectProperty(entry.getKey(), entry.getValue());
- }
- }
- */
- }
- return request;
- }
-
- public Message getInMessage() {
- JmsMessage jmsMessage = (JmsMessage) getIn();
- if (jmsMessage != null) {
- return jmsMessage.getJmsMessage();
- }
- return null;
+ @Override
+ public JmsMessage getOut() {
+ return (JmsMessage) super.getOut();
}
+
+ @Override
+ public JmsMessage getFault() {
+ return (JmsMessage) super.getFault();
+ }
+
+ public JmsBinding getBinding() {
+ return binding;
+ }
+
+ @Override
+ public Exchange newInstance() {
+ return new JmsExchange(getContext(), binding);
+ }
+
@Override
protected org.apache.camel.Message createInMessage() {
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java?view=diff&rev=521236&r1=521235&r2=521236
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java Thu Mar 22 05:19:26 2007
@@ -18,17 +18,25 @@
package org.apache.camel.component.jms;
import org.apache.camel.InvalidHeaderTypeException;
+import org.apache.camel.Exchange;
import org.apache.camel.impl.MessageSupport;
+import javax.jms.BytesMessage;
import javax.jms.JMSException;
+import javax.jms.MapMessage;
import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import java.io.Serializable;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
/**
* Represents a {@link org.apache.camel.Message} for working with JMS
- *
+ *
* @version $Revision:520964 $
*/
public class JmsMessage extends MessageSupport {
@@ -42,6 +50,22 @@
this.jmsMessage = jmsMessage;
}
+
+ @Override
+ public Object getBody() {
+ Object answer = super.getBody();
+ if (answer == null && jmsMessage != null) {
+ answer = getExchange().getBinding().extractBodyFromJms(getExchange(), jmsMessage);
+ setBody(answer);
+ }
+ return answer;
+ }
+
+ @Override
+ public JmsExchange getExchange() {
+ return (JmsExchange) super.getExchange();
+ }
+
public Message getJmsMessage() {
return jmsMessage;
}
@@ -116,4 +140,9 @@
public JmsMessage newInstance() {
return new JmsMessage();
}
+
+
+
+
}
+
Modified: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java?view=diff&rev=521236&r1=521235&r2=521236
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java (original)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java Thu Mar 22 05:19:26 2007
@@ -17,33 +17,90 @@
*/
package org.apache.camel.component.jms;
-import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.ConnectionFactory;
-
import junit.framework.TestCase;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.component.jms.JmsExchange;
import org.apache.camel.builder.RouteBuilder;
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.ObjectMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* @version $Revision$
*/
public class JmsRouteTest extends TestCase {
- public void testJmsRoute() throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
+ private static final transient Log log = LogFactory.getLog(JmsRouteTest.class);
+
+ protected JmsExchange receivedExchange;
+ protected CamelContext container = new DefaultCamelContext();
+ protected CountDownLatch latch = new CountDownLatch(1);
+ protected Endpoint<JmsExchange> endpoint;
+
+ public void testJmsRouteWithTextMessage() throws Exception {
+ String expectedBody = "Hello there!";
+
+ // now lets fire in a message
+ JmsExchange exchange = endpoint.createExchange();
+ JmsMessage in = exchange.getIn();
+ in.setBody(expectedBody);
+ in.setHeader("cheese", 123);
+ endpoint.onExchange(exchange);
+
+ // lets wait on the message being received
+ boolean received = latch.await(5, TimeUnit.SECONDS);
+ assertTrue("Did not recieve the message!", received);
+
+ assertNotNull(receivedExchange);
- CamelContext container = new DefaultCamelContext();
+ Object body = receivedExchange.getIn().getBody();
+ log.debug("Received body: " + body);
+ assertEquals("body", expectedBody, body);
+ Message jmsMessage = receivedExchange.getIn().getJmsMessage();
+ assertTrue("Received a JMS TextMessage: " + jmsMessage, jmsMessage instanceof TextMessage);
+
+ log.debug("Received JMS message: " + jmsMessage);
+ }
+
+ public void testJmsRouteWithObjectMessage() throws Exception {
+ PurchaseOrder expectedBody = new PurchaseOrder("Beer", 10);
+
+ // now lets fire in a message
+ JmsExchange exchange = endpoint.createExchange();
+ JmsMessage in = exchange.getIn();
+ in.setBody(expectedBody);
+ in.setHeader("cheese", 123);
+ endpoint.onExchange(exchange);
+
+ // lets wait on the message being received
+ boolean received = latch.await(5, TimeUnit.SECONDS);
+ assertTrue("Did not recieve the message!", received);
+
+ assertNotNull(receivedExchange);
+
+ Object body = receivedExchange.getIn().getBody();
+ log.debug("Received body: " + body);
+
+ assertEquals("body", expectedBody, body);
+
+ Message jmsMessage = receivedExchange.getIn().getJmsMessage();
+ assertTrue("Received a JMS TextMessage: " + jmsMessage, jmsMessage instanceof ObjectMessage);
+
+ log.debug("Received JMS message: " + jmsMessage);
+ }
+
+ @Override
+ protected void setUp() throws Exception {
// lets configure some componnets
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
container.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
@@ -55,26 +112,20 @@
from("jms:activemq:test.b").process(new Processor<JmsExchange>() {
public void onExchange(JmsExchange e) {
System.out.println("Received exchange: " + e.getIn());
+ receivedExchange = e;
latch.countDown();
}
});
}
});
+ endpoint = container.resolveEndpoint("jms:activemq:test.a");
+ assertNotNull("No endpoint found!", endpoint);
-
container.activateEndpoints();
-
- // now lets fire in a message
- Endpoint<JmsExchange> endpoint = container.resolveEndpoint("jms:activemq:test.a");
- JmsExchange exchange = endpoint.createExchange();
- //exchange2.setInBody("Hello there!")
- exchange.getIn().setHeader("cheese", 123);
- endpoint.onExchange(exchange);
-
- // now lets sleep for a while
- boolean received = latch.await(5, TimeUnit.SECONDS);
- assertTrue("Did not recieve the message!", received);
+ }
+ @Override
+ protected void tearDown() throws Exception {
container.deactivateEndpoints();
}
}
Added: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java?view=auto&rev=521236
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java (added)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java Thu Mar 22 05:19:26 2007
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import java.io.Serializable;
+
+/**
+ * A simple POJO for testing
+ *
+ * @version $Revision$
+ */
+public class PurchaseOrder implements Serializable {
+ private String product;
+ private double amount;
+
+ public PurchaseOrder(String product, double amount) {
+ this.product = product;
+ this.amount = amount;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null) {
+ return false;
+ }
+ if (this.getClass() != other.getClass()) {
+ return false;
+ }
+ PurchaseOrder that = (PurchaseOrder) other;
+ return this.product.equals(that.product) && this.amount == that.amount;
+ }
+
+ @Override
+ public int hashCode() {
+ return product.hashCode() * 37 + (int) Math.round(amount);
+ }
+
+ @Override
+ public String toString() {
+ return "PurchaseOrder[" + product + " x " + amount + "]";
+ }
+
+ public double getAmount() {
+ return amount;
+ }
+
+ public String getProduct() {
+ return product;
+ }
+}
Propchange: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/PurchaseOrder.java
------------------------------------------------------------------------------
svn:mime-type = text/plain