You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by dk...@apache.org on 2015/07/30 16:55:45 UTC

activemq git commit: AMQ-5903 - add patch that fixes the broker camel component to take all header values

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.11.x e756111d0 -> 256bcf334


AMQ-5903 - add patch that fixes the broker camel component to take all header values


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/256bcf33
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/256bcf33
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/256bcf33

Branch: refs/heads/activemq-5.11.x
Commit: 256bcf3342ea8c31f4f06cfcab3330d396a845d6
Parents: e756111
Author: Heath Kesler <he...@gmail.com>
Authored: Wed Jul 29 17:48:16 2015 -0600
Committer: Daniel Kulp <dk...@apache.org>
Committed: Thu Jul 30 10:52:30 2015 -0400

----------------------------------------------------------------------
 .../camel/component/broker/BrokerProducer.java  | 89 +++++++-------------
 .../broker/BrokerComponentXMLConfigTest.java    | 55 ++++++++----
 .../activemq/camel/component/broker/camel.xml   | 45 +++++-----
 3 files changed, 94 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/256bcf33/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
index fcf1256..82adad4 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
@@ -16,17 +16,17 @@
  */
 package org.apache.activemq.camel.component.broker;
 
-import java.util.Map;
-
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.component.jms.JmsMessage;
-import org.apache.camel.converter.ObjectConverter;
 import org.apache.camel.impl.DefaultAsyncProducer;
 
+import javax.jms.JMSException;
+import java.util.Map;
+
 public class BrokerProducer extends DefaultAsyncProducer {
     private final BrokerEndpoint brokerEndpoint;
 
@@ -53,6 +53,7 @@ public class BrokerProducer extends DefaultAsyncProducer {
     protected boolean processInOnly(final Exchange exchange, final AsyncCallback callback) {
         try {
             ActiveMQMessage message = getMessage(exchange);
+
             if (message != null) {
                 message.setDestination(brokerEndpoint.getDestination());
                 //if the ProducerBrokerExchange is null the broker will create it
@@ -67,76 +68,48 @@ public class BrokerProducer extends DefaultAsyncProducer {
         return true;
     }
 
-    private ActiveMQMessage getMessage(Exchange exchange) throws Exception {
-        ActiveMQMessage result;
-        Message camelMessage;
+    private ActiveMQMessage getMessage(Exchange exchange) throws IllegalStateException, JMSException {
+        Message camelMessage = getMessageFromExchange(exchange);
+        checkOriginalMessage(camelMessage);
+        ActiveMQMessage result = (ActiveMQMessage) ((JmsMessage) camelMessage).getJmsMessage();
+        applyNewHeaders(result, camelMessage.getHeaders());
+        return result;
+    }
+
+    private Message getMessageFromExchange(Exchange exchange) {
         if (exchange.hasOut()) {
-            camelMessage = exchange.getOut();
-        } else {
-            camelMessage = exchange.getIn();
+            return exchange.getOut();
         }
 
-        Map<String, Object> headers = camelMessage.getHeaders();
+        return exchange.getIn();
+    }
 
+    private void checkOriginalMessage(Message camelMessage) throws IllegalStateException {
         /**
          * We purposely don't want to support injecting messages half-way through
          * broker processing - use the activemq camel component for that - but
-         * we will support changing message headers and destinations
+         * we will support changing message headers and destinations.
          */
-        if (camelMessage instanceof JmsMessage) {
-            JmsMessage jmsMessage = (JmsMessage) camelMessage;
-            if (jmsMessage.getJmsMessage() instanceof ActiveMQMessage) {
-                result = (ActiveMQMessage) jmsMessage.getJmsMessage();
-                //lets apply any new message headers
-                setJmsHeaders(result, headers);
-            } else {
-                throw new IllegalStateException("Not the original message from the broker " + jmsMessage.getJmsMessage());
-            }
-        } else {
+
+        if (!(camelMessage instanceof JmsMessage)) {
             throw new IllegalStateException("Not the original message from the broker " + camelMessage);
         }
 
-        return result;
+        javax.jms.Message message = ((JmsMessage) camelMessage).getJmsMessage();
+
+        if (!(message instanceof ActiveMQMessage)) {
+            throw new IllegalStateException("Not the original message from the broker " + message);
+        }
     }
 
-    private void setJmsHeaders(ActiveMQMessage message, Map<String, Object> headers) {
-        message.setReadOnlyProperties(false);
+    private void applyNewHeaders(ActiveMQMessage message, Map<String, Object> headers) throws JMSException {
         for (Map.Entry<String, Object> entry : headers.entrySet()) {
-            if (entry.getKey().equalsIgnoreCase("JMSDeliveryMode")) {
-                Object value = entry.getValue();
-                if (value instanceof Number) {
-                    Number number = (Number) value;
-                    message.setJMSDeliveryMode(number.intValue());
-                }
-            }
-            if (entry.getKey().equalsIgnoreCase("JmsPriority")) {
-                Integer value = ObjectConverter.toInteger(entry.getValue());
-                if (value != null) {
-                    message.setJMSPriority(value.intValue());
-                }
-            }
-            if (entry.getKey().equalsIgnoreCase("JMSTimestamp")) {
-                Long value = ObjectConverter.toLong(entry.getValue());
-                if (value != null) {
-                    message.setJMSTimestamp(value.longValue());
-                }
-            }
-            if (entry.getKey().equalsIgnoreCase("JMSExpiration")) {
-                Long value = ObjectConverter.toLong(entry.getValue());
-                if (value != null) {
-                    message.setJMSExpiration(value.longValue());
-                }
-            }
-            if (entry.getKey().equalsIgnoreCase("JMSRedelivered")) {
-                message.setJMSRedelivered(ObjectConverter.toBool(entry.getValue()));
-            }
-            if (entry.getKey().equalsIgnoreCase("JMSType")) {
-                Object value = entry.getValue();
-                if (value != null) {
-                    message.setJMSType(value.toString());
-                }
+            String key = entry.getKey();
+            Object value = entry.getValue();
+            if(value == null) {
+                continue;
             }
+            message.setObjectProperty(key, value.toString(), false);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/256bcf33/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java
index c2fc3f6..2773baa 100644
--- a/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java
+++ b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java
@@ -16,22 +16,10 @@
  */
 package org.apache.activemq.camel.component.broker;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerRegistry;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.xbean.BrokerFactoryBean;
 import org.junit.After;
@@ -41,6 +29,14 @@ import org.springframework.core.io.ClassPathResource;
 import org.springframework.core.io.FileSystemResource;
 import org.springframework.core.io.Resource;
 
+import javax.jms.*;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class BrokerComponentXMLConfigTest {
 
     protected static final String CONF_ROOT = "src/test/resources/org/apache/activemq/camel/component/broker/";
@@ -70,7 +66,6 @@ public class BrokerComponentXMLConfigTest {
         producerConnection = factory.createConnection();
         producerConnection.start();
         consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
         producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
     }
 
@@ -133,7 +128,6 @@ public class BrokerComponentXMLConfigTest {
 
         latch.await(timeOutInSeconds, TimeUnit.SECONDS);
         assertEquals(0, latch.getCount());
-
     }
 
     @Test
@@ -179,4 +173,35 @@ public class BrokerComponentXMLConfigTest {
         assertEquals(0, divertLatch.getCount());
     }
 
+    @Test
+    public void testPreserveOriginalHeaders() throws Exception {
+        final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME);
+
+        Topic topic = consumerSession.createTopic(TOPIC_NAME);
+
+        final CountDownLatch latch = new CountDownLatch(messageCount);
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(javax.jms.Message message) {
+                try {
+                    assertEquals("321", message.getStringProperty("JMSXGroupID"));
+                    assertEquals("custom", message.getStringProperty("CustomHeader"));
+                    latch.countDown();
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        MessageProducer producer = producerSession.createProducer(topic);
+
+        for (int i = 0; i < messageCount; i++) {
+            javax.jms.Message message = producerSession.createTextMessage("test: " + i);
+            message.setStringProperty("JMSXGroupID", "123");
+            producer.send(message);
+        }
+
+        latch.await(timeOutInSeconds, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/256bcf33/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml
----------------------------------------------------------------------
diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml
index 750c134..b84350b 100644
--- a/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml
+++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml
@@ -15,45 +15,46 @@
    limitations under the License.
 -->
 
-<beans
-        xmlns="http://www.springframework.org/schema/beans"
-        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-        xsi:schemaLocation="
-     http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
-     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
 
     <camelContext id="camel" trace="false" xmlns="http://camel.apache.org/schema/spring">
-
-        <!-- You can use Spring XML syntax to define the routes here using the <route> element -->
         <route id="brokerComponentTest">
             <from uri="broker:topic:test.broker.>"/>
             <setHeader headerName="JMSPriority">
                 <constant>9</constant>
             </setHeader>
+            <setHeader headerName="JMSXGroupID">
+                <constant>321</constant>
+            </setHeader>
+            <setHeader headerName="CustomHeader">
+                <constant>custom</constant>
+            </setHeader>
             <to uri="broker:queue:test.broker.component.queue"/>
         </route>
 
-    <route id="brokerComponentDLQAboveLimitTest">
-        <from uri="broker:queue:test.broker.component.route"/>
-        <choice>
-            <when>
-                <spel>#{@destinationView.enqueueCount >= 100}</spel>
-                <to uri="broker:queue:test.broker.component.ProcessLater"/>
-            </when>
-            <otherwise>
-                <to uri="broker:queue:test.broker.component.route"/>
-            </otherwise>
-        </choice>
+        <route id="brokerComponentDLQAboveLimitTest">
+            <from uri="broker:queue:test.broker.component.route"/>
+            <choice>
+                <when>
+                    <spel>#{@destinationView.enqueueCount >= 100}</spel>
+                    <to uri="broker:queue:test.broker.component.ProcessLater"/>
+                </when>
+                <otherwise>
+                    <to uri="broker:queue:test.broker.component.route"/>
+                </otherwise>
+            </choice>
         </route>
-
-
     </camelContext>
+
     <bean id="brokerView" class="org.apache.activemq.broker.view.MessageBrokerView">
         <constructor-arg value="testBroker"/>
     </bean>
+
     <bean id="destinationView" factory-bean="brokerView" factory-method="getDestinationView">
         <constructor-arg value="test.broker.component.route"/>
-
     </bean>
 </beans>