You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/06/28 06:34:59 UTC

svn commit: r551443 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/ broker/jmx/ network/

Author: chirino
Date: Wed Jun 27 21:34:57 2007
New Revision: 551443

URL: http://svn.apache.org/viewvc?view=rev&rev=551443
Log:
Display the establised Neteowork Connector Bridges via JMX - https://issues.apache.org/activemq/browse/AMQ-1299

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java
Removed:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFailedListener.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Jun 27 21:34:57 2007
@@ -278,6 +278,7 @@
      * network
      */
     public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
+    	connector.setBrokerService(this);
         URI uri = getVmConnectorURI();
         HashMap map = new HashMap(URISupport.parseParamters(uri));
         map.put("network", "true");
@@ -1217,6 +1218,7 @@
             NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
             try {
                 ObjectName objectName = createNetworkConnectorObjectName(connector);
+                connector.setObjectName(objectName);
                 mbeanServer.registerMBean(view, objectName);
                 registeredMBeanNames.add(objectName);
             }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java?view=auto&rev=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java Wed Jun 27 21:34:57 2007
@@ -0,0 +1,61 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.network.NetworkBridge;
+
+public class NetworkBridgeView implements NetworkBridgeViewMBean {
+
+    private final NetworkBridge bridge;
+
+	public NetworkBridgeView(NetworkBridge bridge) {
+		this.bridge = bridge;
+    }
+    
+    public void start() throws Exception {
+    	bridge.start();
+    }
+
+    public void stop() throws Exception {
+    	bridge.stop();
+    }
+    
+    public String getLocalAddress() {
+    	return bridge.getLocalAddress();
+    }
+
+    public String getRemoteAddress() {
+    	return bridge.getRemoteAddress();
+    }
+
+    public String getRemoteBrokerName() {
+    	return bridge.getRemoteBrokerName();
+    }
+
+    public String getLocalBrokerName() {
+    	return bridge.getLocalBrokerName();
+    }
+
+    public long getEnqueueCounter() {
+    	return bridge.getEnqueueCounter();
+    }
+
+    public long getDequeueCounter() {
+    	return bridge.getDequeueCounter();
+    }
+
+}

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java?view=auto&rev=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java Wed Jun 27 21:34:57 2007
@@ -0,0 +1,30 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.Service;
+
+public interface NetworkBridgeViewMBean extends Service {
+	
+    public String getLocalAddress();
+    public String getRemoteAddress();
+    public String getRemoteBrokerName();
+    public String getLocalBrokerName();
+    public long getEnqueueCounter();
+    public long getDequeueCounter();
+
+}
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java Wed Jun 27 21:34:57 2007
@@ -32,6 +32,86 @@
     }
 
     public void stop() throws Exception {
-        connector.stop();
+        connector.stop();       
     }
+
+	public String getName() {
+		return connector.getName();
+	}
+
+	public int getNetworkTTL() {
+		return connector.getNetworkTTL();
+	}
+
+	public int getPrefetchSize() {
+		return connector.getPrefetchSize();
+	}
+
+	public String getUserName() {
+		return connector.getUserName();
+	}
+
+	public boolean isBridgeTempDestinations() {
+		return connector.isBridgeTempDestinations();
+	}
+
+	public boolean isConduitSubscriptions() {
+		return connector.isConduitSubscriptions();
+	}
+
+	public boolean isDecreaseNetworkConsumerPriority() {
+		return connector.isDecreaseNetworkConsumerPriority();
+	}
+
+	public boolean isDispatchAsync() {
+		return connector.isDispatchAsync();
+	}
+
+	public boolean isDynamicOnly() {
+		return connector.isDynamicOnly();
+	}
+
+	public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
+		connector.setBridgeTempDestinations(bridgeTempDestinations);
+	}
+
+	public void setConduitSubscriptions(boolean conduitSubscriptions) {
+		connector.setConduitSubscriptions(conduitSubscriptions);
+	}
+
+	public void setDispatchAsync(boolean dispatchAsync) {
+		connector.setDispatchAsync(dispatchAsync);
+	}
+
+	public void setDynamicOnly(boolean dynamicOnly) {
+		connector.setDynamicOnly(dynamicOnly);
+	}
+
+	public void setNetworkTTL(int networkTTL) {
+		connector.setNetworkTTL(networkTTL);
+	}
+
+	public void setPassword(String password) {
+		connector.setPassword(password);
+	}
+
+	public void setPrefetchSize(int prefetchSize) {
+		connector.setPrefetchSize(prefetchSize);
+	}
+
+	public void setUserName(String userName) {
+		connector.setUserName(userName);
+	}
+
+	public String getPassword() {
+		String pw = connector.getPassword();
+		// Hide the password for security reasons.
+		if( pw!= null ) 
+			pw = pw.replaceAll(".", "*");
+		return pw;
+	}
+
+	public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) {
+		connector.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority);
+	}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java Wed Jun 27 21:34:57 2007
@@ -21,4 +21,24 @@
 
 public interface NetworkConnectorViewMBean extends Service {
 
+      public String getName();
+      public int getNetworkTTL();
+      public int getPrefetchSize();
+      public String getUserName();
+      public boolean isBridgeTempDestinations();
+      public boolean isConduitSubscriptions();
+      public boolean isDecreaseNetworkConsumerPriority();
+      public boolean isDispatchAsync();
+      public boolean isDynamicOnly();
+      public void setBridgeTempDestinations(boolean bridgeTempDestinations);
+      public void setConduitSubscriptions(boolean conduitSubscriptions);
+      public void setDispatchAsync(boolean dispatchAsync);
+      public void setDynamicOnly(boolean dynamicOnly);
+      public void setNetworkTTL(int networkTTL);
+      public void setPassword(String password);
+      public void setPrefetchSize(int prefetchSize);
+      public void setUserName(String userName);
+      public String getPassword();
+      public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority);
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Jun 27 21:34:57 2007
@@ -18,6 +18,12 @@
 package org.apache.activemq.network;
 
 import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -61,12 +67,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.security.GeneralSecurityException;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * A useful base class for implementing demand forwarding bridges.
  * 
@@ -102,9 +102,14 @@
     protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
     protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
     protected NetworkBridgeConfiguration configuration;
-    private NetworkBridgeFailedListener bridgeFailedListener;
+    private NetworkBridgeListener networkBridgeListener;
     private boolean createdByDuplex;
 
+    private BrokerInfo localBrokerInfo;
+    private BrokerInfo remoteBrokerInfo;
+
+    final AtomicLong enqueueCounter = new AtomicLong();
+    final AtomicLong dequeueCounter = new AtomicLong();
     
     public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
         this.configuration=configuration;
@@ -194,6 +199,12 @@
         }catch(IOException e){
             log.warn("Caught exception from remote start",e);
         }
+        
+        NetworkBridgeListener l = this.networkBridgeListener;
+        if (l!=null) {
+            l.onStart(this);
+        }
+
     }
 
     protected void triggerLocalStartBridge() throws IOException {
@@ -308,6 +319,11 @@
         log.debug(" stopping "+configuration.getBrokerName()+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
         boolean wasDisposedAlready=disposed;
         if(!disposed){
+            NetworkBridgeListener l = this.networkBridgeListener;
+            if (l!=null) {
+                l.onStop(this);
+            }
+
             try{
                 disposed=true;
                 remoteBridgeStarted.set(false);
@@ -364,6 +380,8 @@
                 }else if(command.isBrokerInfo()){
                 	
                 	lastConnectSucceeded.set(true);
+                        remoteBrokerInfo = ((BrokerInfo)command);
+
                 	serviceRemoteBrokerInfo(command);
                 	// Let the local broker know the remote broker's ID.
                 	localBroker.oneway(command);
@@ -507,6 +525,7 @@
             final boolean trace=log.isTraceEnabled();
             try{
                 if(command.isMessageDispatch()){
+                	enqueueCounter.incrementAndGet();
                     waitStarted();
                     final MessageDispatch md=(MessageDispatch) command;
                     DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
@@ -523,6 +542,7 @@
                             // by bridging it using an async send (small chance of message loss).
                             remoteBroker.oneway(message);
                             localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+                            dequeueCounter.incrementAndGet();
                             
                         } else {
                             
@@ -537,6 +557,7 @@
                                             serviceLocalException(er.getException());
                                         } else {
                                             localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+                                            dequeueCounter.incrementAndGet();
                                         }
                                     } catch (IOException e) {
                                         serviceLocalException(e);
@@ -551,6 +572,7 @@
                         if (trace)log.trace("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
                     } 
                 }else if(command.isBrokerInfo()){
+                	localBrokerInfo = ((BrokerInfo)command);
                     serviceLocalBrokerInfo(command);
                 }else if(command.isShutdownInfo()){
                     log.info(configuration.getBrokerName()+" Shutting down");
@@ -812,14 +834,39 @@
     
     protected abstract BrokerId[] getRemoteBrokerPath();
     
-    public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){
-        this.bridgeFailedListener=listener;  
+    public void setNetworkBridgeListener(NetworkBridgeListener listener){
+        this.networkBridgeListener=listener;  
       }
       
       private void fireBridgeFailed() {
-          NetworkBridgeFailedListener l = this.bridgeFailedListener;
+          NetworkBridgeListener l = this.networkBridgeListener;
           if (l!=null) {
               l.bridgeFailed();
           }
       }
+
+	public String getRemoteAddress() {
+ 		return remoteBroker.getRemoteAddress();
+ 	}
+ 
+ 	public String getLocalAddress() {
+ 		return localBroker.getRemoteAddress();
+ 	}
+ 
+ 	public String getRemoteBrokerName() {
+ 		return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
+ 	}
+ 	
+	public String getLocalBrokerName() {
+ 		return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
+	}
+
+ 	public long getDequeueCounter() {
+ 		return dequeueCounter.get();
+ 	}
+ 
+ 	public long getEnqueueCounter() {
+ 		return enqueueCounter.get();
+	}
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Wed Jun 27 21:34:57 2007
@@ -124,7 +124,7 @@
             }
         }
     }
-
+    
     public void onServiceRemove(DiscoveryEvent event) {
         String url = event.getServiceName();
         if (url != null) {
@@ -186,7 +186,7 @@
     }
 
     protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
-        NetworkBridgeFailedListener listener = new NetworkBridgeFailedListener() {
+        NetworkBridgeListener listener = new NetworkBridgeListener() {
 
             public void bridgeFailed(){
                 if( !serviceSupport.isStopped() ) {
@@ -197,6 +197,15 @@
                 }
                 
             }
+
+			public void onStart(NetworkBridge bridge) {
+				 registerNetworkBridgeMBean(bridge);
+			}
+
+			public void onStop(NetworkBridge bridge) {
+				unregisterNetworkBridgeMBean(bridge);
+			}
+
             
         };
         DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this,localTransport,remoteTransport,listener);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Wed Jun 27 21:34:57 2007
@@ -18,6 +18,7 @@
 package org.apache.activemq.network;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -46,6 +47,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+
 /**
  * Forwards all messages from the local broker to the remote broker.
  * 
@@ -74,7 +76,13 @@
     
     BrokerId localBrokerId;
     BrokerId remoteBrokerId;
-    private NetworkBridgeFailedListener bridgeFailedListener;
+    private NetworkBridgeListener bridgeFailedListener;
+
+	BrokerInfo localBrokerInfo;
+	BrokerInfo remoteBrokerInfo;
+	
+	final AtomicLong enqueueCounter = new AtomicLong();
+	final AtomicLong dequeueCounter = new AtomicLong();
 
     public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
         this.localBroker = localBroker;
@@ -187,7 +195,8 @@
         try {
             if(command.isBrokerInfo() ) {
                 synchronized( this ) {
-                    remoteBrokerId = ((BrokerInfo)command).getBrokerId();
+                	remoteBrokerInfo = ((BrokerInfo)command);
+                    remoteBrokerId = remoteBrokerInfo.getBrokerId();
                     if( localBrokerId !=null) {
                         if( localBrokerId.equals(remoteBrokerId) ) {
                             log.info("Disconnecting loop back connection.");
@@ -213,6 +222,9 @@
     protected void serviceLocalCommand(Command command) {
         try {
             if( command.isMessageDispatch() ) {
+            	
+            	enqueueCounter.incrementAndGet();
+            	
                 final MessageDispatch md = (MessageDispatch) command;
                 Message message = md.getMessage();
                 message.setProducerId(producerInfo.getProducerId());
@@ -223,10 +235,10 @@
 
                 
                 if( !message.isResponseRequired() ) {
-                    
                     // If the message was originally sent using async send, we will preserve that QOS
                     // by bridging it using an async send (small chance of message loss).
                     remoteBroker.oneway(message);
+                	dequeueCounter.incrementAndGet();
                     localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
                     
                 } else {
@@ -241,6 +253,7 @@
                                     ExceptionResponse er=(ExceptionResponse) response;
                                     serviceLocalException(er.getException());
                                 } else {
+                                	dequeueCounter.incrementAndGet();
                                     localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
                                 }
                             } catch (IOException e) {
@@ -273,7 +286,8 @@
 //                }
             } else if(command.isBrokerInfo() ) {
                 synchronized( this ) {
-                    localBrokerId = ((BrokerInfo)command).getBrokerId();
+                	localBrokerInfo = ((BrokerInfo)command);
+                    localBrokerId = localBrokerInfo.getBrokerId();
                     if( remoteBrokerId !=null) {
                         if( remoteBrokerId.equals(localBrokerId) ) {
                             log.info("Disconnecting loop back connection.");
@@ -320,14 +334,39 @@
     }
 
    
-    public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){
+    public void setNetworkBridgeFailedListener(NetworkBridgeListener listener){
       this.bridgeFailedListener=listener;  
     }
     
     private void fireBridgeFailed() {
-        NetworkBridgeFailedListener l = this.bridgeFailedListener;
+        NetworkBridgeListener l = this.bridgeFailedListener;
         if (l!=null) {
             l.bridgeFailed();
         }
     }
+
+	public String getRemoteAddress() {
+		return remoteBroker.getRemoteAddress();
+	}
+
+	public String getLocalAddress() {
+		return localBroker.getRemoteAddress();
+	}
+
+	public String getLocalBrokerName() {
+		return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
+	}
+
+	public String getRemoteBrokerName() {
+		return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
+	}
+	
+	public long getDequeueCounter() {
+		return dequeueCounter.get();
+	}
+
+	public long getEnqueueCounter() {
+		return enqueueCounter.get();
+	}
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java Wed Jun 27 21:34:57 2007
@@ -43,5 +43,18 @@
      * Set the NetworkBridgeFailedListener
      * @param listener
      */
-    public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener);
+    public void setNetworkBridgeListener(NetworkBridgeListener listener);
+    
+    
+    public String getRemoteAddress();
+
+    public String getRemoteBrokerName();
+
+    public String getLocalAddress();
+
+    public String getLocalBrokerName();
+
+    public long getEnqueueCounter();
+
+    public long getDequeueCounter();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java Wed Jun 27 21:34:57 2007
@@ -21,45 +21,50 @@
  * 
  * @version $Revision: 1.1 $
  */
-public class NetworkBridgeFactory{
+public class NetworkBridgeFactory {
 
-    /**
-     * Create a network bridge
-     * 
-     * @param config
-     * @param localTransport
-     * @param remoteTransport
-     * @return the NetworkBridge
-     */
-    public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration config,Transport localTransport,
-            Transport remoteTransport){
-        return createBridge(config,localTransport,remoteTransport,null);
-    }
+	/**
+	 * Create a network bridge
+	 * 
+	 * @param config
+	 * @param localTransport
+	 * @param remoteTransport
+	 * @return the NetworkBridge
+	 */
+	public static DemandForwardingBridge createBridge(
+			NetworkBridgeConfiguration config, Transport localTransport,
+			Transport remoteTransport) {
+		return createBridge(config, localTransport, remoteTransport, null);
+	}
 
-    /**
-     * create a network bridge
-     * 
-     * @param configuration
-     * @param localTransport
-     * @param remoteTransport
-     * @param listener
-     * @return the NetworkBridge
-     */
-    public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration,Transport localTransport,
-            Transport remoteTransport,NetworkBridgeFailedListener listener){
-        DemandForwardingBridge result=null;
-        if(configuration.isConduitSubscriptions()){
-            if(configuration.isDynamicOnly()){
-                result=new ConduitBridge(configuration,localTransport,remoteTransport);
-            }else{
-                result=new DurableConduitBridge(configuration,localTransport,remoteTransport);
-            }
-        }else{
-            result=new DemandForwardingBridge(configuration,localTransport,remoteTransport);
-        }
-        if(listener!=null){
-            result.setNetworkBridgeFailedListener(listener);
-        }
-        return result;
-    }
+	/**
+	 * create a network bridge
+	 * 
+	 * @param configuration
+	 * @param localTransport
+	 * @param remoteTransport
+	 * @param listener
+	 * @return the NetworkBridge
+	 */
+	public static DemandForwardingBridge createBridge(
+			NetworkBridgeConfiguration configuration, Transport localTransport,
+			Transport remoteTransport, final NetworkBridgeListener listener) {
+		DemandForwardingBridge result = null;
+		if (configuration.isConduitSubscriptions()) {
+			if (configuration.isDynamicOnly()) {
+				result = new ConduitBridge(configuration, localTransport,
+						remoteTransport);
+			} else {
+				result = new DurableConduitBridge(configuration,
+						localTransport, remoteTransport);
+			}
+		} else {
+			result = new DemandForwardingBridge(configuration, localTransport,
+					remoteTransport);
+		}
+		if (listener != null) {
+			result.setNetworkBridgeListener(listener);
+		}
+		return result;
+	}
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java?view=auto&rev=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java Wed Jun 27 21:34:57 2007
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+
+
+/**
+ *called when a bridge fails
+ * 
+ * @version $Revision: 1.1 $
+ */
+public interface NetworkBridgeListener{
+    
+    /**
+     * called when the transport fails
+     *
+     */
+    public void bridgeFailed();
+
+    /**
+     * called after the bridge is started.
+     *
+     */
+	public void onStart(NetworkBridge bridge);
+	
+    /**
+     * called before the bridge is stopped.
+     *
+     */
+	public void onStop(NetworkBridge bridge);
+
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Wed Jun 27 21:34:57 2007
@@ -16,13 +16,23 @@
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Hashtable;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
 import org.apache.activemq.Service;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.NetworkBridgeView;
+import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.util.JMXSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
 import org.apache.commons.logging.Log;
@@ -40,6 +50,9 @@
     private List dynamicallyIncludedDestinations=new CopyOnWriteArrayList();
     private List staticallyIncludedDestinations=new CopyOnWriteArrayList();
     protected ConnectionFilter connectionFilter;
+    private BrokerService brokerService;
+    private ObjectName objectName;
+    
     protected ServiceSupport serviceSupport=new ServiceSupport(){
 
         protected void doStart() throws Exception{
@@ -188,4 +201,76 @@
     protected void handleStop(ServiceStopper stopper) throws Exception{
         log.info("Network Connector "+getName()+" Stopped");
     }
+    
+    public ObjectName getObjectName() {
+		return objectName;
+	}
+
+	public void setObjectName(ObjectName objectName) {
+		this.objectName = objectName;
+	}
+
+	public BrokerService getBrokerService() {
+		return brokerService;
+	}
+
+	public void setBrokerService(BrokerService brokerService) {
+		this.brokerService = brokerService;
+	}
+
+	protected void registerNetworkBridgeMBean(NetworkBridge bridge) {
+		if (!getBrokerService().isUseJmx())
+			return;
+
+		MBeanServer mbeanServer = getBrokerService().getManagementContext()
+				.getMBeanServer();
+		if (mbeanServer != null) {
+			NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
+			try {
+				ObjectName objectName = createNetworkBridgeObjectName(bridge);
+				mbeanServer.registerMBean(view, objectName);
+			} catch (Throwable e) {
+				log.debug("Network bridge could not be registered in JMX: "
+						+ e.getMessage(), e);
+			}
+		}
+	}
+
+	protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
+		if (!getBrokerService().isUseJmx())
+			return;
+
+		MBeanServer mbeanServer = getBrokerService().getManagementContext()
+				.getMBeanServer();
+		if (mbeanServer != null) {
+			try {
+				ObjectName objectName = createNetworkBridgeObjectName(bridge);
+				mbeanServer.unregisterMBean(objectName);
+			} catch (Throwable e) {
+				log.debug("Network bridge could not be unregistered in JMX: "
+						+ e.getMessage(), e);
+			}
+		}
+	}
+
+	protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge)
+			throws MalformedObjectNameException {
+		ObjectName connectorName = getObjectName();
+		Hashtable map = connectorName.getKeyPropertyList();
+		return new ObjectName(connectorName.getDomain()
+				+ ":"
+				+ "BrokerName="
+				+ JMXSupport.encodeObjectNamePart((String) map
+						.get("BrokerName"))
+				+ ","
+				+ "Type=NetworkBridge,"
+				+ "NetworkConnectorName="
+				+ JMXSupport.encodeObjectNamePart((String) map
+						.get("NetworkConnectorName"))
+				+ ","
+				+ "Name="
+				+ JMXSupport.encodeObjectNamePart(JMXSupport
+						.encodeObjectNamePart(bridge.getRemoteAddress())));
+	}
+
 }