You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/12/06 18:37:35 UTC
svn commit: r601794 - in /activemq/camel/trunk/components/camel-jms/src:
main/java/org/apache/camel/component/jms/
test/java/org/apache/camel/component/jms/
Author: jstrachan
Date: Thu Dec 6 09:37:31 2007
New Revision: 601794
URL: http://svn.apache.org/viewvc?rev=601794&view=rev
Log:
applied another great patch from Roman for https://issues.apache.org/activemq/browse/CAMEL-255
Added:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/ConsumeJmsBytesMessageTest.java (with props)
Modified:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/ConsumeJmsMapMessageTest.java
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=601794&r1=601793&r2=601794&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Thu Dec 6 09:37:31 2007
@@ -35,8 +35,10 @@
import javax.jms.TextMessage;
import javax.xml.transform.TransformerException;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.converter.jaxp.XmlConverter;
+import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
@@ -73,8 +75,10 @@
else if (message instanceof MapMessage) {
return createMapFromMapMessage((MapMessage) message);
}
- else if (message instanceof BytesMessage || message instanceof StreamMessage) {
- // TODO we need a decoder to be able to process the message
+ else if (message instanceof BytesMessage) {
+ return createByteArrayFromBytesMessage((BytesMessage)message);
+ }
+ else if (message instanceof StreamMessage) {
return message;
}
else {
@@ -86,6 +90,15 @@
}
}
+ protected byte[] createByteArrayFromBytesMessage(BytesMessage message) throws JMSException {
+ if (message.getBodyLength() > Integer.MAX_VALUE) {
+ return null;
+ }
+ byte[] result = new byte[(int) message.getBodyLength()];
+ message.readBytes(result);
+ return result;
+ }
+
/**
* Creates a JMS message from the Camel exchange and message
*
@@ -100,7 +113,7 @@
answer = jmsMessage.getJmsMessage();
}
if (answer == null) {
- answer = createJmsMessage(camelMessage.getBody(), session);
+ answer = createJmsMessage(camelMessage.getBody(), session, exchange.getContext());
appendJmsProperties(answer, exchange, camelMessage);
}
return answer;
@@ -142,7 +155,7 @@
}
}
- protected Message createJmsMessage(Object body, Session session) throws JMSException {
+ protected Message createJmsMessage(Object body, Session session, CamelContext context) throws JMSException {
if (body instanceof Node) {
// lets convert the document to a String format
try {
@@ -154,14 +167,40 @@
throw jmsException;
}
}
+ if (body instanceof byte[]) {
+ BytesMessage result = session.createBytesMessage();
+ result.writeBytes((byte[]) body);
+ return result;
+ }
+ if (body instanceof Map) {
+ MapMessage result = session.createMapMessage();
+ Map<?, ?> map = (Map<?, ?>) body;
+ try {
+ populateMapMessage(result, map, context);
+ return result;
+ } catch (JMSException e) {
+ // if MapMessage creation failed then fall back to Object Message
+ }
+ }
if (body instanceof String) {
return session.createTextMessage((String) body);
}
- else if (body instanceof Serializable) {
+ if (body instanceof Serializable) {
return session.createObjectMessage((Serializable) body);
}
- else {
- return session.createMessage();
+ return session.createMessage();
+ }
+
+ /**
+ * Populates a {@link MapMessage} from a {@link Map} instance.
+ */
+ protected void populateMapMessage(MapMessage message, Map<?, ?> map, CamelContext context)
+ throws JMSException {
+ for (Object key : map.keySet()) {
+ String keyString = CamelContextHelper.convertTo(context, String.class, key);
+ if (keyString != null) {
+ message.setObject(keyString, map.get(key));
+ }
}
}
Added: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/ConsumeJmsBytesMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/ConsumeJmsBytesMessageTest.java?rev=601794&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/ConsumeJmsBytesMessageTest.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/ConsumeJmsBytesMessageTest.java Thu Dec 6 09:37:31 2007
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+
+/**
+ * @version $Revision: 591979 $
+ */
+public class ConsumeJmsBytesMessageTest extends ContextTestSupport {
+ protected JmsTemplate jmsTemplate;
+ private MockEndpoint endpoint;
+
+ public void testConsumeMapMessage() throws Exception {
+ endpoint.expectedMessageCount(1);
+
+ jmsTemplate.setPubSubDomain(false);
+ jmsTemplate.send("test.bytes", new MessageCreator() {
+ public Message createMessage(Session session) throws JMSException {
+ BytesMessage bytesMessage = session.createBytesMessage();
+ bytesMessage.writeByte((byte) 1);
+ bytesMessage.writeByte((byte) 2);
+ bytesMessage.writeByte((byte) 3);
+ return bytesMessage;
+ }
+ });
+
+ endpoint.assertIsSatisfied();
+ assertCorrectBytesReceived();
+ }
+
+ protected void assertCorrectBytesReceived() {
+ Exchange exchange = endpoint.getReceivedExchanges().get(0);
+ JmsExchange jmsExchange = assertIsInstanceOf(JmsExchange.class, exchange);
+ byte[] bytes = exchange.getIn().getBody(byte[].class);
+
+ log.info("Received bytes: " + Arrays.toString(bytes));
+
+ assertNotNull("Should have received a bytes message!", bytes);
+ assertIsInstanceOf(BytesMessage.class, jmsExchange.getInMessage());
+ assertEquals("Wrong byte 1", 1, bytes[0]);
+ assertEquals("Wrong payload lentght", 3, bytes.length);
+ }
+
+ public void testSendMapMessage() throws Exception {
+
+ endpoint.expectedMessageCount(1);
+
+ byte[] bytes = new byte[] {1, 2, 3};
+
+ template.sendBody("direct:test", bytes);
+
+ endpoint.assertIsSatisfied();
+ assertCorrectBytesReceived();
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ endpoint = getMockEndpoint("mock:result");
+ }
+
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ jmsTemplate = new JmsTemplate(connectionFactory);
+ camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
+
+ return camelContext;
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from("activemq:test.bytes").to("mock:result");
+ from("direct:test").to("activemq:test.bytes");
+ }
+ };
+ }
+}
Propchange: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/ConsumeJmsBytesMessageTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/ConsumeJmsMapMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/ConsumeJmsMapMessageTest.java?rev=601794&r1=601793&r2=601794&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/ConsumeJmsMapMessageTest.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/ConsumeJmsMapMessageTest.java Thu Dec 6 09:37:31 2007
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.jms;
+import java.util.HashMap;
import java.util.Map;
import javax.jms.ConnectionFactory;
@@ -39,9 +40,9 @@
*/
public class ConsumeJmsMapMessageTest extends ContextTestSupport {
protected JmsTemplate jmsTemplate;
+ private MockEndpoint endpoint;
public void testConsumeMapMessage() throws Exception {
- MockEndpoint endpoint = getMockEndpoint("mock:result");
endpoint.expectedMessageCount(1);
jmsTemplate.setPubSubDomain(false);
@@ -55,17 +56,42 @@
});
endpoint.assertIsSatisfied();
+ assertCorrectMapReceived();
+ }
+
+ protected void assertCorrectMapReceived() {
Exchange exchange = endpoint.getReceivedExchanges().get(0);
+ JmsExchange jmsExchange = assertIsInstanceOf(JmsExchange.class, exchange);
Map map = exchange.getIn().getBody(Map.class);
log.info("Received map: " + map);
assertNotNull("Should have received a map message!", map);
+ assertIsInstanceOf(MapMessage.class, jmsExchange.getInMessage());
assertEquals("map.foo", "abc", map.get("foo"));
assertEquals("map.bar", "xyz", map.get("bar"));
assertEquals("map.size", 2, map.size());
}
+ public void testSendMapMessage() throws Exception {
+
+ endpoint.expectedMessageCount(1);
+
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("foo", "abc");
+ map.put("bar", "xyz");
+
+ template.sendBody("direct:test", map);
+
+ endpoint.assertIsSatisfied();
+ assertCorrectMapReceived();
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ endpoint = getMockEndpoint("mock:result");
+ }
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
@@ -81,7 +107,8 @@
return new RouteBuilder() {
public void configure() throws Exception {
from("activemq:test.map").to("mock:result");
+ from("direct:test").to("activemq:test.map");
}
};
}
-}
\ No newline at end of file
+}