You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/03/13 17:20:00 UTC

svn commit: r517753 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/network/ main/java/org/apache/activemq/openwire/v3/ main/java/org/apache/activemq/util/ test/java/org/apache/activemq/network/ test/java/org/apache/activemq/openwire/v3/ test/java/org/apache/activemq/usecases/

Author: rajdavies
Date: Tue Mar 13 09:19:58 2007
New Revision: 517753

URL: http://svn.apache.org/viewvc?view=rev&rev=517753
Log:
working towards a solution for http://issues.apache.org/activemq/browse/AMQ-920

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java
      - copied, changed from r516655, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/Bridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFailedListener.java   (with props)
Removed:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/Bridge.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/BrokerInfoMarshaller.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessagePullMarshaller.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MarshallingSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/BrokerInfoTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessagePullTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Mar 13 09:19:58 2007
@@ -69,6 +69,7 @@
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.network.DemandForwardingBridge;
 import org.apache.activemq.network.NetworkBridgeConfiguration;
+import org.apache.activemq.network.NetworkBridgeFactory;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.state.ConsumerState;
@@ -80,6 +81,7 @@
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.MarshallingSupport;
 import org.apache.activemq.util.ServiceSupport;
@@ -871,6 +873,9 @@
                 if(masterBroker!=null){
                     masterBroker.stop();
                 }
+                if (duplexBridge != null) {
+                    duplexBridge.stop();
+                }
                 // If the transport has not failed yet,
                 // notify the peer that we are doing a normal shutdown.
                 if(transportException==null){
@@ -1081,9 +1086,10 @@
                 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
                 IntrospectionSupport.setProperties(config,props,null);
                 config.setLocalBrokerName(broker.getBrokerName());
-                
-               
-            }catch(IOException e){
+                Transport localTransport = TransportFactory.connect(broker.getVmConnectorURI());
+                localTransport.start();
+                duplexBridge = NetworkBridgeFactory.createBridge(config,localTransport,transport);
+            }catch(Exception e){
                log.error("Creating duplex network bridge",e);
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java Tue Mar 13 09:19:58 2007
@@ -39,6 +39,7 @@
     String brokerName;
     long connectionId;
     String brokerUploadUrl;
+    String networkProperties;
 
 
     public boolean isBrokerInfo(){
@@ -199,4 +200,21 @@
     public void setBrokerUploadUrl(String brokerUploadUrl) {
         this.brokerUploadUrl = brokerUploadUrl;
     }
+
+    
+    /**
+     *  @openwire:property version=3 cache=false
+     * @return the networkProperties
+     */
+    public String getNetworkProperties(){
+        return this.networkProperties;
+    }
+
+    
+    /**
+     * @param networkProperties the networkProperties to set
+     */
+    public void setNetworkProperties(String networkProperties){
+        this.networkProperties=networkProperties;
+    }    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java Tue Mar 13 09:19:58 2007
@@ -41,8 +41,8 @@
     protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
     protected Object brokerInfoMutex = new Object();
 
-    public CompositeDemandForwardingBridge(Transport localBroker, Transport remoteBroker) {
-        super(localBroker, remoteBroker);
+    public CompositeDemandForwardingBridge(NetworkBridgeConfiguration configuration,Transport localBroker, Transport remoteBroker) {
+        super(configuration,localBroker, remoteBroker);
         remoteBrokerName = remoteBroker.toString();
     	remoteBrokerNameKnownLatch.countDown();
     }
@@ -102,7 +102,7 @@
     }
 
     protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
-        return new NetworkBridgeFilter(getFromBrokerId(info), networkTTL);
+        return new NetworkBridgeFilter(getFromBrokerId(info), configuration.getNetworkTTL());
     }
     
     protected BrokerId[] getRemoteBrokerPath(){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Tue Mar 13 09:19:58 2007
@@ -42,8 +42,8 @@
      * @param localBroker
      * @param remoteBroker
      */
-    public ConduitBridge(Transport localBroker,Transport remoteBroker){
-        super(localBroker,remoteBroker);
+    public ConduitBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
+        super(configuration,localBroker,remoteBroker);
     }
     
     protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException{

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Tue Mar 13 09:19:58 2007
@@ -40,8 +40,8 @@
     protected Object brokerInfoMutex = new Object();
     protected BrokerId remoteBrokerId;
 
-    public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){
-        super(localBroker, remoteBroker);
+    public DemandForwardingBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
+        super(configuration,localBroker, remoteBroker);
     }
 
     protected void serviceRemoteBrokerInfo(Command command) throws IOException {
@@ -80,7 +80,7 @@
     }
     
     protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
-        return new NetworkBridgeFilter(remoteBrokerPath[0], networkTTL);
+        return new NetworkBridgeFilter(remoteBrokerPath[0], configuration.getNetworkTTL());
     }
     
     protected BrokerId[] getRemoteBrokerPath(){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Tue Mar 13 09:19:58 2007
@@ -53,13 +53,16 @@
 import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.activemq.util.MarshallingSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.security.GeneralSecurityException;
+import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,7 +72,7 @@
  * 
  * @version $Revision$
  */
-public abstract class DemandForwardingBridgeSupport implements Bridge {
+public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
     protected static final Log log = LogFactory.getLog(DemandForwardingBridge.class);
     protected final Transport localBroker;
     protected final Transport remoteBroker;
@@ -79,15 +82,8 @@
     protected ConnectionInfo remoteConnectionInfo;
     protected SessionInfo localSessionInfo;
     protected ProducerInfo producerInfo;
-    protected String localBrokerName = "Unknown";
     protected String remoteBrokerName = "Unknown";
     protected String localClientId;
-    protected String userName;
-    protected String password;
-    protected int prefetchSize = 1000;
-    protected boolean dispatchAsync;
-    protected String destinationFilter = ">";
-    protected boolean bridgeTempDestinations = true;
     protected String name = "bridge";
     protected ConsumerInfo demandConsumerInfo;
     protected int demandConsumerDispatched;
@@ -104,14 +100,14 @@
     protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
     protected CountDownLatch startedLatch = new CountDownLatch(2);
     protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
-    protected boolean decreaseNetworkConsumerPriority;
-    protected int networkTTL = 1;
     protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
     protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
-    protected boolean duplex = false;
+    protected NetworkBridgeConfiguration configuration;
+    private NetworkBridgeFailedListener bridgeFailedListener;
 
     
-    public DemandForwardingBridgeSupport(final Transport localBroker, final Transport remoteBroker) {
+    public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
+        this.configuration=configuration;
         this.localBroker = localBroker;
         this.remoteBroker = remoteBroker;
     }
@@ -236,8 +232,8 @@
 	            localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
 	            localClientId="NC_"+remoteBrokerName+"_inbound"+name;
 	            localConnectionInfo.setClientId(localClientId);
-	            localConnectionInfo.setUserName(userName);
-	            localConnectionInfo.setPassword(password);
+	            localConnectionInfo.setUserName(configuration.getUserName());
+	            localConnectionInfo.setPassword(configuration.getPassword());
 	            localBroker.oneway(localConnectionInfo);
 	
 	            localSessionInfo=new SessionInfo(localConnectionInfo,1);
@@ -263,15 +259,20 @@
             	
                 remoteConnectionInfo=new ConnectionInfo();
                 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
-                remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound"+name);
-                remoteConnectionInfo.setUserName(userName);
-                remoteConnectionInfo.setPassword(password);
+                remoteConnectionInfo.setClientId("NC_"+configuration.getLocalBrokerName()+"_outbound"+name);
+                remoteConnectionInfo.setUserName(configuration.getUserName());
+                remoteConnectionInfo.setPassword(configuration.getPassword());
                 remoteBroker.oneway(remoteConnectionInfo);
 
                 BrokerInfo brokerInfo=new BrokerInfo();
-                brokerInfo.setBrokerName(localBrokerName);
+                brokerInfo.setBrokerName(configuration.getLocalBrokerName());
                 brokerInfo.setNetworkConnection(true);
-                brokerInfo.setDuplexConnection(isDuplex());
+                brokerInfo.setDuplexConnection(configuration.isDuplex());
+                //set our properties
+                Properties props = new Properties();
+                IntrospectionSupport.getProperties(this,props,null); 
+                String str = MarshallingSupport.propertiesToString(props);
+                brokerInfo.setNetworkProperties(str);
                 remoteBroker.oneway(brokerInfo);
 
                 SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
@@ -283,13 +284,13 @@
 
                 // Listen to consumer advisory messages on the remote broker to determine demand.
                 demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
-                demandConsumerInfo.setDispatchAsync(dispatchAsync);
-                String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+destinationFilter;
-                if( bridgeTempDestinations ) {
+                demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync());
+                String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+configuration.getDestinationFilter();
+                if( configuration.isBridgeTempDestinations() ) {
                 	advisoryTopic += ","+AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
                 }
                 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
-                demandConsumerInfo.setPrefetchSize(prefetchSize);
+                demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
                 remoteBroker.oneway(demandConsumerInfo);                
                 startedLatch.countDown();
                 
@@ -302,7 +303,7 @@
     }
 
     public void stop() throws Exception{
-        log.debug(" stopping "+localBrokerName+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
+        log.debug(" stopping "+configuration.getLocalBrokerName()+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
         boolean wasDisposedAlready=disposed;
         if(!disposed){
             try{
@@ -320,13 +321,13 @@
             }
         }
         if(wasDisposedAlready){
-            log.debug(localBrokerName+" bridge to "+remoteBrokerName+" stopped");
+            log.debug(configuration.getLocalBrokerName()+" bridge to "+remoteBrokerName+" stopped");
         }else{
-            log.info(localBrokerName+" bridge to "+remoteBrokerName+" stopped");
+            log.info(configuration.getLocalBrokerName()+" bridge to "+remoteBrokerName+" stopped");
         }
     }
     
-    protected void serviceRemoteException(Throwable error){
+    public void serviceRemoteException(Throwable error){
         if(!disposed){
             if(error instanceof SecurityException||error instanceof GeneralSecurityException){
                 log.error("Network connection between "+localBroker+" and "+remoteBroker
@@ -342,6 +343,7 @@
                     ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
                 }
             }.start();
+            fireBridgeFailed();
         }
     }
 
@@ -384,25 +386,26 @@
     }
 
     private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
+        final int networkTTL = configuration.getNetworkTTL();
         if(data.getClass()==ConsumerInfo.class){
             // Create a new local subscription
             ConsumerInfo info=(ConsumerInfo) data;
             BrokerId[] path=info.getBrokerPath();
             if((path!=null&&path.length>= networkTTL)){
                 if(log.isDebugEnabled())
-                    log.debug(localBrokerName + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
+                    log.debug(configuration.getLocalBrokerName() + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
                 return;
             }
             if(contains(info.getBrokerPath(),localBrokerPath[0])){
                 // Ignore this consumer as it's a consumer we locally sent to the broker.
                 if(log.isDebugEnabled())
-                    log.debug(localBrokerName  + " Ignoring sub " + info + " already routed through this broker once");
+                    log.debug(configuration.getLocalBrokerName()  + " Ignoring sub " + info + " already routed through this broker once");
                 return;
             }
             if (!isPermissableDestination(info.getDestination())){
                 //ignore if not in the permited or in the excluded list
                 if(log.isDebugEnabled())
-                    log.debug(localBrokerName  + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
+                    log.debug(configuration.getLocalBrokerName()  + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
                 return;
             }
             // Update the packet to show where it came from.
@@ -412,10 +415,10 @@
             if (sub != null){
                 addSubscription(sub);
                 if(log.isDebugEnabled())
-                    log.debug(localBrokerName + " Forwarding sub on "+localBroker+" from "+remoteBrokerName+" :  "+info);
+                    log.debug(configuration.getLocalBrokerName() + " Forwarding sub on "+localBroker+" from "+remoteBrokerName+" :  "+info);
             }else {
                 if(log.isDebugEnabled())
-                    log.debug(localBrokerName  + " Ignoring sub " + info + " already subscribed to matching destination");
+                    log.debug(configuration.getLocalBrokerName()  + " Ignoring sub " + info + " already subscribed to matching destination");
             }
         }
         else if (data.getClass()==DestinationInfo.class){
@@ -454,7 +457,7 @@
         }
     }
 
-    protected void serviceLocalException(Throwable error) {
+    public void serviceLocalException(Throwable error) {
     	if( !disposed ) {
 	        log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a local error: "+error);
 	        log.debug("The local Exception was:"+error,error);
@@ -463,6 +466,7 @@
 	                ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
 	        	}
 	        }.start();
+            fireBridgeFailed();
     	}
     }
 
@@ -507,7 +511,7 @@
                     if(sub!=null){
                         Message message= configureMessage(md);
                         if(trace)
-                            log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+": "+message);
+                            log.trace("bridging "+configuration.getLocalBrokerName()+" -> "+remoteBrokerName+": "+message);
                         
                         
                         
@@ -547,7 +551,7 @@
                 }else if(command.isBrokerInfo()){
                     serviceLocalBrokerInfo(command);
                 }else if(command.isShutdownInfo()){
-                    log.info(localBrokerName+" Shutting down");
+                    log.info(configuration.getLocalBrokerName()+" Shutting down");
                     // Don't shut down the whole connector if the remote side was interrupted.
                     // the local transport is just shutting down temporarily until the remote side
                     // is restored.
@@ -572,34 +576,6 @@
     }
 
     /**
-     * @return prefetch size
-     */
-    public int getPrefetchSize() {
-        return prefetchSize;
-    }
-
-    /**
-     * @param prefetchSize
-     */
-    public void setPrefetchSize(int prefetchSize) {
-        this.prefetchSize=prefetchSize;
-    }
-
-    /**
-     * @return true if dispatch async
-     */
-    public boolean isDispatchAsync() {
-        return dispatchAsync;
-    }
-
-    /**
-     * @param dispatchAsync
-     */
-    public void setDispatchAsync(boolean dispatchAsync) {
-        this.dispatchAsync=dispatchAsync;
-    }
-
-    /**
      * @return Returns the dynamicallyIncludedDestinations.
      */
     public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
@@ -656,21 +632,6 @@
     }
 
     /**
-     * @return Returns the localBrokerName.
-     */
-    public String getLocalBrokerName() {
-        return localBrokerName;
-    }
-
-    /**
-     * @param localBrokerName
-     *            The localBrokerName to set.
-     */
-    public void setLocalBrokerName(String localBrokerName) {
-        this.localBrokerName=localBrokerName;
-    }
-
-    /**
      * @return Returns the localBroker.
      */
     public Transport getLocalBroker() {
@@ -697,34 +658,6 @@
     public void setName(String name) {
         this.name=name;
     }
-
-    /**
-     * @return Returns the decreaseNetworkConsumerPriority.
-     */
-    public boolean isDecreaseNetworkConsumerPriority() {
-        return decreaseNetworkConsumerPriority;
-    }
-
-    /**
-     * @param decreaseNetworkConsumerPriority The decreaseNetworkConsumerPriority to set.
-     */
-    public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) {
-        this.decreaseNetworkConsumerPriority=decreaseNetworkConsumerPriority;
-    }
-
-    /**
-     * @return Returns the networkTTL.
-     */
-    public int getNetworkTTL() {
-        return networkTTL;
-    }
-
-    /**
-     * @param networkTTL The networkTTL to set.
-     */
-    public void setNetworkTTL(int networkTTL) {
-        this.networkTTL=networkTTL;
-    }
   
     public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
         if(brokerPath!=null){
@@ -757,7 +690,7 @@
     protected boolean isPermissableDestination(ActiveMQDestination destination) {
     	
     	// Are we not bridging temp destinations?
-    	if( destination.isTemporary() && !bridgeTempDestinations )
+    	if( destination.isTemporary() && !configuration.isBridgeTempDestinations() )
     		return false;
     	
         DestinationFilter filter=DestinationFilter.parseFilter(destination);
@@ -814,7 +747,7 @@
         result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
                         .getNextSequenceId()));
         
-        if( decreaseNetworkConsumerPriority ) {
+        if( configuration.isDecreaseNetworkConsumerPriority() ) {
             byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
             if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
                 // The longer the path to the consumer, the less it's consumer priority.
@@ -840,8 +773,8 @@
     }
 
     protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
-        sub.getLocalInfo().setDispatchAsync(dispatchAsync);
-        sub.getLocalInfo().setPrefetchSize(prefetchSize);
+        sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
+        sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
         subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(),sub);
         subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(),sub);
     
@@ -877,37 +810,16 @@
     protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
     
     protected abstract BrokerId[] getRemoteBrokerPath();
-
-	public String getPassword() {
-		return password;
-	}
-
-	public void setPassword(String password) {
-		this.password = password;
-	}
-
-	public String getUserName() {
-		return userName;
-	}
-
-	public void setUserName(String userName) {
-		this.userName = userName;
-	}
-
-	public boolean isBridgeTempDestinations() {
-		return bridgeTempDestinations;
-	}
-
-	public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
-		this.bridgeTempDestinations = bridgeTempDestinations;
-	}
-
-    public boolean isDuplex(){
-        return this.duplex;
-    }
-
-    public void setDuplex(boolean duplex){
-        this.duplex=duplex;
-    }
+    
+    public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){
+        this.bridgeFailedListener=listener;  
+      }
+      
+      private void fireBridgeFailed() {
+          NetworkBridgeFailedListener l = this.bridgeFailedListener;
+          if (l!=null) {
+              l.bridgeFailed();
+          }
+      }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Tue Mar 13 09:19:58 2007
@@ -60,7 +60,7 @@
     public void onServiceAdd(DiscoveryEvent event) {
     	
     	// Ignore events once we start stopping.
-    	if( isStopped() || isStopping() )
+    	if( serviceSupport.isStopped() || serviceSupport.isStopping() )
     		return;
     	
         String url = event.getServiceName();
@@ -106,7 +106,7 @@
                 return;
             }
 
-            Bridge bridge = createBridge(localTransport, remoteTransport, event);
+            NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
             bridges.put(uri, bridge);
             try {
                 bridge.start();
@@ -138,7 +138,7 @@
                 return;
             }
 
-            Bridge bridge = (Bridge) bridges.remove(uri);
+            NetworkBridge bridge = (NetworkBridge) bridges.remove(uri);
             if (bridge == null)
                 return;
 
@@ -158,17 +158,17 @@
         }
     }
 
-    protected void doStart() throws Exception {
+    protected void handleStart() throws Exception {
         if (discoveryAgent == null) {
             throw new IllegalStateException("You must configure the 'discoveryAgent' property");
         }
         this.discoveryAgent.start();
-        super.doStart();
+        super.handleStart();
     }
 
-    protected void doStop(ServiceStopper stopper) throws Exception {
+    protected void handleStop(ServiceStopper stopper) throws Exception {
         for (Iterator i = bridges.values().iterator(); i.hasNext();) {
-            Bridge bridge = (Bridge) i.next();
+            NetworkBridge bridge = (NetworkBridge) i.next();
             try {
                 bridge.stop();
             }
@@ -183,96 +183,31 @@
             stopper.onException(this, e);
         }
 
-        super.doStop(stopper);
+        super.handleStop(stopper);
     }
 
-    protected Bridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
-        DemandForwardingBridge result = null;
-        if (conduitSubscriptions) {
-            if (dynamicOnly) {
-                result = new ConduitBridge(localTransport, remoteTransport) {
-                	protected void serviceLocalException(Throwable error) {
-                		try {
-                			super.serviceLocalException(error);
-                		} finally {
-                			fireServiceFailed();
-                		}
-                	}
-                	protected void serviceRemoteException(Throwable error) {
-                		try {
-                    		super.serviceRemoteException(error);
-                		} finally {
-                			fireServiceFailed();
-                		}
-                	}
-                	public void fireServiceFailed() {
-                		if( !isStopped() ) {
-                            try {
-                                discoveryAgent.serviceFailed(event);
-                            } catch (IOException e) {
-                            }
-                		}
-                	}
-                };
-            }
-            else {
-                result = new DurableConduitBridge(localTransport, remoteTransport) {
-                	protected void serviceLocalException(Throwable error) {
-                		try {
-                			super.serviceLocalException(error);
-                		} finally {
-                			fireServiceFailed();
-                		}
-                	}
-                	protected void serviceRemoteException(Throwable error) {
-                		try {
-                    		super.serviceRemoteException(error);
-                		} finally {
-                			fireServiceFailed();
-                		}
-                	}
-                	public void fireServiceFailed() {
-                		if( !isStopped() ) {
-                            try {
-                                discoveryAgent.serviceFailed(event);
-                            } catch (IOException e) {
-                            }
-                		}
-                	}
-                };
-            }
-        }
-        else {
-            result = new DemandForwardingBridge(localTransport, remoteTransport) {            	
-            	protected void serviceLocalException(Throwable error) {
-            		try {
-            			super.serviceLocalException(error);
-            		} finally {
-            			fireServiceFailed();
-            		}
-            	}
-            	protected void serviceRemoteException(Throwable error) {
-            		try {
-                		super.serviceRemoteException(error);
-            		} finally {
-            			fireServiceFailed();
-            		}
-            	}
-            	public void fireServiceFailed() {
-            		if( !isStopped() ) {
-                        try {
-                            discoveryAgent.serviceFailed(event);
-                        } catch (IOException e) {
-                        }
-            		}
-            	}
-            };
-        }
+    protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
+        NetworkBridgeFailedListener listener = new NetworkBridgeFailedListener() {
+
+            public void bridgeFailed(){
+                if( !serviceSupport.isStopped() ) {
+                    try {
+                        discoveryAgent.serviceFailed(event);
+                    } catch (IOException e) {
+                    }
+                }
+                
+            }
+            
+        };
+        DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this,localTransport,remoteTransport,listener);
         return configureBridge(result);
     }
 
     protected String createName() {
         return discoveryAgent.toString();
     }
+
+   
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java Tue Mar 13 09:19:58 2007
@@ -37,12 +37,13 @@
 
     /**
      * Constructor
+     * @param configuration 
      * 
      * @param localBroker
      * @param remoteBroker
      */
-    public DurableConduitBridge(Transport localBroker,Transport remoteBroker){
-        super(localBroker,remoteBroker);
+    public DurableConduitBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
+        super(configuration,localBroker,remoteBroker);
     }
 
     /**
@@ -92,7 +93,7 @@
     }
     
     protected String getSubscriberName(ActiveMQDestination dest){
-        String subscriberName = getLocalBrokerName()+"_"+dest.getPhysicalName();
+        String subscriberName = configuration.getLocalBrokerName()+"_"+dest.getPhysicalName();
         return subscriberName;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Tue Mar 13 09:19:58 2007
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 
+import org.apache.activemq.Service;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerId;
@@ -52,7 +53,7 @@
  * 
  * @version $Revision$
  */
-public class ForwardingBridge implements Bridge {
+public class ForwardingBridge  implements Service{
     
     static final private Log log = LogFactory.getLog(ForwardingBridge.class);
 
@@ -76,6 +77,7 @@
     
     BrokerId localBrokerId;
     BrokerId remoteBrokerId;
+    private NetworkBridgeFailedListener bridgeFailedListener;
 
     public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
         this.localBroker = localBroker;
@@ -179,7 +181,7 @@
         }
     }
     
-    protected void serviceRemoteException(IOException error) {
+    public void serviceRemoteException(Throwable error) {
         log.info("Unexpected remote exception: "+error);
         log.debug("Exception trace: ", error);
     }
@@ -206,9 +208,10 @@
         }
     }
 
-    protected void serviceLocalException(Throwable error) {
+    public void serviceLocalException(Throwable error) {
         log.info("Unexpected local exception: "+error);
         log.debug("Exception trace: ", error);
+        fireBridgeFailed();
     }    
     protected void serviceLocalCommand(Command command) {
         try {
@@ -318,5 +321,17 @@
     }
     public void setDestinationFilter(String destinationFilter) {
         this.destinationFilter = destinationFilter;
+    }
+
+   
+    public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){
+      this.bridgeFailedListener=listener;  
+    }
+    
+    private void fireBridgeFailed() {
+        NetworkBridgeFailedListener l = this.bridgeFailedListener;
+        if (l!=null) {
+            l.bridgeFailed();
+        }
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java Tue Mar 13 09:19:58 2007
@@ -94,7 +94,7 @@
     // Implementation methods
     // -------------------------------------------------------------------------
 
-    protected void doStart() throws Exception {
+    protected void handleStart() throws Exception {
         if (remoteTransport == null) {
             if (remoteURI == null) {
                 throw new IllegalArgumentException("You must specify the remoteURI property");
@@ -114,11 +114,11 @@
         remoteTransport.start();
         localTransport.start();
 
-        super.doStart();
+        super.handleStart();
     }
 
-    protected void doStop(ServiceStopper stopper) throws Exception {
-        super.doStop(stopper);
+    protected void handleStop(ServiceStopper stopper) throws Exception {
+        super.handleStop(stopper);
         if (bridge != null) {
             try {
                 bridge.stop();
@@ -150,7 +150,7 @@
     }
 
     protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote) {
-        return new CompositeDemandForwardingBridge(local, remote);
+        return new CompositeDemandForwardingBridge(this,local, remote);
     }
 
 }

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java (from r516655, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/Bridge.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java?view=diff&rev=517753&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/Bridge.java&r1=516655&p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/Bridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java Tue Mar 13 09:19:58 2007
@@ -25,6 +25,23 @@
  * 
  * @version $Revision: 1.1 $
  */
-public interface Bridge extends Service {
-
+public interface NetworkBridge extends Service {
+    
+    /**
+     * Service an exception
+     * @param error
+     */
+    public void serviceRemoteException(Throwable error);
+    
+    /**
+     * Service an exception
+     * @param error
+     */
+    public void serviceLocalException(Throwable error);
+    
+    /**
+     * Set the NetworkBridgeFailedListener
+     * @param listener
+     */
+    public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener);
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?view=auto&rev=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Tue Mar 13 09:19:58 2007
@@ -0,0 +1,224 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.network;
+
+/**
+ * Configuration for a NetworkBridge
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class NetworkBridgeConfiguration{
+
+    private boolean conduitSubscriptions=true;
+    private boolean dynamicOnly=false;
+    private boolean dispatchAsync=true;
+    private boolean decreaseNetworkConsumerPriority=false;
+    private boolean duplex=false;
+    private boolean bridgeTempDestinations=true;
+    private int prefetchSize=1000;
+    private int networkTTL=1;
+    private String localBrokerName="Unknow";
+    private String userName;
+    private String password;
+    private String destinationFilter = ">";
+
+    /**
+     * @return the conduitSubscriptions
+     */
+    public boolean isConduitSubscriptions(){
+        return this.conduitSubscriptions;
+    }
+
+    /**
+     * @param conduitSubscriptions the conduitSubscriptions to set
+     */
+    public void setConduitSubscriptions(boolean conduitSubscriptions){
+        this.conduitSubscriptions=conduitSubscriptions;
+    }
+
+    /**
+     * @return the dynamicOnly
+     */
+    public boolean isDynamicOnly(){
+        return this.dynamicOnly;
+    }
+
+    /**
+     * @param dynamicOnly the dynamicOnly to set
+     */
+    public void setDynamicOnly(boolean dynamicOnly){
+        this.dynamicOnly=dynamicOnly;
+    }
+
+    
+    /**
+     * @return the bridgeTempDestinations
+     */
+    public boolean isBridgeTempDestinations(){
+        return this.bridgeTempDestinations;
+    }
+
+    
+    /**
+     * @param bridgeTempDestinations the bridgeTempDestinations to set
+     */
+    public void setBridgeTempDestinations(boolean bridgeTempDestinations){
+        this.bridgeTempDestinations=bridgeTempDestinations;
+    }
+
+    
+    /**
+     * @return the decreaseNetworkConsumerPriority
+     */
+    public boolean isDecreaseNetworkConsumerPriority(){
+        return this.decreaseNetworkConsumerPriority;
+    }
+
+    
+    /**
+     * @param decreaseNetworkConsumerPriority the decreaseNetworkConsumerPriority to set
+     */
+    public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority){
+        this.decreaseNetworkConsumerPriority=decreaseNetworkConsumerPriority;
+    }
+
+    
+    /**
+     * @return the dispatchAsync
+     */
+    public boolean isDispatchAsync(){
+        return this.dispatchAsync;
+    }
+
+    
+    /**
+     * @param dispatchAsync the dispatchAsync to set
+     */
+    public void setDispatchAsync(boolean dispatchAsync){
+        this.dispatchAsync=dispatchAsync;
+    }
+
+    
+    /**
+     * @return the duplex
+     */
+    public boolean isDuplex(){
+        return this.duplex;
+    }
+
+    
+    /**
+     * @param duplex the duplex to set
+     */
+    public void setDuplex(boolean duplex){
+        this.duplex=duplex;
+    }
+
+    
+    /**
+     * @return the localBrokerName
+     */
+    public String getLocalBrokerName(){
+        return this.localBrokerName;
+    }
+
+    
+    /**
+     * @param localBrokerName the localBrokerName to set
+     */
+    public void setLocalBrokerName(String localBrokerName){
+        this.localBrokerName=localBrokerName;
+    }
+
+    
+    /**
+     * @return the networkTTL
+     */
+    public int getNetworkTTL(){
+        return this.networkTTL;
+    }
+
+    
+    /**
+     * @param networkTTL the networkTTL to set
+     */
+    public void setNetworkTTL(int networkTTL){
+        this.networkTTL=networkTTL;
+    }
+
+    
+    /**
+     * @return the password
+     */
+    public String getPassword(){
+        return this.password;
+    }
+
+    
+    /**
+     * @param password the password to set
+     */
+    public void setPassword(String password){
+        this.password=password;
+    }
+
+    
+    /**
+     * @return the prefetchSize
+     */
+    public int getPrefetchSize(){
+        return this.prefetchSize;
+    }
+
+    
+    /**
+     * @param prefetchSize the prefetchSize to set
+     */
+    public void setPrefetchSize(int prefetchSize){
+        this.prefetchSize=prefetchSize;
+    }
+
+    
+    /**
+     * @return the userName
+     */
+    public String getUserName(){
+        return this.userName;
+    }
+
+    
+    /**
+     * @param userName the userName to set
+     */
+    public void setUserName(String userName){
+        this.userName=userName;
+    }
+
+    
+    /**
+     * @return the destinationFilter
+     */
+    public String getDestinationFilter(){
+        return this.destinationFilter;
+    }
+
+    
+    /**
+     * @param destinationFilter the destinationFilter to set
+     */
+    public void setDestinationFilter(String destinationFilter){
+        this.destinationFilter=destinationFilter;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java?view=auto&rev=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java Tue Mar 13 09:19:58 2007
@@ -0,0 +1,65 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.network;
+
+import org.apache.activemq.transport.Transport;
+
+/**
+ * Factory for network bridges
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class NetworkBridgeFactory{
+
+    /**
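+     * Create a network bridge with no failure listener attached
+     * 
+     * @param config configuration used to choose and construct the bridge
+     * @param localTransport local transport handed to the bridge
+     * @param remoteTransport remote transport handed to the bridge
+     * @return the NetworkBridge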
+     */
+    public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration config,Transport localTransport,
+            Transport remoteTransport){
+        return createBridge(config,localTransport,remoteTransport,null);
+    }
+
+    /**
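+     * Create a network bridge, choosing the implementation from the configuration
+     * 
+     * @param configuration selects the bridge type (conduitSubscriptions, dynamicOnly) and is passed to the bridge
+     * @param localTransport local transport handed to the bridge
+     * @param remoteTransport remote transport handed to the bridge
+     * @param listener failure listener set on the bridge; skipped when null
+     * @return the NetworkBridge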
+     */
+    public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration,Transport localTransport,
+            Transport remoteTransport,NetworkBridgeFailedListener listener){
+        DemandForwardingBridge result=null;
+        if(configuration.isConduitSubscriptions()){
+            if(configuration.isDynamicOnly()){
+                result=new ConduitBridge(configuration,localTransport,remoteTransport);
+            }else{
+                result=new DurableConduitBridge(configuration,localTransport,remoteTransport);
+            }
+        }else{
+            result=new DemandForwardingBridge(configuration,localTransport,remoteTransport);
+        }
+        if(listener!=null){
+            result.setNetworkBridgeFailedListener(listener);
+        }
+        return result;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFailedListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFailedListener.java?view=auto&rev=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFailedListener.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFailedListener.java Tue Mar 13 09:19:58 2007
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+
+
+/**
+ * Called when a bridge fails
+ * 
+ * @version $Revision: 1.1 $
+ */
+public interface NetworkBridgeFailedListener{
+    
+    /**
+     * called when the transport fails
+     *
+     */
+    public void bridgeFailed();
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFailedListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Tue Mar 13 09:19:58 2007
@@ -1,27 +1,26 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.network;
 
+import static org.apache.activemq.network.NetworkConnector.log;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Set;
-
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.activemq.Service;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
@@ -30,242 +29,167 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.concurrent.CopyOnWriteArrayList;
-
 /**
  * @version $Revision$
  */
-public abstract class NetworkConnector extends ServiceSupport {
+public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service{
 
-    protected static final Log log = LogFactory.getLog(NetworkConnector.class);
+    protected static final Log log=LogFactory.getLog(NetworkConnector.class);
     protected URI localURI;
-    private String brokerName = "localhost";
-
+    private String brokerName="localhost";
     private Set durableDestinations;
-    private List excludedDestinations = new CopyOnWriteArrayList();
-    private List dynamicallyIncludedDestinations = new CopyOnWriteArrayList();
-    private List staticallyIncludedDestinations = new CopyOnWriteArrayList();
-    protected boolean dynamicOnly = false;
-    protected boolean conduitSubscriptions = true;
-    private boolean decreaseNetworkConsumerPriority;
-    private int networkTTL = 1;
-    private String name = "bridge";
-    private int prefetchSize = 1000;
-    private boolean dispatchAsync = true;
-    private String userName;
-    private String password;
-    private boolean bridgeTempDestinations=true;
-    private boolean duplex = false;
-    
+    private List excludedDestinations=new CopyOnWriteArrayList();
+    private List dynamicallyIncludedDestinations=new CopyOnWriteArrayList();
+    private List staticallyIncludedDestinations=new CopyOnWriteArrayList();
+    private String name="bridge";
     protected ConnectionFilter connectionFilter;
+    protected ServiceSupport serviceSupport=new ServiceSupport(){
+
+        protected void doStart() throws Exception{
+           handleStart();
+        }
+
+        protected void doStop(ServiceStopper stopper) throws Exception{
+            handleStop(stopper);
+        }
+    };
 
-    public NetworkConnector() {
+    public NetworkConnector(){
     }
 
-    public NetworkConnector(URI localURI) {
-        this.localURI = localURI;
+    public NetworkConnector(URI localURI){
+        this.localURI=localURI;
     }
 
-    public URI getLocalUri() throws URISyntaxException {
+    public URI getLocalUri() throws URISyntaxException{
         return localURI;
     }
 
-    public void setLocalUri(URI localURI) {
-        this.localURI = localURI;
+    public void setLocalUri(URI localURI){
+        this.localURI=localURI;
     }
 
     /**
      * @return Returns the name.
      */
-    public String getName() {
-        if (name == null) {
-            name = createName();
+    public String getName(){
+        if(name==null){
+            name=createName();
         }
         return name;
     }
 
     /**
-     * @param name
-     *            The name to set.
+     * @param name The name to set.
      */
-    public void setName(String name) {
-        this.name = name;
+    public void setName(String name){
+        this.name=name;
     }
 
-    public String getBrokerName() {
+    public String getBrokerName(){
         return brokerName;
     }
 
     /**
-     * @param brokerName
-     *            The brokerName to set.
+     * @param brokerName The brokerName to set.
      */
-    public void setBrokerName(String brokerName) {
-        this.brokerName = brokerName;
+    public void setBrokerName(String brokerName){
+        this.brokerName=brokerName;
     }
 
     /**
      * @return Returns the durableDestinations.
      */
-    public Set getDurableDestinations() {
+    public Set getDurableDestinations(){
         return durableDestinations;
     }
 
     /**
-     * @param durableDestinations
-     *            The durableDestinations to set.
-     */
-    public void setDurableDestinations(Set durableDestinations) {
-        this.durableDestinations = durableDestinations;
-    }
-
-    /**
-     * @return Returns the dynamicOnly.
-     */
-    public boolean isDynamicOnly() {
-        return dynamicOnly;
-    }
-
-    /**
-     * @param dynamicOnly
-     *            The dynamicOnly to set.
-     */
-    public void setDynamicOnly(boolean dynamicOnly) {
-        this.dynamicOnly = dynamicOnly;
-    }
-
-    /**
-     * @return Returns the conduitSubscriptions.
-     */
-    public boolean isConduitSubscriptions() {
-        return conduitSubscriptions;
-    }
-
-    /**
-     * @param conduitSubscriptions
-     *            The conduitSubscriptions to set.
-     */
-    public void setConduitSubscriptions(boolean conduitSubscriptions) {
-        this.conduitSubscriptions = conduitSubscriptions;
-    }
-
-    /**
-     * @return Returns the decreaseNetworkConsumerPriority.
-     */
-    public boolean isDecreaseNetworkConsumerPriority() {
-        return decreaseNetworkConsumerPriority;
-    }
-
-    /**
-     * @param decreaseNetworkConsumerPriority
-     *            The decreaseNetworkConsumerPriority to set.
-     */
-    public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) {
-        this.decreaseNetworkConsumerPriority = decreaseNetworkConsumerPriority;
-    }
-
-    /**
-     * @return Returns the networkTTL.
-     */
-    public int getNetworkTTL() {
-        return networkTTL;
-    }
-
-    /**
-     * @param networkTTL
-     *            The networkTTL to set.
+     * @param durableDestinations The durableDestinations to set.
      */
-    public void setNetworkTTL(int networkTTL) {
-        this.networkTTL = networkTTL;
+    public void setDurableDestinations(Set durableDestinations){
+        this.durableDestinations=durableDestinations;
     }
 
     /**
      * @return Returns the excludedDestinations.
      */
-    public List getExcludedDestinations() {
+    public List getExcludedDestinations(){
         return excludedDestinations;
     }
 
     /**
-     * @param excludedDestinations
-     *            The excludedDestinations to set.
+     * @param excludedDestinations The excludedDestinations to set.
      */
-    public void setExcludedDestinations(List exludedDestinations) {
-        this.excludedDestinations = exludedDestinations;
+    public void setExcludedDestinations(List exludedDestinations){
+        this.excludedDestinations=exludedDestinations;
     }
 
-    public void addExcludedDestination(ActiveMQDestination destiantion) {
+    public void addExcludedDestination(ActiveMQDestination destiantion){
         this.excludedDestinations.add(destiantion);
     }
 
     /**
      * @return Returns the staticallyIncludedDestinations.
      */
-    public List getStaticallyIncludedDestinations() {
+    public List getStaticallyIncludedDestinations(){
         return staticallyIncludedDestinations;
     }
 
     /**
-     * @param staticallyIncludedDestinations
-     *            The staticallyIncludedDestinations to set.
+     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
      */
-    public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations) {
-        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
+    public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations){
+        this.staticallyIncludedDestinations=staticallyIncludedDestinations;
     }
 
-    public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) {
+    public void addStaticallyIncludedDestination(ActiveMQDestination destiantion){
         this.staticallyIncludedDestinations.add(destiantion);
     }
 
     /**
      * @return Returns the dynamicallyIncludedDestinations.
      */
-    public List getDynamicallyIncludedDestinations() {
+    public List getDynamicallyIncludedDestinations(){
         return dynamicallyIncludedDestinations;
     }
 
     /**
-     * @param dynamicallyIncludedDestinations
-     *            The dynamicallyIncludedDestinations to set.
+     * @param dynamicallyIncludedDestinations The dynamicallyIncludedDestinations to set.
      */
-    public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations) {
-        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
+    public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations){
+        this.dynamicallyIncludedDestinations=dynamicallyIncludedDestinations;
     }
 
-    public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) {
+    public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion){
         this.dynamicallyIncludedDestinations.add(destiantion);
     }
+    
+    public ConnectionFilter getConnectionFilter(){
+        return connectionFilter;
+    }
+
+    public void setConnectionFilter(ConnectionFilter connectionFilter){
+        this.connectionFilter=connectionFilter;
+    }
+
 
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected Bridge configureBridge(DemandForwardingBridgeSupport result) {
-        result.setLocalBrokerName(getBrokerName());
+    protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result){
         result.setName(getBrokerName());
-        result.setNetworkTTL(getNetworkTTL());
-        result.setUserName(userName);
-        result.setPassword(password);
-        result.setPrefetchSize(prefetchSize);
-        result.setDispatchAsync(dispatchAsync);
-        result.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
-        result.setDuplex(isDuplex());
-
-        List destsList = getDynamicallyIncludedDestinations();
-        ActiveMQDestination dests[] = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]);
+        List destsList=getDynamicallyIncludedDestinations();
+        ActiveMQDestination dests[]=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
         result.setDynamicallyIncludedDestinations(dests);
-
-        destsList = getExcludedDestinations();
-        dests = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]);
+        destsList=getExcludedDestinations();
+        dests=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
         result.setExcludedDestinations(dests);
-
-        destsList = getStaticallyIncludedDestinations();
-        dests = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]);
+        destsList=getStaticallyIncludedDestinations();
+        dests=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
         result.setStaticallyIncludedDestinations(dests);
-        
-        result.setBridgeTempDestinations(bridgeTempDestinations);
-
-        if (durableDestinations != null) {
-            ActiveMQDestination[] dest = new ActiveMQDestination[durableDestinations.size()];
-            dest = (ActiveMQDestination[]) durableDestinations.toArray(dest);
+        if(durableDestinations!=null){
+            ActiveMQDestination[] dest=new ActiveMQDestination[durableDestinations.size()];
+            dest=(ActiveMQDestination[])durableDestinations.toArray(dest);
             result.setDurableDestinations(dest);
         }
         return result;
@@ -273,82 +197,26 @@
 
     protected abstract String createName();
 
-    protected void doStart() throws Exception {
-        if (localURI == null) {
-            throw new IllegalStateException("You must configure the 'localURI' property");
-        }
-        log.info("Network Connector "+getName()+" Started");
-    }
-
-    protected void doStop(ServiceStopper stopper) throws Exception {
-        log.info("Network Connector "+getName()+" Stopped");
-    }
-
-    protected Transport createLocalTransport() throws Exception {
+    protected Transport createLocalTransport() throws Exception{
         return TransportFactory.connect(localURI);
     }
 
-    public boolean isDispatchAsync() {
-        return dispatchAsync;
-    }
-
-    public void setDispatchAsync(boolean dispatchAsync) {
-        this.dispatchAsync = dispatchAsync;
-    }
-
-    public int getPrefetchSize() {
-        return prefetchSize;
+    public void start() throws Exception{
+        serviceSupport.start();
     }
 
-    public void setPrefetchSize(int prefetchSize) {
-        this.prefetchSize = prefetchSize;
+    public void stop() throws Exception{
+        serviceSupport.stop();
     }
-
-    public ConnectionFilter getConnectionFilter() {
-        return connectionFilter;
-    }
-
-    public void setConnectionFilter(ConnectionFilter connectionFilter) {
-        this.connectionFilter = connectionFilter;
-    }
-
-	public String getPassword() {
-		return password;
-	}
-
-	public void setPassword(String password) {
-		this.password = password;
-	}
-
-	public String getUserName() {
-		return userName;
-	}
-
-	public void setUserName(String userName) {
-		this.userName = userName;
-	}
-
-	public boolean isBridgeTempDestinations() {
-		return bridgeTempDestinations;
-	}
-
-	public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
-		this.bridgeTempDestinations = bridgeTempDestinations;
-	}
-
     
-    /**
-     * @return the duplex
-     */
-    public boolean isDuplex(){
-        return this.duplex;
+    protected void handleStart() throws Exception{
+        if(localURI==null){
+            throw new IllegalStateException("You must configure the 'localURI' property");
+        }
+        log.info("Network Connector "+getName()+" Started");
     }
 
-    
-    /**
-     * @param duplex the duplex to set
-     */
-    public void setDuplex(boolean duplex){
-        this.duplex=duplex;
+    protected void handleStop(ServiceStopper stopper) throws Exception{
+        log.info("Network Connector "+getName()+" Stopped");
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/BrokerInfoMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/BrokerInfoMarshaller.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/BrokerInfoMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/BrokerInfoMarshaller.java Tue Mar 13 09:19:58 2007
@@ -88,6 +88,7 @@
         info.setNetworkConnection(bs.readBoolean());
         info.setConnectionId(tightUnmarshalLong(wireFormat, dataIn, bs));
         info.setBrokerUploadUrl(tightUnmarshalString(dataIn, bs));
+        info.setNetworkProperties(tightUnmarshalString(dataIn, bs));
 
     }
 
@@ -111,6 +112,7 @@
         bs.writeBoolean(info.isNetworkConnection());
         rc+=tightMarshalLong1(wireFormat, info.getConnectionId(), bs);
         rc += tightMarshalString1(info.getBrokerUploadUrl(), bs);
+        rc += tightMarshalString1(info.getNetworkProperties(), bs);
 
         return rc + 0;
     }
@@ -137,6 +139,7 @@
         bs.readBoolean();
         tightMarshalLong2(wireFormat, info.getConnectionId(), dataOut, bs);
         tightMarshalString2(info.getBrokerUploadUrl(), dataOut, bs);
+        tightMarshalString2(info.getNetworkProperties(), dataOut, bs);
 
     }
 
@@ -173,6 +176,7 @@
         info.setNetworkConnection(dataIn.readBoolean());
         info.setConnectionId(looseUnmarshalLong(wireFormat, dataIn));
         info.setBrokerUploadUrl(looseUnmarshalString(dataIn));
+        info.setNetworkProperties(looseUnmarshalString(dataIn));
 
     }
 
@@ -196,6 +200,7 @@
         dataOut.writeBoolean(info.isNetworkConnection());
         looseMarshalLong(wireFormat, info.getConnectionId(), dataOut);
         looseMarshalString(info.getBrokerUploadUrl(), dataOut);
+        looseMarshalString(info.getNetworkProperties(), dataOut);
 
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessagePullMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessagePullMarshaller.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessagePullMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessagePullMarshaller.java Tue Mar 13 09:19:58 2007
@@ -69,6 +69,8 @@
         info.setConsumerId((org.apache.activemq.command.ConsumerId) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
         info.setDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
         info.setTimeout(tightUnmarshalLong(wireFormat, dataIn, bs));
+        info.setCorrelationId(tightUnmarshalString(dataIn, bs));
+        info.setMessageId((org.apache.activemq.command.MessageId) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
 
     }
 
@@ -84,6 +86,8 @@
         rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getConsumerId(), bs);
         rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getDestination(), bs);
         rc+=tightMarshalLong1(wireFormat, info.getTimeout(), bs);
+        rc += tightMarshalString1(info.getCorrelationId(), bs);
+        rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getMessageId(), bs);
 
         return rc + 0;
     }
@@ -102,6 +106,8 @@
         tightMarshalCachedObject2(wireFormat, (DataStructure)info.getConsumerId(), dataOut, bs);
         tightMarshalCachedObject2(wireFormat, (DataStructure)info.getDestination(), dataOut, bs);
         tightMarshalLong2(wireFormat, info.getTimeout(), dataOut, bs);
+        tightMarshalString2(info.getCorrelationId(), dataOut, bs);
+        tightMarshalNestedObject2(wireFormat, (DataStructure)info.getMessageId(), dataOut, bs);
 
     }
 
@@ -119,6 +125,8 @@
         info.setConsumerId((org.apache.activemq.command.ConsumerId) looseUnmarsalCachedObject(wireFormat, dataIn));
         info.setDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalCachedObject(wireFormat, dataIn));
         info.setTimeout(looseUnmarshalLong(wireFormat, dataIn));
+        info.setCorrelationId(looseUnmarshalString(dataIn));
+        info.setMessageId((org.apache.activemq.command.MessageId) looseUnmarsalNestedObject(wireFormat, dataIn));
 
     }
 
@@ -134,6 +142,8 @@
         looseMarshalCachedObject(wireFormat, (DataStructure)info.getConsumerId(), dataOut);
         looseMarshalCachedObject(wireFormat, (DataStructure)info.getDestination(), dataOut);
         looseMarshalLong(wireFormat, info.getTimeout(), dataOut);
+        looseMarshalString(info.getCorrelationId(), dataOut);
+        looseMarshalNestedObject(wireFormat, (DataStructure)info.getMessageId(), dataOut);
 
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MarshallingSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MarshallingSupport.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MarshallingSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MarshallingSupport.java Tue Mar 13 09:19:58 2007
@@ -22,12 +22,15 @@
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.StringWriter;
 import java.io.UTFDataFormatException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * 
@@ -367,6 +370,51 @@
         } else {
             return null;
         }
+    }
+    
+    /**
+     * Converts a {@link Properties} object to the text form written by
+     * {@link Properties#store(java.io.OutputStream, String)}.
+     * <p>
+     * <code>store</code> emits ISO-8859-1 bytes, so they are decoded with that
+     * charset explicitly rather than with the platform default, which may
+     * differ from one JVM to another.
+     *
+     * @param props the properties to convert; may be <code>null</code>
+     * @return the stored text, or an empty string when <code>props</code> is
+     *         <code>null</code>
+     * @throws IOException if the properties cannot be written or decoded
+     */
+    public static String propertiesToString(Properties props) throws IOException{
+        String result="";
+        if(props!=null){
+            DataByteArrayOutputStream dataOut=new DataByteArrayOutputStream();
+            props.store(dataOut,"");
+            result=new String(dataOut.getData(),0,dataOut.size(),"ISO-8859-1");
+        }
+        return result;
+    }
+    
+    /**
+     * Parses text in <code>java.util.Properties</code> format back into a
+     * {@link Properties} object.
+     * <p>
+     * {@link Properties#load(java.io.InputStream)} reads ISO-8859-1 bytes, so
+     * the string is encoded with that charset explicitly rather than with the
+     * platform default.
+     *
+     * @param str the text to parse; may be <code>null</code> or empty
+     * @return the parsed properties; empty when <code>str</code> is
+     *         <code>null</code> or empty, never <code>null</code>
+     * @throws IOException if the text cannot be encoded or parsed
+     */
+    public static Properties stringToProperties(String str) throws IOException {
+        Properties result = new Properties();
+        if (str != null && str.length() > 0 ) {
+            DataByteArrayInputStream dataIn = new DataByteArrayInputStream(str.getBytes("ISO-8859-1"));
+            result.load(dataIn);
+        }
+        return result;
     }
 
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java Tue Mar 13 09:19:58 2007
@@ -125,9 +125,10 @@
 
     protected void setUp() throws Exception {
         super.setUp();
-        bridge = new DemandForwardingBridge(createTransport(), createRemoteTransport());
-        bridge.setLocalBrokerName("local");
-        bridge.setDispatchAsync(false);
+        NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+        config.setLocalBrokerName("local");
+        config.setDispatchAsync(false);
+        bridge = new DemandForwardingBridge(config,createTransport(), createRemoteTransport()); 
         bridge.start();
         
         // PATCH: Give demand forwarding bridge a chance to finish setting up

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/BrokerInfoTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/BrokerInfoTest.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/BrokerInfoTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/BrokerInfoTest.java Tue Mar 13 09:19:58 2007
@@ -68,5 +68,6 @@
         info.setNetworkConnection(true);
         info.setConnectionId(1);
         info.setBrokerUploadUrl("BrokerUploadUrl:5");
+        info.setNetworkProperties("NetworkProperties:6");
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessagePullTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessagePullTest.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessagePullTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessagePullTest.java Tue Mar 13 09:19:58 2007
@@ -54,5 +54,7 @@
         info.setConsumerId(createConsumerId("ConsumerId:1"));
         info.setDestination(createActiveMQDestination("Destination:2"));
         info.setTimeout(1);
+        info.setCorrelationId("CorrelationId:3");
+        info.setMessageId(createMessageId("MessageId:4"));
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java Tue Mar 13 09:19:58 2007
@@ -20,6 +20,7 @@
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.network.NetworkBridgeConfiguration;
 import org.apache.activemq.transport.TransportFactory;
 
 import java.util.List;
@@ -56,9 +57,10 @@
 
             // Ensure that we are connecting using tcp
             if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
-                DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
+                NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+                config.setLocalBrokerName(localBroker.getBrokerName());
+                DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
                                                                            TransportFactory.connect(remoteURI));
-                bridge.setLocalBrokerName(localBroker.getBrokerName());
                 bridges.add(bridge);
 
                 bridge.start();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java Tue Mar 13 09:19:58 2007
@@ -20,6 +20,7 @@
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.network.NetworkBridgeConfiguration;
 import org.apache.activemq.transport.TransportFactory;
 
 import java.util.List;
@@ -43,9 +44,10 @@
 
             // Ensure that we are connecting using tcp
             if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
-                DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
+                NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+                config.setLocalBrokerName(localBroker.getBrokerName());
+                DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
                                                                            TransportFactory.connect(remoteURI));
-                bridge.setLocalBrokerName(localBroker.getBrokerName());
                 bridges.add(bridge);
 
                 bridge.start();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java Tue Mar 13 09:19:58 2007
@@ -20,6 +20,7 @@
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.network.NetworkBridgeConfiguration;
 import org.apache.activemq.transport.TransportFactory;
 
 import java.util.List;
@@ -43,9 +44,10 @@
 
             // Ensure that we are connecting using tcp
             if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
-                DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
+                NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+                config.setLocalBrokerName(localBroker.getBrokerName());
+                DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
                                                                            TransportFactory.connect(remoteURI));
-                bridge.setLocalBrokerName(localBroker.getBrokerName());
                 bridges.add(bridge);
 
                 bridge.start();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java?view=diff&rev=517753&r1=517752&r2=517753
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java Tue Mar 13 09:19:58 2007
@@ -20,6 +20,7 @@
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.network.NetworkBridgeConfiguration;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
 import org.apache.activemq.command.Command;
@@ -115,7 +116,9 @@
 
             // Ensure that we are connecting using tcp
             if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
-                DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
+                NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+                config.setLocalBrokerName(localBroker.getBrokerName());
+                DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
                                                                            TransportFactory.connect(remoteURI)) {
                     protected void serviceLocalCommand(Command command) {
                         if (command.isMessageDispatch()) {
@@ -126,7 +129,6 @@
                         super.serviceLocalCommand(command);
                     }
                 };
-                bridge.setLocalBrokerName(localBroker.getBrokerName());
                 bridges.add(bridge);
 
                 bridge.start();