You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/05/05 15:40:37 UTC

activemq git commit: AMQ-6274 - set originalDestination for composite destinations

Repository: activemq
Updated Branches:
  refs/heads/master 7fd5fa925 -> 1ccd17791


AMQ-6274 - set originalDestination for composite destinations

This closes #184


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1ccd1779
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1ccd1779
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1ccd1779

Branch: refs/heads/master
Commit: 1ccd17791b1cb5ba4487adb6d14a819d2d2840e2
Parents: 7fd5fa9
Author: Quinn Stevenson <qu...@pronoia-solutions.com>
Authored: Mon May 2 09:12:12 2016 -0600
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu May 5 11:38:48 2016 -0400

----------------------------------------------------------------------
 .../region/virtual/CompositeDestination.java    |  1 -
 .../virtual/CompositeDestinationFilter.java     |  2 ++
 .../broker/virtual/CompositeQueueTest.java      | 20 ++++++++++++++++++++
 .../MultipleCompositeToPhysicalQueueTest.java   | 13 +++++++++++--
 .../apache/activemq/spring/ConsumerBean.java    |  4 ++++
 5 files changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/1ccd1779/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
index 1b976c0..a032710 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
@@ -29,7 +29,6 @@ import org.apache.activemq.command.CommandTypes;
  *
  */
 public abstract class CompositeDestination implements VirtualDestination {
-
     private String name;
     private Collection forwardTo;
     private boolean forwardOnly = true;

http://git-wip-us.apache.org/repos/asf/activemq/blob/1ccd1779/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
index 14e52e7..56506f8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
@@ -114,8 +114,10 @@ public class CompositeDestinationFilter extends DestinationFilter {
 
     private void doForward(ProducerBrokerExchange context, Message message, Broker regionBroker, ActiveMQDestination destination) throws Exception {
         Message forwarded_message;
+
         if (copyMessage) {
             forwarded_message = message.copy();
+            forwarded_message.setOriginalDestination( message.getDestination() );
             forwarded_message.setDestination(destination);
         }
         else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/1ccd1779/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
index 3621a14..bcda4e9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
@@ -21,6 +21,7 @@ import java.net.URI;
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -28,13 +29,17 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.spring.ConsumerBean;
 import org.apache.activemq.xbean.XBeanBrokerFactory;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * 
  * 
@@ -48,6 +53,7 @@ public class CompositeQueueTest extends EmbeddedBrokerTestSupport {
     public String messageSelector1, messageSelector2 = null;
 
 
+    @Test
     public void testVirtualTopicCreation() throws Exception {
         if (connection == null) {
             connection = createConnection();
@@ -83,6 +89,8 @@ public class CompositeQueueTest extends EmbeddedBrokerTestSupport {
         }
 
         assertMessagesArrived(messageList1, messageList2);
+        assertOriginalDestination(messageList1, messageList2);
+
     }
 
     protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
@@ -90,6 +98,18 @@ public class CompositeQueueTest extends EmbeddedBrokerTestSupport {
         messageList2.assertMessagesArrived(total);
     }
 
+    protected void assertOriginalDestination(ConsumerBean messageList1, ConsumerBean messageList2) {
+        for( Message message: messageList1.getMessages()) {
+            ActiveMQMessage amqMessage = (ActiveMQMessage)message;
+            assertEquals( getProducerDestination(), amqMessage.getOriginalDestination() );
+        }
+
+        for( Message message: messageList1.getMessages()) {
+            ActiveMQMessage amqMessage = (ActiveMQMessage)message;
+            assertEquals( getProducerDestination(), amqMessage.getOriginalDestination() );
+        }
+    }
+
     protected TextMessage createMessage(Session session, int i) throws JMSException {
         TextMessage textMessage = session.createTextMessage("message: " + i);
         if (i % 2 != 0) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/1ccd1779/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MultipleCompositeToPhysicalQueueTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MultipleCompositeToPhysicalQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MultipleCompositeToPhysicalQueueTest.java
index 6c72a11..fa87e2b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MultipleCompositeToPhysicalQueueTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MultipleCompositeToPhysicalQueueTest.java
@@ -25,6 +25,7 @@ import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
@@ -39,6 +40,7 @@ import org.apache.activemq.broker.region.virtual.CompositeQueue;
 import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.junit.After;
 import org.junit.Before;
@@ -89,10 +91,17 @@ public class MultipleCompositeToPhysicalQueueTest {
         Session publisherSession = buildSession("Producer", url);
 
         createPublisher(publisherSession, PUB_BROADCAST.getVirtualDestination()).send(publisherSession.createTextMessage("BROADCAST"));
-        assertEquals("BROADCAST", ((TextMessage) consumer.receive()).getText());
+        ActiveMQMessage broadcastMessage = (ActiveMQMessage) consumer.receive();
+        ActiveMQDestination originalDestination = broadcastMessage.getOriginalDestination();
+
+        assertEquals("BROADCAST", ((TextMessage) broadcastMessage).getText());
+        assertEquals( PUB_BROADCAST.getName(), broadcastMessage.getOriginalDestination().getPhysicalName());
 
         createPublisher(publisherSession, PUB_INDIVIDUAL.getVirtualDestination()).send(publisherSession.createTextMessage("INDIVIDUAL"));
-        assertEquals("INDIVIDUAL", ((TextMessage) consumer.receive()).getText());
+        ActiveMQMessage individualMessage = (ActiveMQMessage)consumer.receive();
+
+        assertEquals("INDIVIDUAL", ((TextMessage)individualMessage).getText());
+        assertEquals( PUB_INDIVIDUAL.getName(), individualMessage.getOriginalDestination().getPhysicalName());
     }
 
     private BrokerService createBroker(boolean persistent) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/1ccd1779/activemq-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java b/activemq-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java
index 8f22c33..40904f5 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java
@@ -144,6 +144,10 @@ public class ConsumerBean extends Assert implements MessageListener {
         this.verbose = verbose;
     }
 
+    public List<Message> getMessages() {
+        return messages;
+    }
+
     /**
      * Identifies if the message is empty.
      *