You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/07/29 10:57:32 UTC

svn commit: r560696 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/TransportConnection.java network/DemandForwardingBridgeSupport.java

Author: rajdavies
Date: Sun Jul 29 01:57:31 2007
New Revision: 560696

URL: http://svn.apache.org/viewvc?view=rev&rev=560696
Log:
Fix for http://issues.apache.org/activemq/browse/AMQ-920

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=560696&r1=560695&r2=560696
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Sun Jul 29 01:57:31 2007
@@ -15,6 +15,7 @@
 package org.apache.activemq.broker;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -29,7 +30,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ft.MasterBroker;
 import org.apache.activemq.broker.region.ConnectionStatistics;
@@ -80,11 +80,13 @@
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.DefaultTransportListener;
+import org.apache.activemq.transport.ResponseCorrelator;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.MarshallingSupport;
 import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.URISupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -834,7 +836,7 @@
     public synchronized void start() throws Exception{
         starting=true;
         try{
-        	transport.start();
+            transport.start();
         	
         	if (taskRunnerFactory != null) {
 				taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " + getRemoteAddress());
@@ -1090,12 +1092,21 @@
                 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
                 IntrospectionSupport.setProperties(config,props,"");
                 config.setBrokerName(broker.getBrokerName());
-                Transport localTransport = TransportFactory.connect(broker.getVmConnectorURI());
-                duplexBridge = NetworkBridgeFactory.createBridge(config,localTransport,transport);
+                URI uri = broker.getVmConnectorURI();
+                HashMap map = new HashMap(URISupport.parseParamters(uri));
+                map.put("network", "true");
+                map.put("async","false");
+                uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
+                Transport localTransport = TransportFactory.connect(uri);
+                Transport remoteBridgeTransport = new ResponseCorrelator(transport);
+                duplexBridge = NetworkBridgeFactory.createBridge(config,localTransport,remoteBridgeTransport);
                 //now turn duplex off this side
+                info.setDuplexConnection(false);
                 duplexBridge.setCreatedByDuplex(true);
-                duplexBridge.start();
+                duplexBridge.duplexStart(brokerInfo,info);
+                
                 log.info("Created Duplex Bridge back to " + info.getBrokerName());
+                return null;
             }catch(Exception e){
                log.error("Creating duplex network bridge",e);
             }
@@ -1103,7 +1114,6 @@
         // We only expect to get one broker info command per connection
         if(this.brokerInfo!=null){
             log.warn("Unexpected extra broker info command received: "+info);
-            Thread.dumpStack();
         }
         this.brokerInfo=info;
         broker.addBroker(this,info);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?view=diff&rev=560696&r1=560695&r2=560696
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Sun Jul 29 01:57:31 2007
@@ -27,6 +27,7 @@
 
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerId;
@@ -110,103 +111,107 @@
 
     final AtomicLong enqueueCounter = new AtomicLong();
     final AtomicLong dequeueCounter = new AtomicLong();
+    private AtomicBoolean started = new AtomicBoolean();
     
     public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
         this.configuration=configuration;
         this.localBroker = localBroker;
         this.remoteBroker = remoteBroker;
     }
-
-    public void start() throws Exception {
-        localBroker.setTransportListener(new DefaultTransportListener(){
-            public void onCommand(Object o){
-            	Command command = (Command) o;
-                serviceLocalCommand(command);
-            }
-    
-            public void onException(IOException error){
-                serviceLocalException(error);
-            }
-        });
-        remoteBroker.setTransportListener(new TransportListener(){
-            public void onCommand(Object o){
-            	Command command = (Command) o;
-                serviceRemoteCommand(command);
-            }
     
-            public void onException(IOException error){
-                serviceRemoteException(error);
-            }
-    
-            public void transportInterupted(){
-                //clear any subscriptions - to try and prevent the bridge from stalling the broker
-                if( remoteInterupted.compareAndSet(false, true) ) {
-                	
-                    log.info("Outbound transport to " + remoteBrokerName + " interrupted.");                                        
-
-                	if( localBridgeStarted.get() ) {
-	                    clearDownSubscriptions();
-	                    synchronized( DemandForwardingBridgeSupport.this ) {
-		                    try{
-		                        localBroker.oneway(localConnectionInfo.createRemoveCommand());
-		                    }catch(TransportDisposedIOException td){
-                                log.debug("local broker is now disposed",td);
+    public void duplexStart(BrokerInfo localBrokerInfo,BrokerInfo remoteBrokerInfo) throws Exception{
+        this.localBrokerInfo=localBrokerInfo;
+        this.remoteBrokerInfo=remoteBrokerInfo;
+        start();
+        serviceRemoteCommand(remoteBrokerInfo);
+    }
+
+    public void start() throws Exception{
+        if(started.compareAndSet(false,true)){
+            localBroker.setTransportListener(new DefaultTransportListener(){
+
+                public void onCommand(Object o){
+                    Command command=(Command)o;
+                    serviceLocalCommand(command);
+                }
+
+                public void onException(IOException error){
+                    serviceLocalException(error);
+                }
+            });
+            remoteBroker.setTransportListener(new TransportListener(){
+
+                public void onCommand(Object o){
+                    Command command=(Command)o;
+                    serviceRemoteCommand(command);
+                }
+
+                public void onException(IOException error){
+                    serviceRemoteException(error);
+                }
+
+                public void transportInterupted(){
+                    // clear any subscriptions - to try and prevent the bridge from stalling the broker
+                    if(remoteInterupted.compareAndSet(false,true)){
+                        log.info("Outbound transport to "+remoteBrokerName+" interrupted.");
+                        if(localBridgeStarted.get()){
+                            clearDownSubscriptions();
+                            synchronized(DemandForwardingBridgeSupport.this){
+                                try{
+                                    localBroker.oneway(localConnectionInfo.createRemoveCommand());
+                                }catch(TransportDisposedIOException td){
+                                    log.debug("local broker is now disposed",td);
+                                }catch(IOException e){
+                                    log.warn("Caught exception from local start",e);
+                                }
                             }
-                            catch(IOException e){
-		                        log.warn("Caught exception from local start",e);
-		                    }
-	                    }
-                	}
-                	
-                    localBridgeStarted.set(false);
-                    remoteBridgeStarted.set(false);
-                    startedLatch = new CountDownLatch(2);
+                        }
+                        localBridgeStarted.set(false);
+                        remoteBridgeStarted.set(false);
+                        startedLatch=new CountDownLatch(2);
+                    }
                 }
-                
-            }
-    
-            public void transportResumed(){                
-                if( remoteInterupted.compareAndSet(true, false) ) {
-                	
-                	// We want to slow down false connects so that we don't get in a busy loop.
-                	// False connects can occurr if you using SSH tunnels.
-                	if( !lastConnectSucceeded.get() ) {
-                		try {
-                            log.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop.");                                        
-							Thread.sleep(1000);
-						} catch (InterruptedException e) {
-							Thread.currentThread().interrupt();
-						}
-                	}
-                    lastConnectSucceeded.set(false);
-                    try {
-                        startLocalBridge();
-                        remoteBridgeStarted.set(true);
-                        startedLatch.countDown();
-                        log.info("Outbound transport to " + remoteBrokerName + " resumed");   
-                    }catch(Exception e) {
-                        log.error("Caught exception  from local start in resume transport",e );
+
+                public void transportResumed(){
+                    if(remoteInterupted.compareAndSet(true,false)){
+                        // We want to slow down false connects so that we don't get in a busy loop.
+                        // False connects can occurr if you using SSH tunnels.
+                        if(!lastConnectSucceeded.get()){
+                            try{
+                                log
+                                        .debug("Previous connection was never fully established. Sleeping for second to avoid busy loop.");
+                                Thread.sleep(1000);
+                            }catch(InterruptedException e){
+                                Thread.currentThread().interrupt();
+                            }
+                        }
+                        lastConnectSucceeded.set(false);
+                        try{
+                            startLocalBridge();
+                            remoteBridgeStarted.set(true);
+                            startedLatch.countDown();
+                            log.info("Outbound transport to "+remoteBrokerName+" resumed");
+                        }catch(Exception e){
+                            log.error("Caught exception  from local start in resume transport",e);
+                        }
                     }
-                                                         
                 }
+            });
+
+            localBroker.start();
+            remoteBroker.start();
+            try{
+                triggerRemoteStartBridge();
+            }catch(IOException e){
+                log.warn("Caught exception from remote start",e);
+            }
+            NetworkBridgeListener l=this.networkBridgeListener;
+            if(l!=null){
+                l.onStart(this);
             }
-        });
-        localBroker.start();
-        remoteBroker.start();
-        
-        try{                    	
-            triggerRemoteStartBridge();
-        }catch(IOException e){
-            log.warn("Caught exception from remote start",e);
-        }
-        
-        NetworkBridgeListener l = this.networkBridgeListener;
-        if (l!=null) {
-            l.onStart(this);
         }
-
     }
-
+    
     protected void triggerLocalStartBridge() throws IOException {
         Thread thead=new Thread(){
             public void run(){
@@ -259,95 +264,88 @@
         }
     }
 
-    protected void startRemoteBridge() throws Exception {
-        if(remoteBridgeStarted.compareAndSet(false,true)) {
-    
-        	synchronized (this) {
-        		        		
-            	if( remoteConnectionInfo!=null ) {
-            		remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
-            	}
-            	
+    protected void startRemoteBridge() throws Exception{
+        if(remoteBridgeStarted.compareAndSet(false,true)){
+            synchronized(this){
+                if(isCreatedByDuplex()==false){
+                    BrokerInfo brokerInfo=new BrokerInfo();
+                    brokerInfo.setBrokerName(configuration.getBrokerName());
+                    brokerInfo.setNetworkConnection(true);
+                    brokerInfo.setDuplexConnection(configuration.isDuplex());
+                    // set our properties
+                    Properties props=new Properties();
+                    IntrospectionSupport.getProperties(this,props,null);
+                    String str=MarshallingSupport.propertiesToString(props);
+                    brokerInfo.setNetworkProperties(str);
+                    remoteBroker.oneway(brokerInfo);
+                }
+                if(remoteConnectionInfo!=null){
+                    remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
+                }
                 remoteConnectionInfo=new ConnectionInfo();
                 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
                 remoteConnectionInfo.setClientId("NC_"+configuration.getBrokerName()+"_outbound");
                 remoteConnectionInfo.setUserName(configuration.getUserName());
                 remoteConnectionInfo.setPassword(configuration.getPassword());
                 remoteBroker.oneway(remoteConnectionInfo);
-                if (isCreatedByDuplex()==false) {
-                BrokerInfo brokerInfo=new BrokerInfo();
-                brokerInfo.setBrokerName(configuration.getBrokerName());
-                brokerInfo.setNetworkConnection(true);
-                brokerInfo.setDuplexConnection(configuration.isDuplex());
-              
-                //set our properties
-                Properties props = new Properties();
-                IntrospectionSupport.getProperties(this,props,null); 
-                String str = MarshallingSupport.propertiesToString(props);
-                brokerInfo.setNetworkProperties(str);                
-                remoteBroker.oneway(brokerInfo);
-                }
-
+                
                 SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
                 remoteBroker.oneway(remoteSessionInfo);
-
                 producerInfo=new ProducerInfo(remoteSessionInfo,1);
                 producerInfo.setResponseRequired(false);
                 remoteBroker.oneway(producerInfo);
-
                 // Listen to consumer advisory messages on the remote broker to determine demand.
                 demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
                 demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync());
-                String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+configuration.getDestinationFilter();
-                if( configuration.isBridgeTempDestinations() ) {
-                	advisoryTopic += ","+AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
+                String advisoryTopic=AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX
+                        +configuration.getDestinationFilter();
+                if(configuration.isBridgeTempDestinations()){
+                    advisoryTopic+=","+AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
                 }
                 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
                 demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
-                remoteBroker.oneway(demandConsumerInfo);                
+                remoteBroker.oneway(demandConsumerInfo);
                 startedLatch.countDown();
-                
-                if (!disposed){
+                if(!disposed){
                     triggerLocalStartBridge();
                 }
-        		
-        	}
+            }
         }
     }
 
     public void stop() throws Exception{
-        log.debug(" stopping "+configuration.getBrokerName()+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
-        boolean wasDisposedAlready=disposed;
-        if(!disposed){
-            NetworkBridgeListener l = this.networkBridgeListener;
-            if (l!=null) {
-                l.onStop(this);
+        if(started.compareAndSet(true,false)){
+            log.debug(" stopping "+configuration.getBrokerName()+" bridge to "+remoteBrokerName
+                    +" is disposed already ? "+disposed);
+            boolean wasDisposedAlready=disposed;
+            if(!disposed){
+                NetworkBridgeListener l=this.networkBridgeListener;
+                if(l!=null){
+                    l.onStop(this);
+                }
+                try{
+                    disposed=true;
+                    remoteBridgeStarted.set(false);
+                    localBroker.oneway(new ShutdownInfo());
+                    remoteBroker.oneway(new ShutdownInfo());
+                }catch(IOException e){
+                    log.debug("Caught exception stopping",e);
+                }finally{
+                    ServiceStopper ss=new ServiceStopper();
+                    ss.stop(localBroker);
+                    ss.stop(remoteBroker);
+                    // Release the started Latch since another thread could be stuck waiting for it to start up.
+                    startedLatch.countDown();
+                    startedLatch.countDown();
+                    ss.throwFirstException();
+                }
             }
-
-            try{
-                disposed=true;
-                remoteBridgeStarted.set(false);
-                localBroker.oneway(new ShutdownInfo());
-                remoteBroker.oneway(new ShutdownInfo());
-            }catch(IOException e){
-                log.debug("Caught exception stopping",e);
-            }finally{
-                ServiceStopper ss=new ServiceStopper();
-                ss.stop(localBroker);
-                ss.stop(remoteBroker);
-                
-				// Release the started Latch since another thread could be stuck waiting for it to start up.
-				startedLatch.countDown();
-				startedLatch.countDown();
-
-                ss.throwFirstException();
+            if(wasDisposedAlready){
+                log.debug(configuration.getBrokerName()+" bridge to "+remoteBrokerName+" stopped");
+            }else{
+                log.info(configuration.getBrokerName()+" bridge to "+remoteBrokerName+" stopped");
             }
         }
-        if(wasDisposedAlready){
-            log.debug(configuration.getBrokerName()+" bridge to "+remoteBrokerName+" stopped");
-        }else{
-            log.info(configuration.getBrokerName()+" bridge to "+remoteBrokerName+" stopped");
-        }
     }
     
     public void serviceRemoteException(Throwable error){
@@ -370,12 +368,12 @@
         }
     }
 
-    protected void serviceRemoteCommand(Command command) {
+    protected void serviceRemoteCommand(Command command){
         if(!disposed){
             try{
                 if(command.isMessageDispatch()){
                     waitStarted();
-                    MessageDispatch md=(MessageDispatch) command;
+                    MessageDispatch md=(MessageDispatch)command;
                     serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
                     demandConsumerDispatched++;
                     if(demandConsumerDispatched>(demandConsumerInfo.getPrefetchSize()*.75)){
@@ -383,117 +381,137 @@
                         demandConsumerDispatched=0;
                     }
                 }else if(command.isBrokerInfo()){
-                	
-                	lastConnectSucceeded.set(true);
-                        remoteBrokerInfo = ((BrokerInfo)command);
-
-                	serviceRemoteBrokerInfo(command);
-                	// Let the local broker know the remote broker's ID.
-                	localBroker.oneway(command);
-                    
-                }else if(command.getClass() == ConnectionError.class ) {
-                	ConnectionError ce = (ConnectionError) command;
-                	serviceRemoteException(ce.getException());
+                    lastConnectSucceeded.set(true);
+                    remoteBrokerInfo=((BrokerInfo)command);
+                    serviceRemoteBrokerInfo(command);
+                    // Let the local broker know the remote broker's ID.
+                    localBroker.oneway(command);
+                }else if(command.getClass()==ConnectionError.class){
+                    ConnectionError ce=(ConnectionError)command;
+                    serviceRemoteException(ce.getException());
                 }else{
-                    switch(command.getDataStructureType()){
-                    case KeepAliveInfo.DATA_STRUCTURE_TYPE:
-                    case WireFormatInfo.DATA_STRUCTURE_TYPE:
-                    case ShutdownInfo.DATA_STRUCTURE_TYPE:
-                        break;
-                    default:
-                        log.warn("Unexpected remote command: "+command);
+                    if(configuration.isDuplex()||createdByDuplex){
+                        if(command.isMessage()){
+                            ActiveMQMessage message=(ActiveMQMessage)command;
+                            if(AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())){
+                                serviceRemoteConsumerAdvisory(message.getDataStructure());
+                            }else{
+                                localBroker.oneway(message);
+                            }
+                        }else{
+                            switch(command.getDataStructureType()){
+                            case ConnectionInfo.DATA_STRUCTURE_TYPE:
+                            case SessionInfo.DATA_STRUCTURE_TYPE:
+                            case ProducerInfo.DATA_STRUCTURE_TYPE:
+                                localBroker.oneway(command);
+                                break;
+                            case ConsumerInfo.DATA_STRUCTURE_TYPE:
+                                if(!addConsumerInfo((ConsumerInfo)command)){
+                                    if(log.isDebugEnabled())
+                                        log.debug("Ignoring ConsumerInfo: "+command);
+                                }
+                                break;
+                            default:
+                                if(log.isDebugEnabled())
+                                    log.debug("Ignoring remote command: "+command);
+                            }
+                        }
+                    }else{
+                        switch(command.getDataStructureType()){
+                        case KeepAliveInfo.DATA_STRUCTURE_TYPE:
+                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
+                        case ShutdownInfo.DATA_STRUCTURE_TYPE:
+                            break;
+                        default:
+                            log.warn("Unexpected remote command: "+command);
+                        }
                     }
                 }
-            }catch(Exception e){
+            }catch(Throwable e){
                 serviceRemoteException(e);
             }
         }
     }
 
-    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
-       
-        final int networkTTL = configuration.getNetworkTTL();
+    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException{
+        final int networkTTL=configuration.getNetworkTTL();
         if(data.getClass()==ConsumerInfo.class){
             // Create a new local subscription
-            ConsumerInfo info=(ConsumerInfo) data;
+            ConsumerInfo info=(ConsumerInfo)data;
             BrokerId[] path=info.getBrokerPath();
-            if((path!=null&&path.length>= networkTTL)){
+            if((path!=null&&path.length>=networkTTL)){
                 if(log.isDebugEnabled())
-                    log.debug(configuration.getBrokerName() + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
+                    log.debug(configuration.getBrokerName()+" Ignoring Subscription "+info+" restricted to "+networkTTL
+                            +" network hops only");
                 return;
             }
             if(contains(info.getBrokerPath(),localBrokerPath[0])){
                 // Ignore this consumer as it's a consumer we locally sent to the broker.
                 if(log.isDebugEnabled())
-                    log.debug(configuration.getBrokerName()  + " Ignoring sub " + info + " already routed through this broker once");
+                    log.debug(configuration.getBrokerName()+" Ignoring sub "+info
+                            +" already routed through this broker once");
                 return;
             }
-            if (!isPermissableDestination(info.getDestination())){
-                //ignore if not in the permited or in the excluded list
+            if(!isPermissableDestination(info.getDestination())){
+                // ignore if not in the permited or in the excluded list
                 if(log.isDebugEnabled())
-                    log.debug(configuration.getBrokerName()  + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
+                    log.debug(configuration.getBrokerName()+" Ignoring sub "+info+" destination "+info.getDestination()
+                            +" is not permiited");
                 return;
             }
-            // Update the packet to show where it came from.
-            info=info.copy();
-            addRemoteBrokerToBrokerPath(info);
-            DemandSubscription sub=createDemandSubscription(info);
-            if (sub != null){
-                addSubscription(sub);
+            if(addConsumerInfo(info)){
                 if(log.isDebugEnabled())
-                    log.debug(configuration.getBrokerName() + " Forwarding sub on "+localBroker+" from "+remoteBrokerName+" :  "+info);
-            }else {
+                    log.debug(configuration.getBrokerName()+" Forwarding sub on "+localBroker+" from "+remoteBrokerName
+                            +" :  "+info);
+            }else{
                 if(log.isDebugEnabled())
-                    log.debug(configuration.getBrokerName()  + " Ignoring sub " + info + " already subscribed to matching destination");
+                    log.debug(configuration.getBrokerName()+" Ignoring sub "+info
+                            +" already subscribed to matching destination");
             }
-        }
-        else if (data.getClass()==DestinationInfo.class){
-//          It's a destination info - we want to pass up
-            //infomation about temporary destinations 
-            DestinationInfo destInfo = (DestinationInfo) data;
+        }else if(data.getClass()==DestinationInfo.class){
+            // It's a destination info - we want to pass up
+            // infomation about temporary destinations
+            DestinationInfo destInfo=(DestinationInfo)data;
             BrokerId[] path=destInfo.getBrokerPath();
-            if((path!=null&&path.length>= networkTTL)){
+            if((path!=null&&path.length>=networkTTL)){
                 if(log.isDebugEnabled())
-                    log.debug("Ignoring Subscription " + destInfo + " restricted to " + networkTTL + " network hops only");
+                    log.debug("Ignoring Subscription "+destInfo+" restricted to "+networkTTL+" network hops only");
                 return;
             }
             if(contains(destInfo.getBrokerPath(),localBrokerPath[0])){
                 // Ignore this consumer as it's a consumer we locally sent to the broker.
                 if(log.isDebugEnabled())
-                    log.debug("Ignoring sub " + destInfo + " already routed through this broker once");
+                    log.debug("Ignoring sub "+destInfo+" already routed through this broker once");
                 return;
             }
-            
             destInfo.setConnectionId(localConnectionInfo.getConnectionId());
-            if (destInfo.getDestination() instanceof ActiveMQTempDestination){
-                //re-set connection id so comes from here
-                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
+            if(destInfo.getDestination() instanceof ActiveMQTempDestination){
+                // re-set connection id so comes from here
+                ActiveMQTempDestination tempDest=(ActiveMQTempDestination)destInfo.getDestination();
                 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
             }
-                        
             destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath()));
-            
-            log.debug("Replying destination control command: "+destInfo);            
+            log.debug("Replying destination control command: "+destInfo);
             localBroker.oneway(destInfo);
-            
-        }
-        else if(data.getClass()==RemoveInfo.class){
-            ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId();
+        }else if(data.getClass()==RemoveInfo.class){
+            ConsumerId id=(ConsumerId)((RemoveInfo)data).getObjectId();
             removeDemandSubscription(id);
         }
     }
 
-    public void serviceLocalException(Throwable error) {
-    	if( !disposed ) {
-	        log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a local error: "+error);
-	        log.debug("The local Exception was:"+error,error);
-	        new Thread() {
-	        	public void run() {
-	                ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
-	        	}
-	        }.start();
+    public void serviceLocalException(Throwable error){
+        if(!disposed){
+            log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a local error: "
+                    +error);
+            log.debug("The local Exception was:"+error,error);
+            new Thread(){
+
+                public void run(){
+                    ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
+                }
+            }.start();
             fireBridgeFailed();
-    	}
+        }
     }
 
     protected void addSubscription(DemandSubscription sub) throws IOException {
@@ -741,8 +759,7 @@
                 }
             }
             return false;
-        }
-    
+        } 
         return true;
     }
 
@@ -766,6 +783,18 @@
             } 
         }
     }
+    
+    protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
+        boolean result = false;
+        ConsumerInfo info=consumerInfo.copy();
+        addRemoteBrokerToBrokerPath(info);
+        DemandSubscription sub=createDemandSubscription(info);
+        if (sub != null){
+            addSubscription(sub);
+            result = true;
+        }
+        return result;
+    }
 
     protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
         return doCreateDemandSubscription(info);
@@ -775,7 +804,13 @@
         DemandSubscription result=new DemandSubscription(info);
         result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
                         .getNextSequenceId()));
-        
+        if (info.getDestination().isTemporary()) {
+            //reset the local connection Id
+          
+            ActiveMQTempDestination dest = (ActiveMQTempDestination)result.getLocalInfo().getDestination();
+            dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
+        }
+                
         if( configuration.isDecreaseNetworkConsumerPriority() ) {
             byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
             if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
@@ -828,7 +863,7 @@
         subscriptionMapByLocalId.clear();
         subscriptionMapByRemoteId.clear();
     }
-
+    
     protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException;
 
     protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException;