You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/05/16 19:44:21 UTC
svn commit: r657147 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Author: rajdavies
Date: Fri May 16 10:44:21 2008
New Revision: 657147
URL: http://svn.apache.org/viewvc?rev=657147&view=rev
Log:
patch for https://issues.apache.org/activemq/browse/AMQ-1661
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=657147&r1=657146&r2=657147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri May 16 10:44:21 2008
@@ -388,7 +388,7 @@
}
}
- protected void serviceRemoteCommand(Command command) {
+ protected void serviceRemoteCommand(Command command) {
if (!disposed) {
try {
if (command.isMessageDispatch()) {
@@ -580,9 +580,20 @@
final MessageDispatch md = (MessageDispatch)command;
DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
if (sub != null && md.getMessage()!=null) {
+
+ // Check whether this consumer's brokerPath shows that it came from the broker at the
+ // other end of the bridge. This decision should arguably be based on the message's
+ // broker bread crumbs rather than the consumer's; however, the message's broker bread
+ // crumbs are null, which is a separate issue.
+ boolean cameFromRemote = false;
+ Object consumerInfo = md.getMessage().getDataStructure();
+ if( consumerInfo != null && (consumerInfo instanceof ConsumerInfo) )
+ cameFromRemote = contains( ((ConsumerInfo)consumerInfo).getBrokerPath(),remoteBrokerInfo.getBrokerId());
+
Message message = configureMessage(md);
if (trace) {
LOG.trace("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message);
+ LOG.trace("cameFromRemote = "+cameFromRemote);
}
if (!message.isResponseRequired() || isDuplex()) {
@@ -591,9 +602,16 @@
// send, we will preserve that QOS
// by bridging it using an async send (small chance
// of message loss).
- remoteBroker.oneway(message);
+
+ // Don't send it off to the remote if it originally came from the remote.
+ if( !cameFromRemote ) {
+ remoteBroker.oneway(message);
+ }
+ else{
+ LOG.info("Message not forwarded on to remote, because message came from remote");
+ }
localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
- dequeueCounter.incrementAndGet();
+ dequeueCounter.incrementAndGet();
} else {