You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/05/16 19:44:21 UTC

svn commit: r657147 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Author: rajdavies
Date: Fri May 16 10:44:21 2008
New Revision: 657147

URL: http://svn.apache.org/viewvc?rev=657147&view=rev
Log:
patch for https://issues.apache.org/activemq/browse/AMQ-1661

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=657147&r1=657146&r2=657147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri May 16 10:44:21 2008
@@ -388,7 +388,7 @@
         }
     }
 
-    protected void serviceRemoteCommand(Command command) {
+    protected void serviceRemoteCommand(Command command) {    	      	
         if (!disposed) {
             try {
                 if (command.isMessageDispatch()) {
@@ -580,9 +580,20 @@
                     final MessageDispatch md = (MessageDispatch)command;
                     DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
                     if (sub != null && md.getMessage()!=null) {
+                    	
+                    	  // Check whether this consumer's brokerPath shows that it came from the broker at the
+                    	  // other end of the bridge. Arguably this decision should be based on the message's own
+                    	  // broker path (its "bread crumbs") rather than the consumer's; however, the message's
+                    	  // broker path is null here, which is a separate problem.
+                    	  boolean cameFromRemote = false;
+                        Object consumerInfo = md.getMessage().getDataStructure(); 
+                        if( consumerInfo != null && (consumerInfo instanceof ConsumerInfo) )                  	                   	  
+                    	    cameFromRemote = contains( ((ConsumerInfo)consumerInfo).getBrokerPath(),remoteBrokerInfo.getBrokerId());                    	                           
+                                            	                     	
                         Message message = configureMessage(md);
                         if (trace) {
                             LOG.trace("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message);
+                            LOG.trace("cameFromRemote = "+cameFromRemote);    
                         }
 
                         if (!message.isResponseRequired() || isDuplex()) {
@@ -591,9 +602,16 @@
                             // send, we will preserve that QOS
                             // by bridging it using an async send (small chance
                             // of message loss).
-                            remoteBroker.oneway(message);
+                            
+                            // Don't send it off to the remote if it originally came from the remote. 
+                            if( !cameFromRemote ) {
+                               remoteBroker.oneway(message);
+                              }
+                            else{
+                              LOG.info("Message not forwarded on to remote, because message came from remote");                               
+                            }
                             localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
-                            dequeueCounter.incrementAndGet();
+                            dequeueCounter.incrementAndGet();                          
 
                         } else {