You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/08 20:58:13 UTC

svn commit: r563982 [9/32] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx...

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Wed Aug  8 11:56:59 2007
@@ -28,70 +28,74 @@
 import java.util.Iterator;
 import java.util.List;
 
-
 /**
  * Consolidates subscriptions
  * 
  * @version $Revision: 1.1 $
  */
-public class ConduitBridge extends DemandForwardingBridge{
-    static final private Log log=LogFactory.getLog(ConduitBridge.class);
+public class ConduitBridge extends DemandForwardingBridge {
+    static final private Log log = LogFactory.getLog(ConduitBridge.class);
+
     /**
      * Constructor
+     * 
      * @param localBroker
      * @param remoteBroker
      */
-    public ConduitBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
-        super(configuration,localBroker,remoteBroker);
+    public ConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
+                         Transport remoteBroker) {
+        super(configuration, localBroker, remoteBroker);
     }
-    
-    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException{
-        
-        if (addToAlreadyInterestedConsumers(info)){
-            return null; //don't want this subscription added
+
+    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
+
+        if (addToAlreadyInterestedConsumers(info)) {
+            return null; // don't want this subscription added
         }
         return doCreateDemandSubscription(info);
     }
-    
-    protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info){
-    	    	
-    	if( info.getSelector()!=null )
-    		return false;
-    	
-        //search through existing subscriptions and see if we have a match
+
+    protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
+
+        if (info.getSelector() != null)
+            return false;
+
+        // search through existing subscriptions and see if we have a match
         boolean matched = false;
-        DestinationFilter filter=DestinationFilter.parseFilter(info.getDestination());
-        for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();){
+        DestinationFilter filter = DestinationFilter.parseFilter(info.getDestination());
+        for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
             DemandSubscription ds = (DemandSubscription)i.next();
-            if (filter.matches(ds.getLocalInfo().getDestination())){
-                //add the interest in the subscription
-                //ds.add(ds.getRemoteInfo().getConsumerId());
+            if (filter.matches(ds.getLocalInfo().getDestination())) {
+                // add the interest in the subscription
+                // ds.add(ds.getRemoteInfo().getConsumerId());
                 ds.add(info.getConsumerId());
                 matched = true;
-                //continue - we want interest to any existing DemandSubscriptions
+                // continue - we want interest to any existing
+                // DemandSubscriptions
             }
         }
         return matched;
     }
-    
-    protected void removeDemandSubscription(ConsumerId id) throws IOException{
+
+    protected void removeDemandSubscription(ConsumerId id) throws IOException {
         List tmpList = new ArrayList();
-    
-        for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();){
+
+        for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
             DemandSubscription ds = (DemandSubscription)i.next();
             ds.remove(id);
-            if (ds.isEmpty()){
+            if (ds.isEmpty()) {
                 tmpList.add(ds);
             }
         }
-        for (Iterator i = tmpList.iterator(); i.hasNext();){
-            DemandSubscription ds = (DemandSubscription) i.next();
+        for (Iterator i = tmpList.iterator(); i.hasNext();) {
+            DemandSubscription ds = (DemandSubscription)i.next();
             subscriptionMapByLocalId.remove(ds.getRemoteInfo().getConsumerId());
             removeSubscription(ds);
-            if(log.isTraceEnabled())
-                log.trace("removing sub on "+localBroker+" from "+remoteBrokerName+" :  "+ds.getRemoteInfo());
+            if (log.isTraceEnabled())
+                log.trace("removing sub on " + localBroker + " from " + remoteBrokerName + " :  "
+                          + ds.getRemoteInfo());
         }
-       
+
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Wed Aug  8 11:56:59 2007
@@ -35,41 +35,42 @@
  */
 public class DemandForwardingBridge extends DemandForwardingBridgeSupport {
 
-    protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
+    protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
     protected Object brokerInfoMutex = new Object();
     protected BrokerId remoteBrokerId;
 
-    public DemandForwardingBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
-        super(configuration,localBroker, remoteBroker);
+    public DemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
+                                  Transport remoteBroker) {
+        super(configuration, localBroker, remoteBroker);
     }
 
     protected void serviceRemoteBrokerInfo(Command command) throws IOException {
-        synchronized(brokerInfoMutex){
-            BrokerInfo remoteBrokerInfo=(BrokerInfo) command;
-            remoteBrokerId=remoteBrokerInfo.getBrokerId();
-            remoteBrokerPath[0]=remoteBrokerId;
-            remoteBrokerName=remoteBrokerInfo.getBrokerName();
-            if(localBrokerId!=null){
-                if(localBrokerId.equals(remoteBrokerId)){
+        synchronized (brokerInfoMutex) {
+            BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
+            remoteBrokerId = remoteBrokerInfo.getBrokerId();
+            remoteBrokerPath[0] = remoteBrokerId;
+            remoteBrokerName = remoteBrokerInfo.getBrokerName();
+            if (localBrokerId != null) {
+                if (localBrokerId.equals(remoteBrokerId)) {
                     log.info("Disconnecting loop back connection.");
-                    //waitStarted();
+                    // waitStarted();
                     ServiceSupport.dispose(this);
                 }
             }
-        	remoteBrokerNameKnownLatch.countDown();
+            remoteBrokerNameKnownLatch.countDown();
         }
     }
 
     protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) {
-        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),getRemoteBrokerPath()));
+        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
     }
 
     protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
-        synchronized(brokerInfoMutex){
-            localBrokerId=((BrokerInfo) command).getBrokerId();
-            localBrokerPath[0]=localBrokerId;
-            if(remoteBrokerId!=null){
-                if(remoteBrokerId.equals(localBrokerId)){
+        synchronized (brokerInfoMutex) {
+            localBrokerId = ((BrokerInfo)command).getBrokerId();
+            localBrokerPath[0] = localBrokerId;
+            if (remoteBrokerId != null) {
+                if (remoteBrokerId.equals(localBrokerId)) {
                     log.info("Disconnecting loop back connection.");
                     waitStarted();
                     ServiceSupport.dispose(this);
@@ -77,12 +78,12 @@
             }
         }
     }
-    
+
     protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
         return new NetworkBridgeFilter(remoteBrokerPath[0], configuration.getNetworkTTL());
     }
-    
-    protected BrokerId[] getRemoteBrokerPath(){
+
+    protected BrokerId[] getRemoteBrokerPath() {
         return remoteBrokerPath;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java Wed Aug  8 11:56:59 2007
@@ -29,92 +29,93 @@
  * 
  * @version $Revision: 1.1 $
  */
-public class DemandSubscription{
+public class DemandSubscription {
     private ConsumerInfo remoteInfo;
     private ConsumerInfo localInfo;
     private Set remoteSubsIds = new CopyOnWriteArraySet();
     private AtomicInteger dispatched = new AtomicInteger(0);
 
-    DemandSubscription(ConsumerInfo info){
-        remoteInfo=info;
-        localInfo=info.copy();
+    DemandSubscription(ConsumerInfo info) {
+        remoteInfo = info;
+        localInfo = info.copy();
         localInfo.setBrokerPath(info.getBrokerPath());
         remoteSubsIds.add(info.getConsumerId());
-    } 
+    }
 
     /**
      * Increment the consumers associated with this subscription
+     * 
      * @param id
      * @return true if added
      */
-    public boolean add(ConsumerId id){
+    public boolean add(ConsumerId id) {
         return remoteSubsIds.add(id);
     }
-    
+
     /**
      * Increment the consumers associated with this subscription
+     * 
      * @param id
      * @return true if added
      */
-    public boolean remove(ConsumerId id){
+    public boolean remove(ConsumerId id) {
         return remoteSubsIds.remove(id);
     }
-    
+
     /**
      * @return true if there are no interested consumers
      */
-    public boolean isEmpty(){
+    public boolean isEmpty() {
         return remoteSubsIds.isEmpty();
     }
-    
-    
+
     /**
      * @return Returns the dispatched.
      */
-    public int getDispatched(){
+    public int getDispatched() {
         return dispatched.get();
     }
 
     /**
      * @param dispatched The dispatched to set.
      */
-    public void setDispatched(int dispatched){
+    public void setDispatched(int dispatched) {
         this.dispatched.set(dispatched);
     }
-    
+
     /**
      * @return dispatched count after incremented
      */
-    public int incrementDispatched(){
+    public int incrementDispatched() {
         return dispatched.incrementAndGet();
     }
 
     /**
      * @return Returns the localInfo.
      */
-    public ConsumerInfo getLocalInfo(){
+    public ConsumerInfo getLocalInfo() {
         return localInfo;
     }
 
     /**
      * @param localInfo The localInfo to set.
      */
-    public void setLocalInfo(ConsumerInfo localInfo){
-        this.localInfo=localInfo;
+    public void setLocalInfo(ConsumerInfo localInfo) {
+        this.localInfo = localInfo;
     }
 
     /**
      * @return Returns the remoteInfo.
      */
-    public ConsumerInfo getRemoteInfo(){
+    public ConsumerInfo getRemoteInfo() {
         return remoteInfo;
     }
 
     /**
      * @param remoteInfo The remoteInfo to set.
      */
-    public void setRemoteInfo(ConsumerInfo remoteInfo){
-        this.remoteInfo=remoteInfo;
+    public void setRemoteInfo(ConsumerInfo remoteInfo) {
+        this.remoteInfo = remoteInfo;
     }
-    
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Wed Aug  8 11:56:59 2007
@@ -44,7 +44,7 @@
 
     private DiscoveryAgent discoveryAgent;
     private ConcurrentHashMap bridges = new ConcurrentHashMap();
-    
+
     public DiscoveryNetworkConnector() {
     }
 
@@ -56,74 +56,74 @@
         setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI));
     }
 
-    public void onServiceAdd(DiscoveryEvent event){
-        String localURIName=localURI.getScheme() + "://" + localURI.getHost();
+    public void onServiceAdd(DiscoveryEvent event) {
+        String localURIName = localURI.getScheme() + "://" + localURI.getHost();
         // Ignore events once we start stopping.
-        if(serviceSupport.isStopped()||serviceSupport.isStopping())
+        if (serviceSupport.isStopped() || serviceSupport.isStopping())
             return;
-        String url=event.getServiceName();
-        if(url!=null){
+        String url = event.getServiceName();
+        if (url != null) {
             URI uri;
-            try{
-                uri=new URI(url);
-            }catch(URISyntaxException e){
-                log.warn("Could not connect to remote URI: "+url+" due to bad URI syntax: "+e,e);
+            try {
+                uri = new URI(url);
+            } catch (URISyntaxException e) {
+                log.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
                 return;
             }
             // Should we try to connect to that URI?
-            if(bridges.containsKey(uri)||localURI.equals(uri)
-                    ||(connectionFilter!=null&&!connectionFilter.connectTo(uri)))
+            if (bridges.containsKey(uri) || localURI.equals(uri)
+                || (connectionFilter != null && !connectionFilter.connectTo(uri)))
                 return;
-            URI connectUri=uri;
-            log.info("Establishing network connection between from "+localURIName+" to "+connectUri);
+            URI connectUri = uri;
+            log.info("Establishing network connection between from " + localURIName + " to " + connectUri);
             Transport remoteTransport;
-            try{
-                remoteTransport=TransportFactory.connect(connectUri);
-            }catch(Exception e){
-                log.warn("Could not connect to remote URI: "+localURIName+": "+e.getMessage());
-                log.debug("Connection failure exception: "+e,e);
+            try {
+                remoteTransport = TransportFactory.connect(connectUri);
+            } catch (Exception e) {
+                log.warn("Could not connect to remote URI: " + localURIName + ": " + e.getMessage());
+                log.debug("Connection failure exception: " + e, e);
                 return;
             }
             Transport localTransport;
-            try{
-                localTransport=createLocalTransport();
-            }catch(Exception e){
+            try {
+                localTransport = createLocalTransport();
+            } catch (Exception e) {
                 ServiceSupport.dispose(remoteTransport);
-                log.warn("Could not connect to local URI: "+localURIName+": "+e.getMessage());
-                log.debug("Connection failure exception: "+e,e);
+                log.warn("Could not connect to local URI: " + localURIName + ": " + e.getMessage());
+                log.debug("Connection failure exception: " + e, e);
                 return;
             }
-            NetworkBridge bridge=createBridge(localTransport,remoteTransport,event);
-            bridges.put(uri,bridge);
-            try{
+            NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
+            bridges.put(uri, bridge);
+            try {
                 bridge.start();
-            }catch(Exception e){
+            } catch (Exception e) {
                 ServiceSupport.dispose(localTransport);
                 ServiceSupport.dispose(remoteTransport);
-                log.warn("Could not start network bridge between: "+localURIName+" and: "+uri+" due to: "+e);
-                log.debug("Start failure exception: "+e,e);
-                try{
+                log.warn("Could not start network bridge between: " + localURIName + " and: " + uri
+                         + " due to: " + e);
+                log.debug("Start failure exception: " + e, e);
+                try {
                     discoveryAgent.serviceFailed(event);
-                }catch(IOException e1){
+                } catch (IOException e1) {
                 }
                 return;
             }
         }
     }
-    
+
     public void onServiceRemove(DiscoveryEvent event) {
         String url = event.getServiceName();
         if (url != null) {
             URI uri;
             try {
                 uri = new URI(url);
-            }
-            catch (URISyntaxException e) {
+            } catch (URISyntaxException e) {
                 log.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
                 return;
             }
 
-            NetworkBridge bridge = (NetworkBridge) bridges.remove(uri);
+            NetworkBridge bridge = (NetworkBridge)bridges.remove(uri);
             if (bridge == null)
                 return;
 
@@ -153,55 +153,52 @@
 
     protected void handleStop(ServiceStopper stopper) throws Exception {
         for (Iterator i = bridges.values().iterator(); i.hasNext();) {
-            NetworkBridge bridge = (NetworkBridge) i.next();
+            NetworkBridge bridge = (NetworkBridge)i.next();
             try {
                 bridge.stop();
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
                 stopper.onException(this, e);
             }
         }
         try {
             this.discoveryAgent.stop();
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             stopper.onException(this, e);
         }
 
         super.handleStop(stopper);
     }
 
-    protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
+    protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport,
+                                         final DiscoveryEvent event) {
         NetworkBridgeListener listener = new NetworkBridgeListener() {
 
-            public void bridgeFailed(){
-                if( !serviceSupport.isStopped() ) {
+            public void bridgeFailed() {
+                if (!serviceSupport.isStopped()) {
                     try {
                         discoveryAgent.serviceFailed(event);
                     } catch (IOException e) {
                     }
                 }
-                
+
             }
 
-			public void onStart(NetworkBridge bridge) {
-				 registerNetworkBridgeMBean(bridge);
-			}
+            public void onStart(NetworkBridge bridge) {
+                registerNetworkBridgeMBean(bridge);
+            }
 
-			public void onStop(NetworkBridge bridge) {
-				unregisterNetworkBridgeMBean(bridge);
-			}
+            public void onStop(NetworkBridge bridge) {
+                unregisterNetworkBridgeMBean(bridge);
+            }
 
-            
         };
-        DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this,localTransport,remoteTransport,listener);
+        DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport,
+                                                                          remoteTransport, listener);
         return configureBridge(result);
     }
 
     public String getName() {
         return discoveryAgent.toString();
     }
-
-   
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java Wed Aug  8 11:56:59 2007
@@ -26,81 +26,85 @@
 
 import java.io.IOException;
 import java.util.Iterator;
+
 /**
  * Consolidates subscriptions
  * 
  * @version $Revision: 1.1 $
  */
-public class DurableConduitBridge extends ConduitBridge{
-    static final private Log log=LogFactory.getLog(DurableConduitBridge.class);
+public class DurableConduitBridge extends ConduitBridge {
+    static final private Log log = LogFactory.getLog(DurableConduitBridge.class);
 
     /**
      * Constructor
-     * @param configuration 
+     * 
+     * @param configuration
      * 
      * @param localBroker
      * @param remoteBroker
      */
-    public DurableConduitBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
-        super(configuration,localBroker,remoteBroker);
+    public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
+                                Transport remoteBroker) {
+        super(configuration, localBroker, remoteBroker);
     }
 
     /**
      * Subscriptions for these destinations are always created
      * 
      */
-    protected void setupStaticDestinations(){
+    protected void setupStaticDestinations() {
         super.setupStaticDestinations();
-        ActiveMQDestination[] dests=durableDestinations;
-        if(dests!=null){
-            for(int i=0;i<dests.length;i++){
-                ActiveMQDestination dest=dests[i];
-                if(isPermissableDestination(dest) && !doesConsumerExist(dest)){
-                    DemandSubscription sub=createDemandSubscription(dest);
-                    if(dest.isTopic()){
+        ActiveMQDestination[] dests = durableDestinations;
+        if (dests != null) {
+            for (int i = 0; i < dests.length; i++) {
+                ActiveMQDestination dest = dests[i];
+                if (isPermissableDestination(dest) && !doesConsumerExist(dest)) {
+                    DemandSubscription sub = createDemandSubscription(dest);
+                    if (dest.isTopic()) {
                         sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
                     }
-                    try{
+                    try {
                         addSubscription(sub);
-                    }catch(IOException e){
-                        log.error("Failed to add static destination "+dest,e);
+                    } catch (IOException e) {
+                        log.error("Failed to add static destination " + dest, e);
                     }
-                    if(log.isTraceEnabled())
-                        log.trace("Forwarding messages for durable destination: "+dest);
+                    if (log.isTraceEnabled())
+                        log.trace("Forwarding messages for durable destination: " + dest);
                 }
             }
         }
     }
 
-    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException{
-        if(addToAlreadyInterestedConsumers(info)){
+    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
+        if (addToAlreadyInterestedConsumers(info)) {
             return null; // don't want this subscription added
         }
         // not matched so create a new one
         // but first, if it's durable - changed set the
         // ConsumerId here - so it won't be removed if the
         // durable subscriber goes away on the other end
-        if(info.isDurable()||(info.getDestination().isQueue()&&!info.getDestination().isTemporary())){
-            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator.getNextSequenceId()));
+        if (info.isDurable() || (info.getDestination().isQueue() && !info.getDestination().isTemporary())) {
+            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator
+                .getNextSequenceId()));
         }
-        if(info.isDurable()){
+        if (info.isDurable()) {
             // set the subscriber name to something reproducible
-           
+
             info.setSubscriptionName(getSubscriberName(info.getDestination()));
         }
         return doCreateDemandSubscription(info);
     }
-    
-    protected String getSubscriberName(ActiveMQDestination dest){
-        String subscriberName = configuration.getBrokerName()+"_"+dest.getPhysicalName();
+
+    protected String getSubscriberName(ActiveMQDestination dest) {
+        String subscriberName = configuration.getBrokerName() + "_" + dest.getPhysicalName();
         return subscriberName;
     }
 
-    protected boolean doesConsumerExist(ActiveMQDestination dest){
-        DestinationFilter filter=DestinationFilter.parseFilter(dest);
-        for(Iterator i=subscriptionMapByLocalId.values().iterator();i.hasNext();){
-            DemandSubscription ds=(DemandSubscription) i.next();
-            if(filter.matches(ds.getLocalInfo().getDestination())){
+    protected boolean doesConsumerExist(ActiveMQDestination dest) {
+        DestinationFilter filter = DestinationFilter.parseFilter(dest);
+        for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
+            DemandSubscription ds = (DemandSubscription)i.next();
+            if (filter.matches(ds.getLocalInfo().getDestination())) {
                 return true;
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Wed Aug  8 11:56:59 2007
@@ -46,7 +46,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-
 /**
  * Forwards all messages from the local broker to the remote broker.
  * 
@@ -54,34 +53,34 @@
  * 
  * @version $Revision$
  */
-public class ForwardingBridge  implements Service{
-    
+public class ForwardingBridge implements Service {
+
     static final private Log log = LogFactory.getLog(ForwardingBridge.class);
 
     private final Transport localBroker;
     private final Transport remoteBroker;
-    
+
     IdGenerator idGenerator = new IdGenerator();
     ConnectionInfo connectionInfo;
     SessionInfo sessionInfo;
     ProducerInfo producerInfo;
     ConsumerInfo queueConsumerInfo;
     ConsumerInfo topicConsumerInfo;
-    
+
     private String clientId;
-    private int prefetchSize=1000;
+    private int prefetchSize = 1000;
     private boolean dispatchAsync;
     private String destinationFilter = ">";
-    
+
     BrokerId localBrokerId;
     BrokerId remoteBrokerId;
     private NetworkBridgeListener bridgeFailedListener;
 
-	BrokerInfo localBrokerInfo;
-	BrokerInfo remoteBrokerInfo;
-	
-	final AtomicLong enqueueCounter = new AtomicLong();
-	final AtomicLong dequeueCounter = new AtomicLong();
+    BrokerInfo localBrokerInfo;
+    BrokerInfo remoteBrokerInfo;
+
+    final AtomicLong enqueueCounter = new AtomicLong();
+    final AtomicLong dequeueCounter = new AtomicLong();
 
     public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
         this.localBroker = localBroker;
@@ -89,28 +88,31 @@
     }
 
     public void start() throws Exception {
-        log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established.");
+        log.info("Starting a network connection between " + localBroker + " and " + remoteBroker
+                 + " has been established.");
 
-        localBroker.setTransportListener(new DefaultTransportListener(){
+        localBroker.setTransportListener(new DefaultTransportListener() {
             public void onCommand(Object o) {
-            	Command command = (Command) o;
+                Command command = (Command)o;
                 serviceLocalCommand(command);
             }
+
             public void onException(IOException error) {
                 serviceLocalException(error);
             }
         });
-        
-        remoteBroker.setTransportListener(new DefaultTransportListener(){
+
+        remoteBroker.setTransportListener(new DefaultTransportListener() {
             public void onCommand(Object o) {
-            	Command command = (Command) o;
+                Command command = (Command)o;
                 serviceRemoteCommand(command);
             }
+
             public void onException(IOException error) {
                 serviceRemoteException(error);
             }
         });
-        
+
         localBroker.start();
         remoteBroker.start();
     }
@@ -120,8 +122,7 @@
             public void run() {
                 try {
                     startBridge();
-                }
-                catch (IOException e) {
+                } catch (IOException e) {
                     log.error("Failed to start network bridge: " + e, e);
                 }
             }
@@ -139,22 +140,22 @@
         localBroker.oneway(connectionInfo);
         remoteBroker.oneway(connectionInfo);
 
-        sessionInfo=new SessionInfo(connectionInfo, 1);
+        sessionInfo = new SessionInfo(connectionInfo, 1);
         localBroker.oneway(sessionInfo);
         remoteBroker.oneway(sessionInfo);
-        
+
         queueConsumerInfo = new ConsumerInfo(sessionInfo, 1);
         queueConsumerInfo.setDispatchAsync(dispatchAsync);
         queueConsumerInfo.setDestination(new ActiveMQQueue(destinationFilter));
         queueConsumerInfo.setPrefetchSize(prefetchSize);
         queueConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
         localBroker.oneway(queueConsumerInfo);
-        
+
         producerInfo = new ProducerInfo(sessionInfo, 1);
         producerInfo.setResponseRequired(false);
         remoteBroker.oneway(producerInfo);
-        
-        if( connectionInfo.getClientId()!=null ) {
+
+        if (connectionInfo.getClientId() != null) {
             topicConsumerInfo = new ConsumerInfo(sessionInfo, 2);
             topicConsumerInfo.setDispatchAsync(dispatchAsync);
             topicConsumerInfo.setSubscriptionName("topic-bridge");
@@ -164,12 +165,13 @@
             topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
             localBroker.oneway(topicConsumerInfo);
         }
-        log.info("Network connection between " + localBroker + " and " + remoteBroker + " has been established.");
+        log.info("Network connection between " + localBroker + " and " + remoteBroker
+                 + " has been established.");
     }
-    
+
     public void stop() throws Exception {
         try {
-            if( connectionInfo!=null ) {
+            if (connectionInfo != null) {
                 localBroker.request(connectionInfo.createRemoveCommand());
                 remoteBroker.request(connectionInfo.createRemoveCommand());
             }
@@ -184,29 +186,29 @@
             ss.throwFirstException();
         }
     }
-    
+
     public void serviceRemoteException(Throwable error) {
-        log.info("Unexpected remote exception: "+error);
+        log.info("Unexpected remote exception: " + error);
         log.debug("Exception trace: ", error);
     }
-    
+
     protected void serviceRemoteCommand(Command command) {
         try {
-            if(command.isBrokerInfo() ) {
-                synchronized( this ) {
-                	remoteBrokerInfo = ((BrokerInfo)command);
+            if (command.isBrokerInfo()) {
+                synchronized (this) {
+                    remoteBrokerInfo = ((BrokerInfo)command);
                     remoteBrokerId = remoteBrokerInfo.getBrokerId();
-                    if( localBrokerId !=null) {
-                        if( localBrokerId.equals(remoteBrokerId) ) {
+                    if (localBrokerId != null) {
+                        if (localBrokerId.equals(remoteBrokerId)) {
                             log.info("Disconnecting loop back connection.");
                             ServiceSupport.dispose(this);
                         } else {
-                            triggerStartBridge();                            
+                            triggerStartBridge();
                         }
                     }
                 }
             } else {
-                log.warn("Unexpected remote command: "+command);
+                log.warn("Unexpected remote command: " + command);
             }
         } catch (IOException e) {
             serviceLocalException(e);
@@ -214,46 +216,50 @@
     }
 
     public void serviceLocalException(Throwable error) {
-        log.info("Unexpected local exception: "+error);
+        log.info("Unexpected local exception: " + error);
         log.debug("Exception trace: ", error);
         fireBridgeFailed();
-    }    
+    }
+
     protected void serviceLocalCommand(Command command) {
         try {
-            if( command.isMessageDispatch() ) {
-            	
-            	enqueueCounter.incrementAndGet();
-            	
-                final MessageDispatch md = (MessageDispatch) command;
+            if (command.isMessageDispatch()) {
+
+                enqueueCounter.incrementAndGet();
+
+                final MessageDispatch md = (MessageDispatch)command;
                 Message message = md.getMessage();
                 message.setProducerId(producerInfo.getProducerId());
-                
-                if( message.getOriginalTransactionId()==null )
+
+                if (message.getOriginalTransactionId() == null)
                     message.setOriginalTransactionId(message.getTransactionId());
                 message.setTransactionId(null);
 
-                
-                if( !message.isResponseRequired() ) {
-                    // If the message was originally sent using async send, we will preserve that QOS
-                    // by bridging it using an async send (small chance of message loss).
+                if (!message.isResponseRequired()) {
+                    // If the message was originally sent using async send,
+                    // we preserve that QoS by bridging it with an async send
+                    // as well. This carries a small chance of message loss,
+                    // since the local ack is sent without remote confirmation.
                     remoteBroker.oneway(message);
-                	dequeueCounter.incrementAndGet();
-                    localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
-                    
+                    dequeueCounter.incrementAndGet();
+                    localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
+
                 } else {
-                    
-                    // The message was not sent using async send, so we should only ack the local 
-                    // broker when we get confirmation that the remote broker has received the message.
+
+                    // The message was not sent using async send, so we should
+                    // only ack the local broker once we get confirmation that
+                    // the remote broker has received the message. The callback
+                    // below sends that ack when a non-exception response arrives.
                     ResponseCallback callback = new ResponseCallback() {
                         public void onCompletion(FutureResponse future) {
                             try {
                                 Response response = future.getResult();
-                                if(response.isException()){
-                                    ExceptionResponse er=(ExceptionResponse) response;
+                                if (response.isException()) {
+                                    ExceptionResponse er = (ExceptionResponse)response;
                                     serviceLocalException(er.getException());
                                 } else {
-                                	dequeueCounter.incrementAndGet();
-                                    localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+                                    dequeueCounter.incrementAndGet();
+                                    localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
                                 }
                             } catch (IOException e) {
                                 serviceLocalException(e);
@@ -263,41 +269,49 @@
 
                     remoteBroker.asyncRequest(message, callback);
                 }
-                
-                                
-                // Ack on every message since we don't know if the broker is blocked due to memory
-                // usage and is waiting for an Ack to un-block him. 
-
-                // Acking a range is more efficient, but also more prone to locking up a server
-                // Perhaps doing something like the following should be policy based.
-//                if( md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) ) {
-//                    queueDispatched++;
-//                    if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2) ) {
-//                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, queueDispatched));
-//                        queueDispatched=0;
-//                    }
-//                } else {
-//                    topicDispatched++;
-//                    if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) ) {
-//                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, topicDispatched));
-//                        topicDispatched=0;
-//                    }
-//                }
-            } else if(command.isBrokerInfo() ) {
-                synchronized( this ) {
-                	localBrokerInfo = ((BrokerInfo)command);
+
+                // Ack on every message since we don't know if the broker is
+                // blocked due to memory usage and is waiting for an ack to
+                // unblock it.
+
+                // Acking a range is more efficient, but also more prone to
+                // locking up a server. Perhaps doing something like the
+                // following (acking once half the prefetch size has been
+                // dispatched) should be policy based.
+                // if(
+                //     md.getConsumerId().equals(queueConsumerInfo.getConsumerId())
+                // ) {
+                //     queueDispatched++;
+                //     if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2)
+                //     ) {
+                //         localBroker.oneway(new MessageAck(md,
+                //             MessageAck.STANDARD_ACK_TYPE, queueDispatched));
+                //         queueDispatched=0;
+                //     }
+                // } else {
+                //     topicDispatched++;
+                //     if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2)
+                //     ) {
+                //         localBroker.oneway(new MessageAck(md,
+                //             MessageAck.STANDARD_ACK_TYPE, topicDispatched));
+                //         topicDispatched=0;
+                //     }
+                // }
+            } else if (command.isBrokerInfo()) {
+                synchronized (this) {
+                    localBrokerInfo = ((BrokerInfo)command);
                     localBrokerId = localBrokerInfo.getBrokerId();
-                    if( remoteBrokerId !=null) {
-                        if( remoteBrokerId.equals(localBrokerId) ) {
+                    if (remoteBrokerId != null) {
+                        if (remoteBrokerId.equals(localBrokerId)) {
                             log.info("Disconnecting loop back connection.");
                             ServiceSupport.dispose(this);
                         } else {
-                            triggerStartBridge();                            
+                            triggerStartBridge();
                         }
                     }
                 }
             } else {
-                log.debug("Unexpected local command: "+command);
+                log.debug("Unexpected local command: " + command);
             }
         } catch (IOException e) {
             serviceLocalException(e);
@@ -307,6 +321,7 @@
     public String getClientId() {
         return clientId;
     }
+
     public void setClientId(String clientId) {
         this.clientId = clientId;
     }
@@ -314,6 +329,7 @@
     public int getPrefetchSize() {
         return prefetchSize;
     }
+
     public void setPrefetchSize(int prefetchSize) {
         this.prefetchSize = prefetchSize;
     }
@@ -321,6 +337,7 @@
     public boolean isDispatchAsync() {
         return dispatchAsync;
     }
+
     public void setDispatchAsync(boolean dispatchAsync) {
         this.dispatchAsync = dispatchAsync;
     }
@@ -328,44 +345,44 @@
     public String getDestinationFilter() {
         return destinationFilter;
     }
+
     public void setDestinationFilter(String destinationFilter) {
         this.destinationFilter = destinationFilter;
     }
 
-   
-    public void setNetworkBridgeFailedListener(NetworkBridgeListener listener){
-      this.bridgeFailedListener=listener;  
+    public void setNetworkBridgeFailedListener(NetworkBridgeListener listener) {
+        this.bridgeFailedListener = listener;
     }
-    
+
     private void fireBridgeFailed() {
         NetworkBridgeListener l = this.bridgeFailedListener;
-        if (l!=null) {
+        if (l != null) {
             l.bridgeFailed();
         }
     }
 
-	public String getRemoteAddress() {
-		return remoteBroker.getRemoteAddress();
-	}
-
-	public String getLocalAddress() {
-		return localBroker.getRemoteAddress();
-	}
-
-	public String getLocalBrokerName() {
-		return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
-	}
-
-	public String getRemoteBrokerName() {
-		return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
-	}
-	
-	public long getDequeueCounter() {
-		return dequeueCounter.get();
-	}
-
-	public long getEnqueueCounter() {
-		return enqueueCounter.get();
-	}
+    public String getRemoteAddress() {
+        return remoteBroker.getRemoteAddress();
+    }
+
+    public String getLocalAddress() {
+        return localBroker.getRemoteAddress();
+    }
+
+    public String getLocalBrokerName() {
+        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
+    }
+
+    public String getRemoteBrokerName() {
+        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
+    }
+
+    public long getDequeueCounter() {
+        return dequeueCounter.get();
+    }
+
+    public long getEnqueueCounter() {
+        return enqueueCounter.get();
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java Wed Aug  8 11:56:59 2007
@@ -121,24 +121,21 @@
         if (bridge != null) {
             try {
                 bridge.stop();
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
                 stopper.onException(this, e);
             }
         }
         if (remoteTransport != null) {
             try {
                 remoteTransport.stop();
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
                 stopper.onException(this, e);
             }
         }
         if (localTransport != null) {
             try {
                 localTransport.stop();
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
                 stopper.onException(this, e);
             }
         }
@@ -149,7 +146,7 @@
     }
 
     protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote) {
-        return new CompositeDemandForwardingBridge(this,local, remote);
+        return new CompositeDemandForwardingBridge(this, local, remote);
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java Wed Aug  8 11:56:59 2007
@@ -23,48 +23,44 @@
  */
 public class NetworkBridgeFactory {
 
-	/**
-	 * Create a network bridge
-	 * 
-	 * @param config
-	 * @param localTransport
-	 * @param remoteTransport
-	 * @return the NetworkBridge
-	 */
-	public static DemandForwardingBridge createBridge(
-			NetworkBridgeConfiguration config, Transport localTransport,
-			Transport remoteTransport) {
-		return createBridge(config, localTransport, remoteTransport, null);
-	}
+    /**
+     * Create a network bridge
+     * 
+     * @param config
+     * @param localTransport
+     * @param remoteTransport
+     * @return the NetworkBridge
+     */
+    public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration config,
+                                                      Transport localTransport, Transport remoteTransport) {
+        return createBridge(config, localTransport, remoteTransport, null);
+    }
 
-	/**
-	 * create a network bridge
-	 * 
-	 * @param configuration
-	 * @param localTransport
-	 * @param remoteTransport
-	 * @param listener
-	 * @return the NetworkBridge
-	 */
-	public static DemandForwardingBridge createBridge(
-			NetworkBridgeConfiguration configuration, Transport localTransport,
-			Transport remoteTransport, final NetworkBridgeListener listener) {
-		DemandForwardingBridge result = null;
-		if (configuration.isConduitSubscriptions()) {
-			if (configuration.isDynamicOnly()) {
-				result = new ConduitBridge(configuration, localTransport,
-						remoteTransport);
-			} else {
-				result = new DurableConduitBridge(configuration,
-						localTransport, remoteTransport);
-			}
-		} else {
-			result = new DemandForwardingBridge(configuration, localTransport,
-					remoteTransport);
-		}
-		if (listener != null) {
-			result.setNetworkBridgeListener(listener);
-		}
-		return result;
-	}
+    /**
+     * create a network bridge
+     * 
+     * @param configuration
+     * @param localTransport
+     * @param remoteTransport
+     * @param listener
+     * @return the NetworkBridge
+     */
+    public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration,
+                                                      Transport localTransport, Transport remoteTransport,
+                                                      final NetworkBridgeListener listener) {
+        DemandForwardingBridge result = null;
+        if (configuration.isConduitSubscriptions()) {
+            if (configuration.isDynamicOnly()) {
+                result = new ConduitBridge(configuration, localTransport, remoteTransport);
+            } else {
+                result = new DurableConduitBridge(configuration, localTransport, remoteTransport);
+            }
+        } else {
+            result = new DemandForwardingBridge(configuration, localTransport, remoteTransport);
+        }
+        if (listener != null) {
+            result.setNetworkBridgeListener(listener);
+        }
+        return result;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java Wed Aug  8 11:56:59 2007
@@ -16,31 +16,26 @@
  */
 package org.apache.activemq.network;
 
-
-
 /**
- *called when a bridge fails
+ * Listener for network bridge events: failure, start and stop.
  * 
  * @version $Revision: 1.1 $
  */
-public interface NetworkBridgeListener{
-    
+public interface NetworkBridgeListener {
+
     /**
      * called when the transport fails
-     *
      */
     public void bridgeFailed();
 
     /**
      * called after the bridge is started.
-     *
      */
-	public void onStart(NetworkBridge bridge);
-	
+    public void onStart(NetworkBridge bridge);
+
     /**
      * called before the bridge is stopped.
-     *
      */
-	public void onStop(NetworkBridge bridge);
+    public void onStop(NetworkBridge bridge);
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Wed Aug  8 11:56:59 2007
@@ -41,236 +41,231 @@
 /**
  * @version $Revision$
  */
-public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service{
+public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service {
 
-    protected static final Log log=LogFactory.getLog(NetworkConnector.class);
+    protected static final Log log = LogFactory.getLog(NetworkConnector.class);
     protected URI localURI;
     private Set durableDestinations;
-    private List excludedDestinations=new CopyOnWriteArrayList();
-    private List dynamicallyIncludedDestinations=new CopyOnWriteArrayList();
-    private List staticallyIncludedDestinations=new CopyOnWriteArrayList();
+    private List excludedDestinations = new CopyOnWriteArrayList();
+    private List dynamicallyIncludedDestinations = new CopyOnWriteArrayList();
+    private List staticallyIncludedDestinations = new CopyOnWriteArrayList();
     protected ConnectionFilter connectionFilter;
     private BrokerService brokerService;
     private ObjectName objectName;
-    
-    protected ServiceSupport serviceSupport=new ServiceSupport(){
 
-        protected void doStart() throws Exception{
-           handleStart();
+    protected ServiceSupport serviceSupport = new ServiceSupport() {
+
+        protected void doStart() throws Exception {
+            handleStart();
         }
 
-        protected void doStop(ServiceStopper stopper) throws Exception{
+        protected void doStop(ServiceStopper stopper) throws Exception {
             handleStop(stopper);
         }
     };
 
-    public NetworkConnector(){
+    public NetworkConnector() {
     }
 
-    public NetworkConnector(URI localURI){
-        this.localURI=localURI;
+    public NetworkConnector(URI localURI) {
+        this.localURI = localURI;
     }
 
-    public URI getLocalUri() throws URISyntaxException{
+    public URI getLocalUri() throws URISyntaxException {
         return localURI;
     }
 
-    public void setLocalUri(URI localURI){
-        this.localURI=localURI;
+    public void setLocalUri(URI localURI) {
+        this.localURI = localURI;
     }
 
-    
     /**
      * @return Returns the durableDestinations.
      */
-    public Set getDurableDestinations(){
+    public Set getDurableDestinations() {
         return durableDestinations;
     }
 
     /**
      * @param durableDestinations The durableDestinations to set.
      */
-    public void setDurableDestinations(Set durableDestinations){
-        this.durableDestinations=durableDestinations;
+    public void setDurableDestinations(Set durableDestinations) {
+        this.durableDestinations = durableDestinations;
     }
 
     /**
      * @return Returns the excludedDestinations.
      */
-    public List getExcludedDestinations(){
+    public List getExcludedDestinations() {
         return excludedDestinations;
     }
 
     /**
      * @param excludedDestinations The excludedDestinations to set.
      */
-    public void setExcludedDestinations(List excludedDestinations){
-        this.excludedDestinations=excludedDestinations;
+    public void setExcludedDestinations(List excludedDestinations) {
+        this.excludedDestinations = excludedDestinations;
     }
 
-    public void addExcludedDestination(ActiveMQDestination destiantion){
+    public void addExcludedDestination(ActiveMQDestination destiantion) {
         this.excludedDestinations.add(destiantion);
     }
 
     /**
      * @return Returns the staticallyIncludedDestinations.
      */
-    public List getStaticallyIncludedDestinations(){
+    public List getStaticallyIncludedDestinations() {
         return staticallyIncludedDestinations;
     }
 
     /**
-     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
+     * @param staticallyIncludedDestinations The staticallyIncludedDestinations
+     *                to set.
      */
-    public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations){
-        this.staticallyIncludedDestinations=staticallyIncludedDestinations;
+    public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations) {
+        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
     }
 
-    public void addStaticallyIncludedDestination(ActiveMQDestination destiantion){
+    public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) {
         this.staticallyIncludedDestinations.add(destiantion);
     }
 
     /**
      * @return Returns the dynamicallyIncludedDestinations.
      */
-    public List getDynamicallyIncludedDestinations(){
+    public List getDynamicallyIncludedDestinations() {
         return dynamicallyIncludedDestinations;
     }
 
     /**
-     * @param dynamicallyIncludedDestinations The dynamicallyIncludedDestinations to set.
+     * @param dynamicallyIncludedDestinations The
+     *                dynamicallyIncludedDestinations to set.
      */
-    public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations){
-        this.dynamicallyIncludedDestinations=dynamicallyIncludedDestinations;
+    public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations) {
+        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
     }
 
-    public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion){
+    public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) {
         this.dynamicallyIncludedDestinations.add(destiantion);
     }
-    
-    public ConnectionFilter getConnectionFilter(){
+
+    public ConnectionFilter getConnectionFilter() {
         return connectionFilter;
     }
 
-    public void setConnectionFilter(ConnectionFilter connectionFilter){
-        this.connectionFilter=connectionFilter;
+    public void setConnectionFilter(ConnectionFilter connectionFilter) {
+        this.connectionFilter = connectionFilter;
     }
 
-
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result){
-        List destsList=getDynamicallyIncludedDestinations();
-        ActiveMQDestination dests[]=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
+    protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result) {
+        List destsList = getDynamicallyIncludedDestinations();
+        ActiveMQDestination dests[] = (ActiveMQDestination[])destsList
+            .toArray(new ActiveMQDestination[destsList.size()]);
         result.setDynamicallyIncludedDestinations(dests);
-        destsList=getExcludedDestinations();
-        dests=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
+        destsList = getExcludedDestinations();
+        dests = (ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
         result.setExcludedDestinations(dests);
-        destsList=getStaticallyIncludedDestinations();
-        dests=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
+        destsList = getStaticallyIncludedDestinations();
+        dests = (ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
         result.setStaticallyIncludedDestinations(dests);
-        if(durableDestinations!=null){
-            ActiveMQDestination[] dest=new ActiveMQDestination[durableDestinations.size()];
-            dest=(ActiveMQDestination[])durableDestinations.toArray(dest);
+        if (durableDestinations != null) {
+            ActiveMQDestination[] dest = new ActiveMQDestination[durableDestinations.size()];
+            dest = (ActiveMQDestination[])durableDestinations.toArray(dest);
             result.setDurableDestinations(dest);
         }
         return result;
     }
 
-    protected Transport createLocalTransport() throws Exception{
+    protected Transport createLocalTransport() throws Exception {
         return TransportFactory.connect(localURI);
     }
 
-    public void start() throws Exception{
+    public void start() throws Exception {
         serviceSupport.start();
     }
 
-    public void stop() throws Exception{
+    public void stop() throws Exception {
         serviceSupport.stop();
     }
-    
+
     public abstract String getName();
-    
-    protected void handleStart() throws Exception{
-        if(localURI==null){
+
+    protected void handleStart() throws Exception {
+        if (localURI == null) {
             throw new IllegalStateException("You must configure the 'localURI' property");
         }
-        log.info("Network Connector "+getName()+" Started");
+        log.info("Network Connector " + getName() + " Started");
     }
 
-    protected void handleStop(ServiceStopper stopper) throws Exception{
-        log.info("Network Connector "+getName()+" Stopped");
+    protected void handleStop(ServiceStopper stopper) throws Exception {
+        log.info("Network Connector " + getName() + " Stopped");
     }
-    
+
     public ObjectName getObjectName() {
-		return objectName;
-	}
+        return objectName;
+    }
+
+    public void setObjectName(ObjectName objectName) {
+        this.objectName = objectName;
+    }
+
+    public BrokerService getBrokerService() {
+        return brokerService;
+    }
 
-	public void setObjectName(ObjectName objectName) {
-		this.objectName = objectName;
-	}
-
-	public BrokerService getBrokerService() {
-		return brokerService;
-	}
-
-	public void setBrokerService(BrokerService brokerService) {
-		this.brokerService = brokerService;
-	}
-
-	protected void registerNetworkBridgeMBean(NetworkBridge bridge) {
-		if (!getBrokerService().isUseJmx())
-			return;
-
-		MBeanServer mbeanServer = getBrokerService().getManagementContext()
-				.getMBeanServer();
-		if (mbeanServer != null) {
-			NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
-			try {
-				ObjectName objectName = createNetworkBridgeObjectName(bridge);
-				mbeanServer.registerMBean(view, objectName);
-			} catch (Throwable e) {
-				log.debug("Network bridge could not be registered in JMX: "
-						+ e.getMessage(), e);
-			}
-		}
-	}
-
-	protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
-		if (!getBrokerService().isUseJmx())
-			return;
-
-		MBeanServer mbeanServer = getBrokerService().getManagementContext()
-				.getMBeanServer();
-		if (mbeanServer != null) {
-			try {
-				ObjectName objectName = createNetworkBridgeObjectName(bridge);
-				mbeanServer.unregisterMBean(objectName);
-			} catch (Throwable e) {
-				log.debug("Network bridge could not be unregistered in JMX: "
-						+ e.getMessage(), e);
-			}
-		}
-	}
-
-	protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge)
-			throws MalformedObjectNameException {
-		ObjectName connectorName = getObjectName();
-		Hashtable map = connectorName.getKeyPropertyList();
-		return new ObjectName(connectorName.getDomain()
-				+ ":"
-				+ "BrokerName="
-				+ JMXSupport.encodeObjectNamePart((String) map
-						.get("BrokerName"))
-				+ ","
-				+ "Type=NetworkBridge,"
-				+ "NetworkConnectorName="
-				+ JMXSupport.encodeObjectNamePart((String) map
-						.get("NetworkConnectorName"))
-				+ ","
-				+ "Name="
-				+ JMXSupport.encodeObjectNamePart(JMXSupport
-						.encodeObjectNamePart(bridge.getRemoteAddress())));
-	}
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+
+    protected void registerNetworkBridgeMBean(NetworkBridge bridge) {
+        if (!getBrokerService().isUseJmx())
+            return;
+
+        MBeanServer mbeanServer = getBrokerService().getManagementContext().getMBeanServer();
+        if (mbeanServer != null) {
+            NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
+            try {
+                ObjectName objectName = createNetworkBridgeObjectName(bridge);
+                mbeanServer.registerMBean(view, objectName);
+            } catch (Throwable e) {
+                log.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
+        if (!getBrokerService().isUseJmx())
+            return;
+
+        MBeanServer mbeanServer = getBrokerService().getManagementContext().getMBeanServer();
+        if (mbeanServer != null) {
+            try {
+                ObjectName objectName = createNetworkBridgeObjectName(bridge);
+                mbeanServer.unregisterMBean(objectName);
+            } catch (Throwable e) {
+                log.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge)
+        throws MalformedObjectNameException {
+        ObjectName connectorName = getObjectName();
+        Hashtable map = connectorName.getKeyPropertyList();
+        return new ObjectName(connectorName.getDomain()
+                              + ":"
+                              + "BrokerName="
+                              + JMXSupport.encodeObjectNamePart((String)map.get("BrokerName"))
+                              + ","
+                              + "Type=NetworkBridge,"
+                              + "NetworkConnectorName="
+                              + JMXSupport.encodeObjectNamePart((String)map.get("NetworkConnectorName"))
+                              + ","
+                              + "Name="
+                              + JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge
+                                  .getRemoteAddress())));
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java Wed Aug  8 11:56:59 2007
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.network.jms;
 
-
 /**
  * Create an Inbound Queue Bridge
  * 
@@ -24,38 +23,40 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class InboundQueueBridge extends QueueBridge{
-       
+public class InboundQueueBridge extends QueueBridge {
+
     String inboundQueueName;
     String localQueueName;
+
     /**
      * Constructor that takes a foreign destination as an argument
+     * 
      * @param inboundQueueName
      */
-    public  InboundQueueBridge(String inboundQueueName){
-       this.inboundQueueName = inboundQueueName;
-       this.localQueueName = inboundQueueName;
+    public InboundQueueBridge(String inboundQueueName) {
+        this.inboundQueueName = inboundQueueName;
+        this.localQueueName = inboundQueueName;
     }
-    
+
     /**
      * Default Constructor
      */
-    public  InboundQueueBridge(){
+    public InboundQueueBridge() {
     }
 
     /**
      * @return Returns the inboundQueueName.
      */
-    public String getInboundQueueName(){
+    public String getInboundQueueName() {
         return inboundQueueName;
     }
 
     /**
      * @param inboundQueueName The inboundQueueName to set.
      */
-    public void setInboundQueueName(String inboundQueueName){
-        this.inboundQueueName=inboundQueueName;
-        if (this.localQueueName == null){
+    public void setInboundQueueName(String inboundQueueName) {
+        this.inboundQueueName = inboundQueueName;
+        if (this.localQueueName == null) {
             this.localQueueName = inboundQueueName;
         }
     }
@@ -63,15 +64,15 @@
     /**
      * @return the localQueueName
      */
-    public String getLocalQueueName(){
+    public String getLocalQueueName() {
         return localQueueName;
     }
 
     /**
      * @param localQueueName the localQueueName to set
      */
-    public void setLocalQueueName(String localQueueName){
-        this.localQueueName=localQueueName;
+    public void setLocalQueueName(String localQueueName) {
+        this.localQueueName = localQueueName;
     }
-    
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java Wed Aug  8 11:56:59 2007
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.network.jms;
 
-
 /**
  * Create an Inbound Topic Bridge
  * 
@@ -24,38 +23,40 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class InboundTopicBridge extends TopicBridge{
-       
+public class InboundTopicBridge extends TopicBridge {
+
     String inboundTopicName;
     String localTopicName;
+
     /**
      * Constructor that takes a foreign destination as an argument
+     * 
      * @param inboundTopicName
      */
-    public  InboundTopicBridge(String  inboundTopicName){
+    public InboundTopicBridge(String inboundTopicName) {
         this.inboundTopicName = inboundTopicName;
         this.localTopicName = inboundTopicName;
     }
-    
+
     /**
      * Default Constructor
      */
-    public  InboundTopicBridge(){
+    public InboundTopicBridge() {
     }
 
     /**
      * @return Returns the inboundTopicName.
      */
-    public String getInboundTopicName(){
+    public String getInboundTopicName() {
         return inboundTopicName;
     }
 
     /**
-     * @param inboundTopicName 
+     * @param inboundTopicName
      */
-    public void setInboundTopicName(String inboundTopicName){
-        this.inboundTopicName=inboundTopicName;
-        if(this.localTopicName==null){
+    public void setInboundTopicName(String inboundTopicName) {
+        this.inboundTopicName = inboundTopicName;
+        if (this.localTopicName == null) {
             this.localTopicName = inboundTopicName;
         }
     }
@@ -63,15 +64,15 @@
     /**
      * @return the localTopicName
      */
-    public String getLocalTopicName(){
+    public String getLocalTopicName() {
         return localTopicName;
     }
 
     /**
      * @param localTopicName the localTopicName to set
      */
-    public void setLocalTopicName(String localTopicName){
-        this.localTopicName=localTopicName;
+    public void setLocalTopicName(String localTopicName) {
+        this.localTopicName = localTopicName;
     }
-    
+
 }