You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/06/04 19:30:52 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5209

Repository: activemq
Updated Branches:
  refs/heads/trunk be0311bea -> 6c703ac6e


https://issues.apache.org/jira/browse/AMQ-5209

Add MessageId and original destination info to the advisory message
properties. 

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6c703ac6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6c703ac6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6c703ac6

Branch: refs/heads/trunk
Commit: 6c703ac6ee58bb27fa3a16a4e695a3b50c50a9f7
Parents: be0311b
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jun 4 13:30:42 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jun 4 13:30:42 2014 -0400

----------------------------------------------------------------------
 .../activemq/advisory/AdvisoryBroker.java       | 21 ++++++++++++++++++--
 .../activemq/advisory/AdvisorySupport.java      |  2 ++
 .../advisory/AdvisoryTempDestinationTests.java  |  2 +-
 3 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6c703ac6/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index d48bf16..2583a23 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -349,7 +349,13 @@ public class AdvisoryBroker extends BrokerFilter {
                 ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
                 Message payload = messageReference.getMessage().copy();
                 payload.clearBody();
-                fireAdvisory(context, topic, payload);
+                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
+                ActiveMQDestination destination = payload.getDestination();
+                if (destination != null) {
+                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, payload.getMessageId().toString());
+                }
+                fireAdvisory(context, topic, payload, null, advisoryMessage);
             }
         } catch (Exception e) {
             handleFireFailure("consumed", e);
@@ -364,7 +370,13 @@ public class AdvisoryBroker extends BrokerFilter {
                 ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
                 Message payload = messageReference.getMessage().copy();
                 payload.clearBody();
-                fireAdvisory(context, topic, payload);
+                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
+                ActiveMQDestination destination = payload.getDestination();
+                if (destination != null) {
+                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, payload.getMessageId().toString());
+                }
+                fireAdvisory(context, topic, payload, null, advisoryMessage);
             }
         } catch (Exception e) {
             handleFireFailure("delivered", e);
@@ -383,7 +395,12 @@ public class AdvisoryBroker extends BrokerFilter {
                 if (sub instanceof TopicSubscription) {
                     advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded());
                 }
+                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
                 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
+                ActiveMQDestination destination = payload.getDestination();
+                if (destination != null) {
+                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, payload.getMessageId().toString());
+                }
                 fireAdvisory(context, topic, payload, null, advisoryMessage);
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c703ac6/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
index bcf1a07..b26c600 100755
--- a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
+
 import org.apache.activemq.ActiveMQMessageTransformation;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -61,6 +62,7 @@ public final class AdvisorySupport {
     public static final String MSG_PROPERTY_CONSUMER_ID = "consumerId";
     public static final String MSG_PROPERTY_PRODUCER_ID = "producerId";
     public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
+    public static final String MSG_PROPERTY_DESTINATION = "orignalDestination";
     public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
     public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c703ac6/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
index b0bfd82..5e20f79 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
@@ -158,7 +158,7 @@ public class AdvisoryTempDestinationTests extends TestCase {
             producer.send(m);
         }
 
-        Message msg = advisoryConsumer.receive(2000);
+        Message msg = advisoryConsumer.receive(5000);
         assertNotNull(msg);
     }