You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/07/29 10:57:32 UTC
svn commit: r560696 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
broker/TransportConnection.java network/DemandForwardingBridgeSupport.java
Author: rajdavies
Date: Sun Jul 29 01:57:31 2007
New Revision: 560696
URL: http://svn.apache.org/viewvc?view=rev&rev=560696
Log:
Fix for http://issues.apache.org/activemq/browse/AMQ-920
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=560696&r1=560695&r2=560696
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Sun Jul 29 01:57:31 2007
@@ -15,6 +15,7 @@
package org.apache.activemq.broker;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -29,7 +30,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.activemq.Service;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
@@ -80,11 +80,13 @@
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener;
+import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -834,7 +836,7 @@
public synchronized void start() throws Exception{
starting=true;
try{
- transport.start();
+ transport.start();
if (taskRunnerFactory != null) {
taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " + getRemoteAddress());
@@ -1090,12 +1092,21 @@
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
IntrospectionSupport.setProperties(config,props,"");
config.setBrokerName(broker.getBrokerName());
- Transport localTransport = TransportFactory.connect(broker.getVmConnectorURI());
- duplexBridge = NetworkBridgeFactory.createBridge(config,localTransport,transport);
+ URI uri = broker.getVmConnectorURI();
+ HashMap map = new HashMap(URISupport.parseParamters(uri));
+ map.put("network", "true");
+ map.put("async","false");
+ uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
+ Transport localTransport = TransportFactory.connect(uri);
+ Transport remoteBridgeTransport = new ResponseCorrelator(transport);
+ duplexBridge = NetworkBridgeFactory.createBridge(config,localTransport,remoteBridgeTransport);
//now turn duplex off this side
+ info.setDuplexConnection(false);
duplexBridge.setCreatedByDuplex(true);
- duplexBridge.start();
+ duplexBridge.duplexStart(brokerInfo,info);
+
log.info("Created Duplex Bridge back to " + info.getBrokerName());
+ return null;
}catch(Exception e){
log.error("Creating duplex network bridge",e);
}
@@ -1103,7 +1114,6 @@
// We only expect to get one broker info command per connection
if(this.brokerInfo!=null){
log.warn("Unexpected extra broker info command received: "+info);
- Thread.dumpStack();
}
this.brokerInfo=info;
broker.addBroker(this,info);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?view=diff&rev=560696&r1=560695&r2=560696
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Sun Jul 29 01:57:31 2007
@@ -27,6 +27,7 @@
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
@@ -110,103 +111,107 @@
final AtomicLong enqueueCounter = new AtomicLong();
final AtomicLong dequeueCounter = new AtomicLong();
+ private AtomicBoolean started = new AtomicBoolean();
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
this.configuration=configuration;
this.localBroker = localBroker;
this.remoteBroker = remoteBroker;
}
-
- public void start() throws Exception {
- localBroker.setTransportListener(new DefaultTransportListener(){
- public void onCommand(Object o){
- Command command = (Command) o;
- serviceLocalCommand(command);
- }
-
- public void onException(IOException error){
- serviceLocalException(error);
- }
- });
- remoteBroker.setTransportListener(new TransportListener(){
- public void onCommand(Object o){
- Command command = (Command) o;
- serviceRemoteCommand(command);
- }
- public void onException(IOException error){
- serviceRemoteException(error);
- }
-
- public void transportInterupted(){
- //clear any subscriptions - to try and prevent the bridge from stalling the broker
- if( remoteInterupted.compareAndSet(false, true) ) {
-
- log.info("Outbound transport to " + remoteBrokerName + " interrupted.");
-
- if( localBridgeStarted.get() ) {
- clearDownSubscriptions();
- synchronized( DemandForwardingBridgeSupport.this ) {
- try{
- localBroker.oneway(localConnectionInfo.createRemoveCommand());
- }catch(TransportDisposedIOException td){
- log.debug("local broker is now disposed",td);
+ public void duplexStart(BrokerInfo localBrokerInfo,BrokerInfo remoteBrokerInfo) throws Exception{
+ this.localBrokerInfo=localBrokerInfo;
+ this.remoteBrokerInfo=remoteBrokerInfo;
+ start();
+ serviceRemoteCommand(remoteBrokerInfo);
+ }
+
+ public void start() throws Exception{
+ if(started.compareAndSet(false,true)){
+ localBroker.setTransportListener(new DefaultTransportListener(){
+
+ public void onCommand(Object o){
+ Command command=(Command)o;
+ serviceLocalCommand(command);
+ }
+
+ public void onException(IOException error){
+ serviceLocalException(error);
+ }
+ });
+ remoteBroker.setTransportListener(new TransportListener(){
+
+ public void onCommand(Object o){
+ Command command=(Command)o;
+ serviceRemoteCommand(command);
+ }
+
+ public void onException(IOException error){
+ serviceRemoteException(error);
+ }
+
+ public void transportInterupted(){
+ // clear any subscriptions - to try and prevent the bridge from stalling the broker
+ if(remoteInterupted.compareAndSet(false,true)){
+ log.info("Outbound transport to "+remoteBrokerName+" interrupted.");
+ if(localBridgeStarted.get()){
+ clearDownSubscriptions();
+ synchronized(DemandForwardingBridgeSupport.this){
+ try{
+ localBroker.oneway(localConnectionInfo.createRemoveCommand());
+ }catch(TransportDisposedIOException td){
+ log.debug("local broker is now disposed",td);
+ }catch(IOException e){
+ log.warn("Caught exception from local start",e);
+ }
}
- catch(IOException e){
- log.warn("Caught exception from local start",e);
- }
- }
- }
-
- localBridgeStarted.set(false);
- remoteBridgeStarted.set(false);
- startedLatch = new CountDownLatch(2);
+ }
+ localBridgeStarted.set(false);
+ remoteBridgeStarted.set(false);
+ startedLatch=new CountDownLatch(2);
+ }
}
-
- }
-
- public void transportResumed(){
- if( remoteInterupted.compareAndSet(true, false) ) {
-
- // We want to slow down false connects so that we don't get in a busy loop.
- // False connects can occurr if you using SSH tunnels.
- if( !lastConnectSucceeded.get() ) {
- try {
- log.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop.");
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- lastConnectSucceeded.set(false);
- try {
- startLocalBridge();
- remoteBridgeStarted.set(true);
- startedLatch.countDown();
- log.info("Outbound transport to " + remoteBrokerName + " resumed");
- }catch(Exception e) {
- log.error("Caught exception from local start in resume transport",e );
+
+ public void transportResumed(){
+ if(remoteInterupted.compareAndSet(true,false)){
+ // We want to slow down false connects so that we don't get in a busy loop.
+ // False connects can occurr if you using SSH tunnels.
+ if(!lastConnectSucceeded.get()){
+ try{
+ log
+ .debug("Previous connection was never fully established. Sleeping for second to avoid busy loop.");
+ Thread.sleep(1000);
+ }catch(InterruptedException e){
+ Thread.currentThread().interrupt();
+ }
+ }
+ lastConnectSucceeded.set(false);
+ try{
+ startLocalBridge();
+ remoteBridgeStarted.set(true);
+ startedLatch.countDown();
+ log.info("Outbound transport to "+remoteBrokerName+" resumed");
+ }catch(Exception e){
+ log.error("Caught exception from local start in resume transport",e);
+ }
}
-
}
+ });
+
+ localBroker.start();
+ remoteBroker.start();
+ try{
+ triggerRemoteStartBridge();
+ }catch(IOException e){
+ log.warn("Caught exception from remote start",e);
+ }
+ NetworkBridgeListener l=this.networkBridgeListener;
+ if(l!=null){
+ l.onStart(this);
}
- });
- localBroker.start();
- remoteBroker.start();
-
- try{
- triggerRemoteStartBridge();
- }catch(IOException e){
- log.warn("Caught exception from remote start",e);
- }
-
- NetworkBridgeListener l = this.networkBridgeListener;
- if (l!=null) {
- l.onStart(this);
}
-
}
-
+
protected void triggerLocalStartBridge() throws IOException {
Thread thead=new Thread(){
public void run(){
@@ -259,95 +264,88 @@
}
}
- protected void startRemoteBridge() throws Exception {
- if(remoteBridgeStarted.compareAndSet(false,true)) {
-
- synchronized (this) {
-
- if( remoteConnectionInfo!=null ) {
- remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
- }
-
+ protected void startRemoteBridge() throws Exception{
+ if(remoteBridgeStarted.compareAndSet(false,true)){
+ synchronized(this){
+ if(isCreatedByDuplex()==false){
+ BrokerInfo brokerInfo=new BrokerInfo();
+ brokerInfo.setBrokerName(configuration.getBrokerName());
+ brokerInfo.setNetworkConnection(true);
+ brokerInfo.setDuplexConnection(configuration.isDuplex());
+ // set our properties
+ Properties props=new Properties();
+ IntrospectionSupport.getProperties(this,props,null);
+ String str=MarshallingSupport.propertiesToString(props);
+ brokerInfo.setNetworkProperties(str);
+ remoteBroker.oneway(brokerInfo);
+ }
+ if(remoteConnectionInfo!=null){
+ remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
+ }
remoteConnectionInfo=new ConnectionInfo();
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
remoteConnectionInfo.setClientId("NC_"+configuration.getBrokerName()+"_outbound");
remoteConnectionInfo.setUserName(configuration.getUserName());
remoteConnectionInfo.setPassword(configuration.getPassword());
remoteBroker.oneway(remoteConnectionInfo);
- if (isCreatedByDuplex()==false) {
- BrokerInfo brokerInfo=new BrokerInfo();
- brokerInfo.setBrokerName(configuration.getBrokerName());
- brokerInfo.setNetworkConnection(true);
- brokerInfo.setDuplexConnection(configuration.isDuplex());
-
- //set our properties
- Properties props = new Properties();
- IntrospectionSupport.getProperties(this,props,null);
- String str = MarshallingSupport.propertiesToString(props);
- brokerInfo.setNetworkProperties(str);
- remoteBroker.oneway(brokerInfo);
- }
-
+
SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
remoteBroker.oneway(remoteSessionInfo);
-
producerInfo=new ProducerInfo(remoteSessionInfo,1);
producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo);
-
// Listen to consumer advisory messages on the remote broker to determine demand.
demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync());
- String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+configuration.getDestinationFilter();
- if( configuration.isBridgeTempDestinations() ) {
- advisoryTopic += ","+AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
+ String advisoryTopic=AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX
+ +configuration.getDestinationFilter();
+ if(configuration.isBridgeTempDestinations()){
+ advisoryTopic+=","+AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
}
demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
- remoteBroker.oneway(demandConsumerInfo);
+ remoteBroker.oneway(demandConsumerInfo);
startedLatch.countDown();
-
- if (!disposed){
+ if(!disposed){
triggerLocalStartBridge();
}
-
- }
+ }
}
}
public void stop() throws Exception{
- log.debug(" stopping "+configuration.getBrokerName()+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
- boolean wasDisposedAlready=disposed;
- if(!disposed){
- NetworkBridgeListener l = this.networkBridgeListener;
- if (l!=null) {
- l.onStop(this);
+ if(started.compareAndSet(true,false)){
+ log.debug(" stopping "+configuration.getBrokerName()+" bridge to "+remoteBrokerName
+ +" is disposed already ? "+disposed);
+ boolean wasDisposedAlready=disposed;
+ if(!disposed){
+ NetworkBridgeListener l=this.networkBridgeListener;
+ if(l!=null){
+ l.onStop(this);
+ }
+ try{
+ disposed=true;
+ remoteBridgeStarted.set(false);
+ localBroker.oneway(new ShutdownInfo());
+ remoteBroker.oneway(new ShutdownInfo());
+ }catch(IOException e){
+ log.debug("Caught exception stopping",e);
+ }finally{
+ ServiceStopper ss=new ServiceStopper();
+ ss.stop(localBroker);
+ ss.stop(remoteBroker);
+ // Release the started Latch since another thread could be stuck waiting for it to start up.
+ startedLatch.countDown();
+ startedLatch.countDown();
+ ss.throwFirstException();
+ }
}
-
- try{
- disposed=true;
- remoteBridgeStarted.set(false);
- localBroker.oneway(new ShutdownInfo());
- remoteBroker.oneway(new ShutdownInfo());
- }catch(IOException e){
- log.debug("Caught exception stopping",e);
- }finally{
- ServiceStopper ss=new ServiceStopper();
- ss.stop(localBroker);
- ss.stop(remoteBroker);
-
- // Release the started Latch since another thread could be stuck waiting for it to start up.
- startedLatch.countDown();
- startedLatch.countDown();
-
- ss.throwFirstException();
+ if(wasDisposedAlready){
+ log.debug(configuration.getBrokerName()+" bridge to "+remoteBrokerName+" stopped");
+ }else{
+ log.info(configuration.getBrokerName()+" bridge to "+remoteBrokerName+" stopped");
}
}
- if(wasDisposedAlready){
- log.debug(configuration.getBrokerName()+" bridge to "+remoteBrokerName+" stopped");
- }else{
- log.info(configuration.getBrokerName()+" bridge to "+remoteBrokerName+" stopped");
- }
}
public void serviceRemoteException(Throwable error){
@@ -370,12 +368,12 @@
}
}
- protected void serviceRemoteCommand(Command command) {
+ protected void serviceRemoteCommand(Command command){
if(!disposed){
try{
if(command.isMessageDispatch()){
waitStarted();
- MessageDispatch md=(MessageDispatch) command;
+ MessageDispatch md=(MessageDispatch)command;
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
demandConsumerDispatched++;
if(demandConsumerDispatched>(demandConsumerInfo.getPrefetchSize()*.75)){
@@ -383,117 +381,137 @@
demandConsumerDispatched=0;
}
}else if(command.isBrokerInfo()){
-
- lastConnectSucceeded.set(true);
- remoteBrokerInfo = ((BrokerInfo)command);
-
- serviceRemoteBrokerInfo(command);
- // Let the local broker know the remote broker's ID.
- localBroker.oneway(command);
-
- }else if(command.getClass() == ConnectionError.class ) {
- ConnectionError ce = (ConnectionError) command;
- serviceRemoteException(ce.getException());
+ lastConnectSucceeded.set(true);
+ remoteBrokerInfo=((BrokerInfo)command);
+ serviceRemoteBrokerInfo(command);
+ // Let the local broker know the remote broker's ID.
+ localBroker.oneway(command);
+ }else if(command.getClass()==ConnectionError.class){
+ ConnectionError ce=(ConnectionError)command;
+ serviceRemoteException(ce.getException());
}else{
- switch(command.getDataStructureType()){
- case KeepAliveInfo.DATA_STRUCTURE_TYPE:
- case WireFormatInfo.DATA_STRUCTURE_TYPE:
- case ShutdownInfo.DATA_STRUCTURE_TYPE:
- break;
- default:
- log.warn("Unexpected remote command: "+command);
+ if(configuration.isDuplex()||createdByDuplex){
+ if(command.isMessage()){
+ ActiveMQMessage message=(ActiveMQMessage)command;
+ if(AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())){
+ serviceRemoteConsumerAdvisory(message.getDataStructure());
+ }else{
+ localBroker.oneway(message);
+ }
+ }else{
+ switch(command.getDataStructureType()){
+ case ConnectionInfo.DATA_STRUCTURE_TYPE:
+ case SessionInfo.DATA_STRUCTURE_TYPE:
+ case ProducerInfo.DATA_STRUCTURE_TYPE:
+ localBroker.oneway(command);
+ break;
+ case ConsumerInfo.DATA_STRUCTURE_TYPE:
+ if(!addConsumerInfo((ConsumerInfo)command)){
+ if(log.isDebugEnabled())
+ log.debug("Ignoring ConsumerInfo: "+command);
+ }
+ break;
+ default:
+ if(log.isDebugEnabled())
+ log.debug("Ignoring remote command: "+command);
+ }
+ }
+ }else{
+ switch(command.getDataStructureType()){
+ case KeepAliveInfo.DATA_STRUCTURE_TYPE:
+ case WireFormatInfo.DATA_STRUCTURE_TYPE:
+ case ShutdownInfo.DATA_STRUCTURE_TYPE:
+ break;
+ default:
+ log.warn("Unexpected remote command: "+command);
+ }
}
}
- }catch(Exception e){
+ }catch(Throwable e){
serviceRemoteException(e);
}
}
}
- private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
-
- final int networkTTL = configuration.getNetworkTTL();
+ private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException{
+ final int networkTTL=configuration.getNetworkTTL();
if(data.getClass()==ConsumerInfo.class){
// Create a new local subscription
- ConsumerInfo info=(ConsumerInfo) data;
+ ConsumerInfo info=(ConsumerInfo)data;
BrokerId[] path=info.getBrokerPath();
- if((path!=null&&path.length>= networkTTL)){
+ if((path!=null&&path.length>=networkTTL)){
if(log.isDebugEnabled())
- log.debug(configuration.getBrokerName() + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
+ log.debug(configuration.getBrokerName()+" Ignoring Subscription "+info+" restricted to "+networkTTL
+ +" network hops only");
return;
}
if(contains(info.getBrokerPath(),localBrokerPath[0])){
// Ignore this consumer as it's a consumer we locally sent to the broker.
if(log.isDebugEnabled())
- log.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already routed through this broker once");
+ log.debug(configuration.getBrokerName()+" Ignoring sub "+info
+ +" already routed through this broker once");
return;
}
- if (!isPermissableDestination(info.getDestination())){
- //ignore if not in the permited or in the excluded list
+ if(!isPermissableDestination(info.getDestination())){
+ // ignore if not in the permited or in the excluded list
if(log.isDebugEnabled())
- log.debug(configuration.getBrokerName() + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
+ log.debug(configuration.getBrokerName()+" Ignoring sub "+info+" destination "+info.getDestination()
+ +" is not permiited");
return;
}
- // Update the packet to show where it came from.
- info=info.copy();
- addRemoteBrokerToBrokerPath(info);
- DemandSubscription sub=createDemandSubscription(info);
- if (sub != null){
- addSubscription(sub);
+ if(addConsumerInfo(info)){
if(log.isDebugEnabled())
- log.debug(configuration.getBrokerName() + " Forwarding sub on "+localBroker+" from "+remoteBrokerName+" : "+info);
- }else {
+ log.debug(configuration.getBrokerName()+" Forwarding sub on "+localBroker+" from "+remoteBrokerName
+ +" : "+info);
+ }else{
if(log.isDebugEnabled())
- log.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already subscribed to matching destination");
+ log.debug(configuration.getBrokerName()+" Ignoring sub "+info
+ +" already subscribed to matching destination");
}
- }
- else if (data.getClass()==DestinationInfo.class){
-// It's a destination info - we want to pass up
- //infomation about temporary destinations
- DestinationInfo destInfo = (DestinationInfo) data;
+ }else if(data.getClass()==DestinationInfo.class){
+ // It's a destination info - we want to pass up
+ // infomation about temporary destinations
+ DestinationInfo destInfo=(DestinationInfo)data;
BrokerId[] path=destInfo.getBrokerPath();
- if((path!=null&&path.length>= networkTTL)){
+ if((path!=null&&path.length>=networkTTL)){
if(log.isDebugEnabled())
- log.debug("Ignoring Subscription " + destInfo + " restricted to " + networkTTL + " network hops only");
+ log.debug("Ignoring Subscription "+destInfo+" restricted to "+networkTTL+" network hops only");
return;
}
if(contains(destInfo.getBrokerPath(),localBrokerPath[0])){
// Ignore this consumer as it's a consumer we locally sent to the broker.
if(log.isDebugEnabled())
- log.debug("Ignoring sub " + destInfo + " already routed through this broker once");
+ log.debug("Ignoring sub "+destInfo+" already routed through this broker once");
return;
}
-
destInfo.setConnectionId(localConnectionInfo.getConnectionId());
- if (destInfo.getDestination() instanceof ActiveMQTempDestination){
- //re-set connection id so comes from here
- ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
+ if(destInfo.getDestination() instanceof ActiveMQTempDestination){
+ // re-set connection id so comes from here
+ ActiveMQTempDestination tempDest=(ActiveMQTempDestination)destInfo.getDestination();
tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
}
-
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath()));
-
- log.debug("Replying destination control command: "+destInfo);
+ log.debug("Replying destination control command: "+destInfo);
localBroker.oneway(destInfo);
-
- }
- else if(data.getClass()==RemoveInfo.class){
- ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId();
+ }else if(data.getClass()==RemoveInfo.class){
+ ConsumerId id=(ConsumerId)((RemoveInfo)data).getObjectId();
removeDemandSubscription(id);
}
}
- public void serviceLocalException(Throwable error) {
- if( !disposed ) {
- log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a local error: "+error);
- log.debug("The local Exception was:"+error,error);
- new Thread() {
- public void run() {
- ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
- }
- }.start();
+ public void serviceLocalException(Throwable error){
+ if(!disposed){
+ log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a local error: "
+ +error);
+ log.debug("The local Exception was:"+error,error);
+ new Thread(){
+
+ public void run(){
+ ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
+ }
+ }.start();
fireBridgeFailed();
- }
+ }
}
protected void addSubscription(DemandSubscription sub) throws IOException {
@@ -741,8 +759,7 @@
}
}
return false;
- }
-
+ }
return true;
}
@@ -766,6 +783,18 @@
}
}
}
+
+ protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
+ boolean result = false;
+ ConsumerInfo info=consumerInfo.copy();
+ addRemoteBrokerToBrokerPath(info);
+ DemandSubscription sub=createDemandSubscription(info);
+ if (sub != null){
+ addSubscription(sub);
+ result = true;
+ }
+ return result;
+ }
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
return doCreateDemandSubscription(info);
@@ -775,7 +804,13 @@
DemandSubscription result=new DemandSubscription(info);
result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
.getNextSequenceId()));
-
+ if (info.getDestination().isTemporary()) {
+ //reset the local connection Id
+
+ ActiveMQTempDestination dest = (ActiveMQTempDestination)result.getLocalInfo().getDestination();
+ dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
+ }
+
if( configuration.isDecreaseNetworkConsumerPriority() ) {
byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
@@ -828,7 +863,7 @@
subscriptionMapByLocalId.clear();
subscriptionMapByRemoteId.clear();
}
-
+
protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException;
protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException;