You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/03/08 13:48:46 UTC

svn commit: r920306 [1/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/co...

Author: rajdavies
Date: Mon Mar  8 12:48:45 2010
New Revision: 920306

URL: http://svn.apache.org/viewvc?rev=920306&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2632

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RunBroker.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionControlMarshaller.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CompositeTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionControlTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Mon Mar  8 12:48:45 2010
@@ -215,6 +215,7 @@
 
         this.info = new ConnectionInfo(new ConnectionId(CONNECTION_ID_GENERATOR.generateId()));
         this.info.setManageable(true);
+        this.info.setFaultTolerant(transport.isFaultTolerant());
         this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
 
         this.transport.setTransportListener(this);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Mon Mar  8 12:48:45 2010
@@ -440,11 +440,10 @@
             String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
             advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
             
-            String[] uris = getBrokerService().getTransportConnectorURIs();
             String url = getBrokerService().getVmConnectorURI().toString();
-            if (uris != null && uris.length > 0) {
-                url = uris[0];
-            } 
+            if (getBrokerService().getDefaultSocketURI() != null) {
+                url = getBrokerService().getDefaultSocketURI().toString();
+            }
             advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
             
             //set the data structure

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon Mar  8 12:48:45 2010
@@ -31,7 +31,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.management.MalformedObjectNameException;
@@ -154,6 +153,7 @@
     private boolean deleteAllMessagesOnStartup;
     private boolean advisorySupport = true;
     private URI vmConnectorURI;
+    private URI defaultSocketURI;
     private PolicyMap destinationPolicy;
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final AtomicBoolean stopped = new AtomicBoolean(false);
@@ -1271,6 +1271,28 @@
     public void setVmConnectorURI(URI vmConnectorURI) {
         this.vmConnectorURI = vmConnectorURI;
     }
+    
+    public URI getDefaultSocketURI() {
+       
+            if (started.get()) {
+                if (this.defaultSocketURI==null) {
+                    for (TransportConnector tc:this.transportConnectors) {
+                        URI result = null;
+                        try {
+                            result = tc.getConnectUri();
+                        } catch (Exception e) {
+                          LOG.warn("Failed to get the ConnectURI for "+tc,e);
+                        }
+                        if (result != null) {
+                            this.defaultSocketURI=result;
+                            break;
+                        }
+                    }
+                }
+                return this.defaultSocketURI;
+            }
+       return null;
+    }
 
     /**
      * @return Returns the shutdownOnMasterFailure.
@@ -2007,6 +2029,9 @@
                     connector.setLocalUri(uri);
                     connector.setBrokerName(getBrokerName());
                     connector.setDurableDestinations(durableDestinations);
+                    if (getDefaultSocketURI() != null) {
+                        connector.setBrokerURL(getDefaultSocketURI().toString());
+                    }
                     connector.start();
                 }
                 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java Mon Mar  8 12:48:45 2010
@@ -17,10 +17,10 @@
 package org.apache.activemq.broker;
 
 import java.io.IOException;
-
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.region.ConnectionStatistics;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.command.Response;
 
 /**
@@ -51,6 +51,7 @@
      * Services a client command and submits it to the broker.
      * 
      * @param command
+     * @return Response
      */
     Response service(Command command);
 
@@ -110,5 +111,12 @@
      * @return
      */
     boolean isNetworkConnection();
+    
+    /**
+     * @return true if a fault tolerant connection
+     */
+    boolean isFaultTolerantConnection();
+    
+    void updateClient(ConnectionControl control);
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connector.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connector.java Mon Mar  8 12:48:45 2010
@@ -28,8 +28,7 @@
 public interface Connector extends Service {
 
     /**
-     * 
-     * @return
+     * @return brokerInfo
      */
     BrokerInfo getBrokerInfo();
 
@@ -37,4 +36,21 @@
      * @return the statistics for this connector
      */
     ConnectorStatistics getStatistics();
+    
+    /**
+     * @return true if update client connections when brokers leave/join a cluster
+     */
+    public boolean isUpdateClusterClients();
+
+    /**
+     * @return true if clients should be re-balanced across the cluster
+     */
+    public boolean isRebalanceClusterClients();
+    
+    /**
+     * Update all the connections with information
+     * about the connected brokers in the cluster
+     */
+    public void updateClientClusterInfo();
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Mon Mar  8 12:48:45 2010
@@ -121,7 +121,7 @@
     // Used to do async dispatch.. this should perhaps be pushed down into the
     // transport layer..
     private boolean inServiceException;
-    private ConnectionStatistics statistics = new ConnectionStatistics();
+    private final ConnectionStatistics statistics = new ConnectionStatistics();
     private boolean manageable;
     private boolean slow;
     private boolean markedCandidate;
@@ -133,15 +133,15 @@
     private boolean pendingStop;
     private long timeStamp;
     private final AtomicBoolean stopping = new AtomicBoolean(false);
-    private CountDownLatch stopped = new CountDownLatch(1);
+    private final CountDownLatch stopped = new CountDownLatch(1);
     private final AtomicBoolean asyncException = new AtomicBoolean(false);
     private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
     private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
-    private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
+    private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
     private ConnectionContext context;
     private boolean networkConnection;
     private boolean faultTolerantConnection;
-    private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
+    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
     private DemandForwardingBridge duplexBridge;
     private final TaskRunnerFactory taskRunnerFactory;
     private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
@@ -168,6 +168,7 @@
         this.taskRunnerFactory = taskRunnerFactory;
         this.transport = transport;
         this.transport.setTransportListener(new DefaultTransportListener() {
+            @Override
             public void onCommand(Object o) {
                 serviceLock.readLock().lock();
                 try {
@@ -184,6 +185,7 @@
                 }
             }
 
+            @Override
             public void onException(IOException exception) {
                 serviceLock.readLock().lock();
                 try {
@@ -241,6 +243,7 @@
     public void serviceExceptionAsync(final IOException e) {
         if (asyncException.compareAndSet(false, true)) {
             new Thread("Async Exception Handler") {
+                @Override
                 public void run() {
                     serviceException(e);
                 }
@@ -654,6 +657,7 @@
         }
         registerConnectionState(info.getConnectionId(), state);
         LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress());
+        this.faultTolerantConnection=info.isFaultTolerant();
         // Setup the context.
         String clientId = info.getClientId();
         context = new ConnectionContext();
@@ -672,6 +676,7 @@
         this.manageable = info.isManageable();
         state.setContext(context);
         state.setConnection(this);
+       
         try {
             broker.addConnection(context, info);
         } catch (Exception e) {
@@ -679,9 +684,9 @@
             LOG.warn("Failed to add Connection", e);
             throw e;
         }
-        if (info.isManageable() && broker.isFaultTolerantConfiguration()) {
+        if (info.isManageable()) {
             // send ConnectionCommand
-            ConnectionControl command = new ConnectionControl();
+            ConnectionControl command = this.connector.getConnectionControl();
             command.setFaultTolerant(broker.isFaultTolerantConfiguration());
             dispatchAsync(command);
         }
@@ -867,7 +872,10 @@
                 }
                 transport.start();
                 active = true;
-                dispatchAsync(connector.getBrokerInfo());
+                BrokerInfo info = connector.getBrokerInfo().copy();
+                info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
+                dispatchAsync(info);
+                
                 connector.onStarted(this);
             }
         } catch (Exception e) {
@@ -1120,6 +1128,10 @@
     public synchronized boolean isNetworkConnection() {
         return networkConnection;
     }
+    
+    public boolean isFaultTolerantConnection() {
+       return this.faultTolerantConnection;
+    }
 
     protected synchronized void setStarting(boolean starting) {
         this.starting = starting;
@@ -1222,6 +1234,13 @@
         }
         return null;
     }
+        
+    public void updateClient(ConnectionControl control) {
+        if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
+                && this.wireFormatInfo.getVersion() >= 6) {
+            dispatchAsync(control);
+        }
+    }
 
     private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) {
         ProducerBrokerExchange result = producerExchanges.get(id);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Mon Mar  8 12:48:45 2010
@@ -16,10 +16,17 @@
  */
 package org.apache.activemq.broker;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.management.ObjectName;
 import org.apache.activemq.broker.jmx.ManagedTransportConnector;
 import org.apache.activemq.broker.jmx.ManagementContext;
 import org.apache.activemq.broker.region.ConnectorStatistics;
 import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
 import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -34,6 +41,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+
 import static org.apache.activemq.thread.DefaultThreadPools.*;
 
 import java.io.IOException;
@@ -53,7 +61,6 @@
 
     protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
     protected TransportStatusDetector statusDector;
-
     private BrokerService brokerService;
     private TransportServer server;
     private URI uri;
@@ -61,14 +68,16 @@
     private TaskRunnerFactory taskRunnerFactory;
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
     private DiscoveryAgent discoveryAgent;
-    private ConnectorStatistics statistics = new ConnectorStatistics();
+    private final ConnectorStatistics statistics = new ConnectorStatistics();
     private URI discoveryUri;
     private URI connectUri;
     private String name;
     private boolean disableAsyncDispatch;
     private boolean enableStatusMonitor = false;
     private Broker broker;
-
+    private boolean updateClusterClients=false;
+    private boolean rebalanceClusterClients;
+    
     public TransportConnector() {
     }
 
@@ -109,6 +118,8 @@
         rc.setTaskRunnerFactory(getTaskRunnerFactory());
         rc.setUri(getUri());
         rc.setBrokerService(brokerService);
+        rc.setUpdateClusterClients(isUpdateClusterClients());
+        rc.setRebalanceClusterClients(isRebalanceClusterClients());
         return rc;
     }
 
@@ -193,16 +204,13 @@
     }
 
     public void start() throws Exception {
-        
-        TransportServer server = getServer();
-        
+        TransportServer server = getServer();  
         broker = brokerService.getBroker();
         brokerInfo.setBrokerName(broker.getBrokerName());
         brokerInfo.setBrokerId(broker.getBrokerId());
         brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
         brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
         brokerInfo.setBrokerURL(server.getConnectURI().toString());
-        
         server.setAcceptListener(new TransportAcceptListener() {
             public void onAccept(final Transport transport) {
                 try {
@@ -233,7 +241,6 @@
                 LOG.debug("Reason: " + error, error);
             }
         });
-        
         server.setBrokerInfo(brokerInfo);
         server.start();
         
@@ -366,6 +373,7 @@
         this.name = name;
     }
 
+    @Override
     public String toString() {
         String rc = getName();
         if (rc == null) {
@@ -373,6 +381,43 @@
         }
         return rc;
     }
+    
+    protected ConnectionControl getConnectionControl() {
+        boolean rebalance = isRebalanceClusterClients();
+            String connectedBrokers = "";
+            String self = "";
+            if (brokerService.getDefaultSocketURI() != null) {
+                self += brokerService.getDefaultSocketURI().toString();
+                self += ",";
+            }
+            if (rebalance == false) {
+                connectedBrokers += self;
+            }
+            if (this.broker.getPeerBrokerInfos() != null) {
+            for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
+                connectedBrokers += info.getBrokerURL();
+                connectedBrokers += ",";
+            }
+            }
+            if (rebalance) {
+                connectedBrokers += self;
+            }
+
+            ConnectionControl control = new ConnectionControl();
+            control.setConnectedBrokers(connectedBrokers);
+            control.setRebalanceConnection(rebalance);
+            return control;
+        
+    }
+    
+    public void updateClientClusterInfo() {
+        if (isRebalanceClusterClients() || isUpdateClusterClients()) {
+            ConnectionControl control = getConnectionControl();
+            for (Connection c: this.connections) {
+                c.updateClient(control);
+            }
+        }
+    }
 
     public boolean isDisableAsyncDispatch() {
         return disableAsyncDispatch;
@@ -410,4 +455,32 @@
 	public BrokerService getBrokerService() {
 		return brokerService;
 	}
+
+    /**
+     * @return the updateClusterClients
+     */
+    public boolean isUpdateClusterClients() {
+        return this.updateClusterClients;
+    }
+
+    /**
+     * @param updateClusterClients the updateClusterClients to set
+     */
+    public void setUpdateClusterClients(boolean updateClusterClients) {
+        this.updateClusterClients = updateClusterClients;
+    }
+
+    /**
+     * @return the rebalanceClusterClients
+     */
+    public boolean isRebalanceClusterClients() {
+        return this.rebalanceClusterClients;
+    }
+
+    /**
+     * @param rebalanceClusterClients the rebalanceClusterClients to set
+     */
+    public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
+        this.rebalanceClusterClients = rebalanceClusterClients;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Mon Mar  8 12:48:45 2010
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -34,6 +35,7 @@
 import org.apache.activemq.broker.ConsumerBrokerExchange;
 import org.apache.activemq.broker.EmptyBroker;
 import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -585,12 +587,14 @@
     @Override
     public synchronized void addBroker(Connection connection, BrokerInfo info) {
         brokerInfos.add(info);
+        updateClients();
     }
 
     @Override
     public synchronized void removeBroker(Connection connection, BrokerInfo info) {
         if (info != null) {
             brokerInfos.remove(info);
+            updateClients();
         }
     }
 
@@ -830,4 +834,11 @@
             LOG.warn("unmatched destination: " + destination + ", in consumerControl: "  + control);
         }
     }
+    
+    protected void updateClients() {
+        List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
+        for (TransportConnector connector : connectors) {
+            connector.updateClientClusterInfo();
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java Mon Mar  8 12:48:45 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.command;
 
 import java.util.Map;
-
 import org.apache.activemq.util.IntrospectionSupport;
 
 
@@ -61,6 +60,7 @@
         this.responseRequired = responseRequired;
     }
 
+    @Override
     public String toString() {
         return toString(null);
     }
@@ -104,6 +104,10 @@
     public boolean isShutdownInfo() {
         return false;
     }
+    
+    public boolean isConnectionControl() {
+        return false;
+    }
 
     /**
      * The endpoint within the transport where this message came from.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java Mon Mar  8 12:48:45 2010
@@ -16,13 +16,12 @@
  */
 package org.apache.activemq.command;
 
-import org.apache.activemq.plugin.StatisticsBrokerPlugin;
+import java.io.IOException;
+import java.util.Properties;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.util.MarshallingSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import java.io.IOException;
-import java.util.Properties;
 
 /**
  * When a client connects to a broker, the broker send the client a BrokerInfo
@@ -49,7 +48,30 @@
     long connectionId;
     String brokerUploadUrl;
     String networkProperties;
+    
+    public BrokerInfo copy() {
+        BrokerInfo copy = new BrokerInfo();
+        copy(copy);
+        return copy;
+    }
+    
+    private void copy(BrokerInfo copy) {
+        super.copy(copy);
+        copy.brokerId = this.brokerId;
+        copy.brokerURL = this.brokerURL;
+        copy.slaveBroker = this.slaveBroker;
+        copy.masterBroker = this.masterBroker;
+        copy.faultTolerantConfiguration = this.faultTolerantConfiguration;
+        copy.networkConnection = this.networkConnection;
+        copy.duplexConnection = this.duplexConnection;
+        copy.peerBrokerInfos = this.peerBrokerInfos;
+        copy.brokerName = this.brokerName;
+        copy.connectionId = this.connectionId;
+        copy.brokerUploadUrl = this.brokerUploadUrl;
+        copy.networkProperties = this.networkProperties;
+    } 
 
+    @Override
     public boolean isBrokerInfo() {
         return true;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java Mon Mar  8 12:48:45 2010
@@ -52,6 +52,8 @@
     boolean isMessageDispatchNotification();
 
     boolean isShutdownInfo();
+    
+    boolean isConnectionControl();
 
     Response visit(CommandVisitor visitor) throws Exception;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java Mon Mar  8 12:48:45 2010
@@ -31,6 +31,9 @@
     protected boolean close;
     protected boolean exit;
     protected boolean faultTolerant;
+    protected String connectedBrokers="";
+    protected String reconnectTo = "";
+    protected boolean rebalanceConnection;
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
@@ -39,6 +42,10 @@
     public Response visit(CommandVisitor visitor) throws Exception {
         return visitor.processConnectionControl(this);
     }
+    @Override
+    public boolean isConnectionControl() {
+        return true;
+    }
 
     /**
      * @openwire:property version=1
@@ -114,4 +121,49 @@
     public void setSuspend(boolean suspend) {
         this.suspend = suspend;
     }
+
+    /**
+     * @openwire:property version=6 cache=false
+     * @return connected brokers.
+     */
+    public String getConnectedBrokers() {
+        return this.connectedBrokers;
+    }
+
+    /**
+     * @param connectedBrokers the connectedBrokers to set
+     */
+    public void setConnectedBrokers(String connectedBrokers) {
+        this.connectedBrokers = connectedBrokers;
+    }
+
+    /**
+     *  @openwire:property version=6 cache=false
+     * @return the reconnectTo
+     */
+    public String getReconnectTo() {
+        return this.reconnectTo;
+    }
+
+    /**
+     * @param reconnectTo the reconnectTo to set
+     */
+    public void setReconnectTo(String reconnectTo) {
+        this.reconnectTo = reconnectTo;
+    }
+
+    /**
+     * @return the rebalanceConnection
+     *  @openwire:property version=6 cache=false
+     */
+    public boolean isRebalanceConnection() {
+        return this.rebalanceConnection;
+    }
+
+    /**
+     * @param rebalanceConnection the rebalanceConnection to set
+     */
+    public void setRebalanceConnection(boolean rebalanceConnection) {
+        this.rebalanceConnection = rebalanceConnection;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java Mon Mar  8 12:48:45 2010
@@ -35,6 +35,7 @@
     protected boolean brokerMasterConnector;
     protected boolean manageable;
     protected boolean clientMaster = true;
+    protected boolean faultTolerant = false;
     protected transient Object transportContext;
 
     public ConnectionInfo() {
@@ -65,6 +66,7 @@
         copy.manageable = manageable;
         copy.clientMaster = clientMaster;
         copy.transportContext = transportContext;
+        copy.faultTolerant= faultTolerant;
     }
 
     /**
@@ -199,4 +201,19 @@
         this.clientMaster = clientMaster;
     }
 
+    /**
+     * @openwire:property version=6 cache=false
+     * @return the faultTolerant
+     */
+    public boolean isFaultTolerant() {
+        return this.faultTolerant;
+    }
+
+    /**
+     * @param faultTolerant the faultTolerant to set
+     */
+    public void setFaultTolerant(boolean faultTolerant) {
+        this.faultTolerant = faultTolerant;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java Mon Mar  8 12:48:45 2010
@@ -117,6 +117,10 @@
     public boolean isShutdownInfo() {
         return false;
     }
+    
+    public boolean isConnectionControl() {
+        return false;
+    }
 
     public void setResponseRequired(boolean responseRequired) {
     }
@@ -135,7 +139,6 @@
             size = data.length;
         }
         return "PartialCommand[id: " + commandId + " data: " + size + " byte(s)]";
-    }
-    
+    }   
     
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java Mon Mar  8 12:48:45 2010
@@ -47,14 +47,12 @@
 
     /**
      * @openwire:property version=1
-     * @deprecated
      */
     public String getSubcriptionName() {
         return subscriptionName;
     }
 
     /**
-     * @deprecated
      */
     public void setSubcriptionName(String subscriptionName) {
         this.subscriptionName = subscriptionName;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java Mon Mar  8 12:48:45 2010
@@ -76,7 +76,6 @@
 
     /**
      * @openwire:property version=1
-     * @deprecated
      */
     public String getSubcriptionName() {
         return subscriptionName;
@@ -84,7 +83,6 @@
 
     /**
      * @param subscriptionName *
-     * @deprecated
      */
     public void setSubcriptionName(String subscriptionName) {
         this.subscriptionName = subscriptionName;
@@ -102,16 +100,19 @@
         return false;
     }
 
+    @Override
     public String toString() {
         return IntrospectionSupport.toString(this);
     }
 
+    @Override
     public int hashCode() {
         int h1 = clientId != null ? clientId.hashCode() : -1;
         int h2 = subscriptionName != null ? subscriptionName.hashCode() : -1;
         return h1 ^ h2;
     }
 
+    @Override
     public boolean equals(Object obj) {
         boolean result = false;
         if (obj instanceof SubscriptionInfo) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java Mon Mar  8 12:48:45 2010
@@ -23,7 +23,6 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
@@ -290,6 +289,7 @@
         return visitor.processWireFormat(this);
     }
 
+    @Override
     public String toString() {
         Map<String, Object> p = null;
         try {
@@ -356,6 +356,10 @@
     public boolean isShutdownInfo() {
         return false;
     }
+    
+    public boolean isConnectionControl() {
+        return false;
+    }
 
     public void setCachedMarshalledForm(WireFormat wireFormat, ByteSequence data) {
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Mon Mar  8 12:48:45 2010
@@ -30,7 +30,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
@@ -131,7 +130,7 @@
     private BrokerInfo localBrokerInfo;
     private BrokerInfo remoteBrokerInfo;
 
-    private AtomicBoolean started = new AtomicBoolean();
+    private final AtomicBoolean started = new AtomicBoolean();
     private TransportConnection duplexInitiatingConnection;
     private BrokerService brokerService = null;
 
@@ -153,11 +152,13 @@
         if (started.compareAndSet(false, true)) {
             localBroker.setTransportListener(new DefaultTransportListener() {
 
+                @Override
                 public void onCommand(Object o) {
                     Command command = (Command) o;
                     serviceLocalCommand(command);
                 }
 
+                @Override
                 public void onException(IOException error) {
                     serviceLocalException(error);
                 }
@@ -318,6 +319,7 @@
                 if (!isCreatedByDuplex()) {
                     BrokerInfo brokerInfo = new BrokerInfo();
                     brokerInfo.setBrokerName(configuration.getBrokerName());
+                    brokerInfo.setBrokerURL(configuration.getBrokerURL());
                     brokerInfo.setNetworkConnection(true);
                     brokerInfo.setDuplexConnection(configuration.isDuplex());
                     // set our properties

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Mon Mar  8 12:48:45 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.network;
 
 import java.util.List;
-
 import org.apache.activemq.command.ActiveMQDestination;
 
 /**
@@ -36,6 +35,7 @@
     private int prefetchSize = 1000;
     private int networkTTL = 1;
     private String brokerName = "localhost";
+    private String brokerURL = "";
     private String userName;
     private String password;
     private String destinationFilter = ">";
@@ -274,4 +274,18 @@
     public void setSuppressDuplicateQueueSubscriptions(boolean val) {
         suppressDuplicateQueueSubscriptions = val;
     }
+
+    /**
+     * @return the brokerURL
+     */
+    public String getBrokerURL() {
+        return this.brokerURL;
+    }
+
+    /**
+     * @param brokerURL the brokerURL to set
+     */
+    public void setBrokerURL(String brokerURL) {
+        this.brokerURL = brokerURL;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionControlMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionControlMarshaller.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionControlMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionControlMarshaller.java Mon Mar  8 12:48:45 2010
@@ -71,6 +71,9 @@
         info.setFaultTolerant(bs.readBoolean());
         info.setResume(bs.readBoolean());
         info.setSuspend(bs.readBoolean());
+        info.setConnectedBrokers(tightUnmarshalString(dataIn, bs));
+        info.setReconnectTo(tightUnmarshalString(dataIn, bs));
+        info.setRebalanceConnection(bs.readBoolean());
 
     }
 
@@ -88,6 +91,9 @@
         bs.writeBoolean(info.isFaultTolerant());
         bs.writeBoolean(info.isResume());
         bs.writeBoolean(info.isSuspend());
+        rc += tightMarshalString1(info.getConnectedBrokers(), bs);
+        rc += tightMarshalString1(info.getReconnectTo(), bs);
+        bs.writeBoolean(info.isRebalanceConnection());
 
         return rc + 0;
     }
@@ -108,6 +114,9 @@
         bs.readBoolean();
         bs.readBoolean();
         bs.readBoolean();
+        tightMarshalString2(info.getConnectedBrokers(), dataOut, bs);
+        tightMarshalString2(info.getReconnectTo(), dataOut, bs);
+        bs.readBoolean();
 
     }
 
@@ -127,6 +136,9 @@
         info.setFaultTolerant(dataIn.readBoolean());
         info.setResume(dataIn.readBoolean());
         info.setSuspend(dataIn.readBoolean());
+        info.setConnectedBrokers(looseUnmarshalString(dataIn));
+        info.setReconnectTo(looseUnmarshalString(dataIn));
+        info.setRebalanceConnection(dataIn.readBoolean());
 
     }
 
@@ -144,6 +156,9 @@
         dataOut.writeBoolean(info.isFaultTolerant());
         dataOut.writeBoolean(info.isResume());
         dataOut.writeBoolean(info.isSuspend());
+        looseMarshalString(info.getConnectedBrokers(), dataOut);
+        looseMarshalString(info.getReconnectTo(), dataOut);
+        dataOut.writeBoolean(info.isRebalanceConnection());
 
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java Mon Mar  8 12:48:45 2010
@@ -85,6 +85,7 @@
         info.setBrokerMasterConnector(bs.readBoolean());
         info.setManageable(bs.readBoolean());
         info.setClientMaster(bs.readBoolean());
+        info.setFaultTolerant(bs.readBoolean());
 
     }
 
@@ -105,6 +106,7 @@
         bs.writeBoolean(info.isBrokerMasterConnector());
         bs.writeBoolean(info.isManageable());
         bs.writeBoolean(info.isClientMaster());
+        bs.writeBoolean(info.isFaultTolerant());
 
         return rc + 0;
     }
@@ -128,6 +130,7 @@
         bs.readBoolean();
         bs.readBoolean();
         bs.readBoolean();
+        bs.readBoolean();
 
     }
 
@@ -161,6 +164,7 @@
         info.setBrokerMasterConnector(dataIn.readBoolean());
         info.setManageable(dataIn.readBoolean());
         info.setClientMaster(dataIn.readBoolean());
+        info.setFaultTolerant(dataIn.readBoolean());
 
     }
 
@@ -181,6 +185,7 @@
         dataOut.writeBoolean(info.isBrokerMasterConnector());
         dataOut.writeBoolean(info.isManageable());
         dataOut.writeBoolean(info.isClientMaster());
+        dataOut.writeBoolean(info.isFaultTolerant());
 
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java Mon Mar  8 12:48:45 2010
@@ -21,7 +21,6 @@
 import java.net.URISyntaxException;
 import java.util.Iterator;
 import java.util.concurrent.CopyOnWriteArrayList;
-
 import org.apache.activemq.Service;
 import org.apache.activemq.transport.CompositeTransport;
 import org.apache.activemq.transport.Transport;
@@ -46,7 +45,7 @@
     private URI remote;
     private URI localUri;
     private String name;
-    private CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>();
+    private final CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>();
 
     public void start() throws Exception {
 
@@ -131,13 +130,14 @@
 
     private Transport createRemoteTransport() throws Exception {
         Transport transport = TransportFactory.compositeConnect(remote);
-        CompositeTransport ct = (CompositeTransport)transport.narrow(CompositeTransport.class);
+        CompositeTransport ct = transport.narrow(CompositeTransport.class);
         if (ct != null && localUri != null) {
-            ct.add(new URI[] {localUri});
+            ct.add(false,new URI[] {localUri});
         }
 
         // Add a transport filter so that can track the transport life cycle
         transport = new TransportFilter(transport) {
+            @Override
             public void stop() throws Exception {
                 LOG.info("Stopping proxy.");
                 super.stop();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CompositeTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CompositeTransport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CompositeTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CompositeTransport.java Mon Mar  8 12:48:45 2010
@@ -19,6 +19,6 @@
 import java.net.URI;
 
 public interface CompositeTransport extends Transport {
-    void add(URI[] uris);
-    void remove(URI[] uris);
+    void add(boolean rebalance,URI[] uris);
+    void remove(boolean rebalance,URI[] uris);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java Mon Mar  8 12:48:45 2010
@@ -18,7 +18,6 @@
 
 import java.io.IOException;
 import java.net.URI;
-
 import org.apache.activemq.Service;
 
 /**
@@ -148,11 +147,28 @@
     boolean isConnected();
     
     /**
+     * @return true if reconnect is supported
+     */
+    boolean isReconnectSupported();
+    
+    /**
+     * @return true if updating uris is supported
+     */
+    boolean isUpdateURIsSupported();
+    /**
      * reconnect to another location
      * @param uri
      * @throws IOException on failure of if not supported
      */
     void reconnect(URI uri) throws IOException;
+    
+    /**
+     * Provide a list of available alternative locations
+     * @param rebalance 
+     * @param uris
+     * @throws IOException
+     */
+    void updateURIs(boolean rebalance,URI[] uris) throws IOException;
 
     /**
      * Returns a counter which gets incremented as data is read from the transport.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java Mon Mar  8 12:48:45 2010
@@ -45,7 +45,8 @@
 
     /**
      * @see org.apache.activemq.Service#start()
-     * @throws IOException if the next channel has not been set.
+     * @throws IOException
+     *             if the next channel has not been set.
      */
     public void start() throws Exception {
         if (next == null) {
@@ -75,6 +76,7 @@
         return next;
     }
 
+    @Override
     public String toString() {
         return next.toString();
     }
@@ -126,19 +128,31 @@
         return next.isFaultTolerant();
     }
 
-	public boolean isDisposed() {
-		return next.isDisposed();
-	}
-	
-	public boolean isConnected() {
+    public boolean isDisposed() {
+        return next.isDisposed();
+    }
+
+    public boolean isConnected() {
         return next.isConnected();
     }
 
-	public void reconnect(URI uri) throws IOException {
-		next.reconnect(uri);
-	}
+    public void reconnect(URI uri) throws IOException {
+        next.reconnect(uri);
+    }
 
     public int getReceiveCounter() {
         return next.getReceiveCounter();
     }
+
+    public boolean isReconnectSupported() {
+        return next.isReconnectSupported();
+    }
+
+    public boolean isUpdateURIsSupported() {
+        return next.isUpdateURIsSupported();
+    }
+
+    public void updateURIs(boolean rebalance,URI[] uris) throws IOException {
+        next.updateURIs(rebalance,uris);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java Mon Mar  8 12:48:45 2010
@@ -18,7 +18,6 @@
 
 import java.io.IOException;
 import java.net.URI;
-
 import org.apache.activemq.util.ServiceSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -96,8 +95,9 @@
             try {
                 transportListener.onException(e);
             } catch (RuntimeException e2) {
-                // Handle any unexpected runtime exceptions by debug logging them.
-                LOG.debug("Unexpected runtime exception: "+e2, e2);
+                // Handle any unexpected runtime exceptions by debug logging
+                // them.
+                LOG.debug("Unexpected runtime exception: " + e2, e2);
             }
         }
     }
@@ -111,18 +111,28 @@
     public boolean isFaultTolerant() {
         return false;
     }
-    
-   
-	public void reconnect(URI uri) throws IOException {
-		throw new IOException("Not supported");
-	}
-	
-	public boolean isDisposed() {
-		return isStopped();
-	}
-	
-	public  boolean isConnected() {
-	    return isStarted();
-	}
+
+    public void reconnect(URI uri) throws IOException {
+        throw new IOException("Not supported");
+    }
+
+    public boolean isReconnectSupported() {
+        return false;
+    }
+
+    public boolean isUpdateURIsSupported() {
+        return false;
+    }
+    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
+        throw new IOException("Not supported");
+    }
+
+    public boolean isDisposed() {
+        return isStopped();
+    }
+
+    public boolean isConnected() {
+        return isStarted();
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java Mon Mar  8 12:48:45 2010
@@ -20,7 +20,6 @@
 import java.net.URISyntaxException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.activemq.command.DiscoveryEvent;
 import org.apache.activemq.transport.CompositeTransport;
 import org.apache.activemq.transport.TransportFilter;
@@ -50,6 +49,7 @@
         this.next = next;
     }
 
+    @Override
     public void start() throws Exception {
         if (discoveryAgent == null) {
             throw new IllegalStateException("discoveryAgent not configured");
@@ -61,6 +61,7 @@
         next.start();
     }
 
+    @Override
     public void stop() throws Exception {
         ServiceStopper ss = new ServiceStopper();
         ss.stop(discoveryAgent);
@@ -75,7 +76,7 @@
                 URI uri = new URI(url);
                 serviceURIs.put(event.getServiceName(), uri);
                 LOG.info("Adding new broker connection URL: " + uri);
-                next.add(new URI[] {URISupport.applyParameters(uri, parameters)});
+                next.add(false,new URI[] {URISupport.applyParameters(uri, parameters)});
             } catch (URISyntaxException e) {
                 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
             }
@@ -85,7 +86,7 @@
     public void onServiceRemove(DiscoveryEvent event) {
         URI uri = serviceURIs.get(event.getServiceName());
         if (uri != null) {
-            next.remove(new URI[] {uri});
+            next.remove(false,new URI[] {uri});
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java Mon Mar  8 12:48:45 2010
@@ -20,12 +20,11 @@
 
 import java.io.IOException;
 import java.net.URI;
-
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.Transport;
 
 class BackupTransport extends DefaultTransportListener{
-	private FailoverTransport failoverTransport;
+	private final FailoverTransport failoverTransport;
 	private Transport transport;
 	private URI uri;
 	private boolean disposed;
@@ -33,10 +32,11 @@
 	BackupTransport(FailoverTransport ft){
 		this.failoverTransport=ft;
 	}
-	public void onException(IOException error) {
+	@Override
+    public void onException(IOException error) {
 		this.disposed=true;
 		if (failoverTransport!=null) {
-			this.failoverTransport.reconnect();
+			this.failoverTransport.reconnect(false);
 		}
 	}
 
@@ -62,11 +62,13 @@
 		this.disposed = disposed;
 	}
 	
-	public int hashCode() {
+	@Override
+    public int hashCode() {
 		return uri != null ? uri.hashCode():-1;
 	}
 	
-	public boolean equals(Object obj) {
+	@Override
+    public boolean equals(Object obj) {
 		if (obj instanceof BackupTransport) {
 			BackupTransport other = (BackupTransport) obj;
 			return uri== null && other.uri==null ||