You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by dk...@apache.org on 2015/07/30 16:55:45 UTC
activemq git commit: AMQ-5903 - add patch that fixes the broker camel component to take all header values
Repository: activemq
Updated Branches:
refs/heads/activemq-5.11.x e756111d0 -> 256bcf334
AMQ-5903 - add patch that fixes the broker camel component to take all header values
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/256bcf33
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/256bcf33
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/256bcf33
Branch: refs/heads/activemq-5.11.x
Commit: 256bcf3342ea8c31f4f06cfcab3330d396a845d6
Parents: e756111
Author: Heath Kesler <he...@gmail.com>
Authored: Wed Jul 29 17:48:16 2015 -0600
Committer: Daniel Kulp <dk...@apache.org>
Committed: Thu Jul 30 10:52:30 2015 -0400
----------------------------------------------------------------------
.../camel/component/broker/BrokerProducer.java | 89 +++++++-------------
.../broker/BrokerComponentXMLConfigTest.java | 55 ++++++++----
.../activemq/camel/component/broker/camel.xml | 45 +++++-----
3 files changed, 94 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/256bcf33/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
index fcf1256..82adad4 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
@@ -16,17 +16,17 @@
*/
package org.apache.activemq.camel.component.broker;
-import java.util.Map;
-
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.jms.JmsMessage;
-import org.apache.camel.converter.ObjectConverter;
import org.apache.camel.impl.DefaultAsyncProducer;
+import javax.jms.JMSException;
+import java.util.Map;
+
public class BrokerProducer extends DefaultAsyncProducer {
private final BrokerEndpoint brokerEndpoint;
@@ -53,6 +53,7 @@ public class BrokerProducer extends DefaultAsyncProducer {
protected boolean processInOnly(final Exchange exchange, final AsyncCallback callback) {
try {
ActiveMQMessage message = getMessage(exchange);
+
if (message != null) {
message.setDestination(brokerEndpoint.getDestination());
//if the ProducerBrokerExchange is null the broker will create it
@@ -67,76 +68,48 @@ public class BrokerProducer extends DefaultAsyncProducer {
return true;
}
- private ActiveMQMessage getMessage(Exchange exchange) throws Exception {
- ActiveMQMessage result;
- Message camelMessage;
+ private ActiveMQMessage getMessage(Exchange exchange) throws IllegalStateException, JMSException {
+ Message camelMessage = getMessageFromExchange(exchange);
+ checkOriginalMessage(camelMessage);
+ ActiveMQMessage result = (ActiveMQMessage) ((JmsMessage) camelMessage).getJmsMessage();
+ applyNewHeaders(result, camelMessage.getHeaders());
+ return result;
+ }
+
+ private Message getMessageFromExchange(Exchange exchange) {
if (exchange.hasOut()) {
- camelMessage = exchange.getOut();
- } else {
- camelMessage = exchange.getIn();
+ return exchange.getOut();
}
- Map<String, Object> headers = camelMessage.getHeaders();
+ return exchange.getIn();
+ }
+ private void checkOriginalMessage(Message camelMessage) throws IllegalStateException {
/**
* We purposely don't want to support injecting messages half-way through
* broker processing - use the activemq camel component for that - but
- * we will support changing message headers and destinations
+ * we will support changing message headers and destinations.
*/
- if (camelMessage instanceof JmsMessage) {
- JmsMessage jmsMessage = (JmsMessage) camelMessage;
- if (jmsMessage.getJmsMessage() instanceof ActiveMQMessage) {
- result = (ActiveMQMessage) jmsMessage.getJmsMessage();
- //lets apply any new message headers
- setJmsHeaders(result, headers);
- } else {
- throw new IllegalStateException("Not the original message from the broker " + jmsMessage.getJmsMessage());
- }
- } else {
+
+ if (!(camelMessage instanceof JmsMessage)) {
throw new IllegalStateException("Not the original message from the broker " + camelMessage);
}
- return result;
+ javax.jms.Message message = ((JmsMessage) camelMessage).getJmsMessage();
+
+ if (!(message instanceof ActiveMQMessage)) {
+ throw new IllegalStateException("Not the original message from the broker " + message);
+ }
}
- private void setJmsHeaders(ActiveMQMessage message, Map<String, Object> headers) {
- message.setReadOnlyProperties(false);
+ private void applyNewHeaders(ActiveMQMessage message, Map<String, Object> headers) throws JMSException {
for (Map.Entry<String, Object> entry : headers.entrySet()) {
- if (entry.getKey().equalsIgnoreCase("JMSDeliveryMode")) {
- Object value = entry.getValue();
- if (value instanceof Number) {
- Number number = (Number) value;
- message.setJMSDeliveryMode(number.intValue());
- }
- }
- if (entry.getKey().equalsIgnoreCase("JmsPriority")) {
- Integer value = ObjectConverter.toInteger(entry.getValue());
- if (value != null) {
- message.setJMSPriority(value.intValue());
- }
- }
- if (entry.getKey().equalsIgnoreCase("JMSTimestamp")) {
- Long value = ObjectConverter.toLong(entry.getValue());
- if (value != null) {
- message.setJMSTimestamp(value.longValue());
- }
- }
- if (entry.getKey().equalsIgnoreCase("JMSExpiration")) {
- Long value = ObjectConverter.toLong(entry.getValue());
- if (value != null) {
- message.setJMSExpiration(value.longValue());
- }
- }
- if (entry.getKey().equalsIgnoreCase("JMSRedelivered")) {
- message.setJMSRedelivered(ObjectConverter.toBool(entry.getValue()));
- }
- if (entry.getKey().equalsIgnoreCase("JMSType")) {
- Object value = entry.getValue();
- if (value != null) {
- message.setJMSType(value.toString());
- }
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ if(value == null) {
+ continue;
}
+ message.setObjectProperty(key, value.toString(), false);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/256bcf33/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java
index c2fc3f6..2773baa 100644
--- a/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java
+++ b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java
@@ -16,22 +16,10 @@
*/
package org.apache.activemq.camel.component.broker;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.junit.After;
@@ -41,6 +29,14 @@ import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
+import javax.jms.*;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
public class BrokerComponentXMLConfigTest {
protected static final String CONF_ROOT = "src/test/resources/org/apache/activemq/camel/component/broker/";
@@ -70,7 +66,6 @@ public class BrokerComponentXMLConfigTest {
producerConnection = factory.createConnection();
producerConnection.start();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
@@ -133,7 +128,6 @@ public class BrokerComponentXMLConfigTest {
latch.await(timeOutInSeconds, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
-
}
@Test
@@ -179,4 +173,35 @@ public class BrokerComponentXMLConfigTest {
assertEquals(0, divertLatch.getCount());
}
+ @Test
+ public void testPreserveOriginalHeaders() throws Exception {
+ final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME);
+
+ Topic topic = consumerSession.createTopic(TOPIC_NAME);
+
+ final CountDownLatch latch = new CountDownLatch(messageCount);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(javax.jms.Message message) {
+ try {
+ assertEquals("321", message.getStringProperty("JMSXGroupID"));
+ assertEquals("custom", message.getStringProperty("CustomHeader"));
+ latch.countDown();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ MessageProducer producer = producerSession.createProducer(topic);
+
+ for (int i = 0; i < messageCount; i++) {
+ javax.jms.Message message = producerSession.createTextMessage("test: " + i);
+ message.setStringProperty("JMSXGroupID", "123");
+ producer.send(message);
+ }
+
+ latch.await(timeOutInSeconds, TimeUnit.SECONDS);
+ assertEquals(0, latch.getCount());
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/256bcf33/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml
----------------------------------------------------------------------
diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml
index 750c134..b84350b 100644
--- a/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml
+++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml
@@ -15,45 +15,46 @@
limitations under the License.
-->
-<beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="
- http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
- http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<camelContext id="camel" trace="false" xmlns="http://camel.apache.org/schema/spring">
-
- <!-- You can use Spring XML syntax to define the routes here using the <route> element -->
<route id="brokerComponentTest">
<from uri="broker:topic:test.broker.>"/>
<setHeader headerName="JMSPriority">
<constant>9</constant>
</setHeader>
+ <setHeader headerName="JMSXGroupID">
+ <constant>321</constant>
+ </setHeader>
+ <setHeader headerName="CustomHeader">
+ <constant>custom</constant>
+ </setHeader>
<to uri="broker:queue:test.broker.component.queue"/>
</route>
- <route id="brokerComponentDLQAboveLimitTest">
- <from uri="broker:queue:test.broker.component.route"/>
- <choice>
- <when>
- <spel>#{@destinationView.enqueueCount >= 100}</spel>
- <to uri="broker:queue:test.broker.component.ProcessLater"/>
- </when>
- <otherwise>
- <to uri="broker:queue:test.broker.component.route"/>
- </otherwise>
- </choice>
+ <route id="brokerComponentDLQAboveLimitTest">
+ <from uri="broker:queue:test.broker.component.route"/>
+ <choice>
+ <when>
+ <spel>#{@destinationView.enqueueCount >= 100}</spel>
+ <to uri="broker:queue:test.broker.component.ProcessLater"/>
+ </when>
+ <otherwise>
+ <to uri="broker:queue:test.broker.component.route"/>
+ </otherwise>
+ </choice>
</route>
-
-
</camelContext>
+
<bean id="brokerView" class="org.apache.activemq.broker.view.MessageBrokerView">
<constructor-arg value="testBroker"/>
</bean>
+
<bean id="destinationView" factory-bean="brokerView" factory-method="getDestinationView">
<constructor-arg value="test.broker.component.route"/>
-
</bean>
</beans>