You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/06/04 19:30:52 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5209
Repository: activemq
Updated Branches:
refs/heads/trunk be0311bea -> 6c703ac6e
https://issues.apache.org/jira/browse/AMQ-5209
Add MessageId and original destination info to the advisory message
properties.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6c703ac6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6c703ac6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6c703ac6
Branch: refs/heads/trunk
Commit: 6c703ac6ee58bb27fa3a16a4e695a3b50c50a9f7
Parents: be0311b
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jun 4 13:30:42 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jun 4 13:30:42 2014 -0400
----------------------------------------------------------------------
.../activemq/advisory/AdvisoryBroker.java | 21 ++++++++++++++++++--
.../activemq/advisory/AdvisorySupport.java | 2 ++
.../advisory/AdvisoryTempDestinationTests.java | 2 +-
3 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/6c703ac6/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index d48bf16..2583a23 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -349,7 +349,13 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
- fireAdvisory(context, topic, payload);
+ ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+ advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
+ ActiveMQDestination destination = payload.getDestination();
+ if (destination != null) {
+ advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, payload.getMessageId().toString());
+ }
+ fireAdvisory(context, topic, payload, null, advisoryMessage);
}
} catch (Exception e) {
handleFireFailure("consumed", e);
@@ -364,7 +370,13 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
- fireAdvisory(context, topic, payload);
+ ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+ advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
+ ActiveMQDestination destination = payload.getDestination();
+ if (destination != null) {
+ advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, payload.getMessageId().toString());
+ }
+ fireAdvisory(context, topic, payload, null, advisoryMessage);
}
} catch (Exception e) {
handleFireFailure("delivered", e);
@@ -383,7 +395,12 @@ public class AdvisoryBroker extends BrokerFilter {
if (sub instanceof TopicSubscription) {
advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded());
}
+ advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
+ ActiveMQDestination destination = payload.getDestination();
+ if (destination != null) {
+ advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, payload.getMessageId().toString());
+ }
fireAdvisory(context, topic, payload, null, advisoryMessage);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/6c703ac6/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
index bcf1a07..b26c600 100755
--- a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
import javax.jms.Destination;
import javax.jms.JMSException;
+
import org.apache.activemq.ActiveMQMessageTransformation;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
@@ -61,6 +62,7 @@ public final class AdvisorySupport {
public static final String MSG_PROPERTY_CONSUMER_ID = "consumerId";
public static final String MSG_PROPERTY_PRODUCER_ID = "producerId";
public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
+ public static final String MSG_PROPERTY_DESTINATION = "orignalDestination";
public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";
http://git-wip-us.apache.org/repos/asf/activemq/blob/6c703ac6/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
index b0bfd82..5e20f79 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
@@ -158,7 +158,7 @@ public class AdvisoryTempDestinationTests extends TestCase {
producer.send(m);
}
- Message msg = advisoryConsumer.receive(2000);
+ Message msg = advisoryConsumer.receive(5000);
assertNotNull(msg);
}