You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/06/28 06:34:59 UTC
svn commit: r551443 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/
broker/jmx/ network/
Author: chirino
Date: Wed Jun 27 21:34:57 2007
New Revision: 551443
URL: http://svn.apache.org/viewvc?view=rev&rev=551443
Log:
Display the established Network Connector Bridges via JMX - https://issues.apache.org/activemq/browse/AMQ-1299
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java
Removed:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFailedListener.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Jun 27 21:34:57 2007
@@ -278,6 +278,7 @@
* network
*/
public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
+ connector.setBrokerService(this);
URI uri = getVmConnectorURI();
HashMap map = new HashMap(URISupport.parseParamters(uri));
map.put("network", "true");
@@ -1217,6 +1218,7 @@
NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
try {
ObjectName objectName = createNetworkConnectorObjectName(connector);
+ connector.setObjectName(objectName);
mbeanServer.registerMBean(view, objectName);
registeredMBeanNames.add(objectName);
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java?view=auto&rev=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java Wed Jun 27 21:34:57 2007
@@ -0,0 +1,61 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.network.NetworkBridge;
+
+public class NetworkBridgeView implements NetworkBridgeViewMBean {
+
+ private final NetworkBridge bridge;
+
+ public NetworkBridgeView(NetworkBridge bridge) {
+ this.bridge = bridge;
+ }
+
+ public void start() throws Exception {
+ bridge.start();
+ }
+
+ public void stop() throws Exception {
+ bridge.stop();
+ }
+
+ public String getLocalAddress() {
+ return bridge.getLocalAddress();
+ }
+
+ public String getRemoteAddress() {
+ return bridge.getRemoteAddress();
+ }
+
+ public String getRemoteBrokerName() {
+ return bridge.getRemoteBrokerName();
+ }
+
+ public String getLocalBrokerName() {
+ return bridge.getLocalBrokerName();
+ }
+
+ public long getEnqueueCounter() {
+ return bridge.getEnqueueCounter();
+ }
+
+ public long getDequeueCounter() {
+ return bridge.getDequeueCounter();
+ }
+
+}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java?view=auto&rev=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java Wed Jun 27 21:34:57 2007
@@ -0,0 +1,30 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.Service;
+
+public interface NetworkBridgeViewMBean extends Service {
+
+ public String getLocalAddress();
+ public String getRemoteAddress();
+ public String getRemoteBrokerName();
+ public String getLocalBrokerName();
+ public long getEnqueueCounter();
+ public long getDequeueCounter();
+
+}
\ No newline at end of file
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java Wed Jun 27 21:34:57 2007
@@ -32,6 +32,86 @@
}
public void stop() throws Exception {
- connector.stop();
+ connector.stop();
}
+
+ public String getName() {
+ return connector.getName();
+ }
+
+ public int getNetworkTTL() {
+ return connector.getNetworkTTL();
+ }
+
+ public int getPrefetchSize() {
+ return connector.getPrefetchSize();
+ }
+
+ public String getUserName() {
+ return connector.getUserName();
+ }
+
+ public boolean isBridgeTempDestinations() {
+ return connector.isBridgeTempDestinations();
+ }
+
+ public boolean isConduitSubscriptions() {
+ return connector.isConduitSubscriptions();
+ }
+
+ public boolean isDecreaseNetworkConsumerPriority() {
+ return connector.isDecreaseNetworkConsumerPriority();
+ }
+
+ public boolean isDispatchAsync() {
+ return connector.isDispatchAsync();
+ }
+
+ public boolean isDynamicOnly() {
+ return connector.isDynamicOnly();
+ }
+
+ public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
+ connector.setBridgeTempDestinations(bridgeTempDestinations);
+ }
+
+ public void setConduitSubscriptions(boolean conduitSubscriptions) {
+ connector.setConduitSubscriptions(conduitSubscriptions);
+ }
+
+ public void setDispatchAsync(boolean dispatchAsync) {
+ connector.setDispatchAsync(dispatchAsync);
+ }
+
+ public void setDynamicOnly(boolean dynamicOnly) {
+ connector.setDynamicOnly(dynamicOnly);
+ }
+
+ public void setNetworkTTL(int networkTTL) {
+ connector.setNetworkTTL(networkTTL);
+ }
+
+ public void setPassword(String password) {
+ connector.setPassword(password);
+ }
+
+ public void setPrefetchSize(int prefetchSize) {
+ connector.setPrefetchSize(prefetchSize);
+ }
+
+ public void setUserName(String userName) {
+ connector.setUserName(userName);
+ }
+
+ public String getPassword() {
+ String pw = connector.getPassword();
+ // Hide the password for security reasons.
+ if( pw!= null )
+ pw = pw.replaceAll(".", "*");
+ return pw;
+ }
+
+ public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) {
+ connector.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority);
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java Wed Jun 27 21:34:57 2007
@@ -21,4 +21,24 @@
public interface NetworkConnectorViewMBean extends Service {
+ public String getName();
+ public int getNetworkTTL();
+ public int getPrefetchSize();
+ public String getUserName();
+ public boolean isBridgeTempDestinations();
+ public boolean isConduitSubscriptions();
+ public boolean isDecreaseNetworkConsumerPriority();
+ public boolean isDispatchAsync();
+ public boolean isDynamicOnly();
+ public void setBridgeTempDestinations(boolean bridgeTempDestinations);
+ public void setConduitSubscriptions(boolean conduitSubscriptions);
+ public void setDispatchAsync(boolean dispatchAsync);
+ public void setDynamicOnly(boolean dynamicOnly);
+ public void setNetworkTTL(int networkTTL);
+ public void setPassword(String password);
+ public void setPrefetchSize(int prefetchSize);
+ public void setUserName(String userName);
+ public String getPassword();
+ public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority);
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Jun 27 21:34:57 2007
@@ -18,6 +18,12 @@
package org.apache.activemq.network;
import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
@@ -61,12 +67,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.security.GeneralSecurityException;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* A useful base class for implementing demand forwarding bridges.
*
@@ -102,9 +102,14 @@
protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration;
- private NetworkBridgeFailedListener bridgeFailedListener;
+ private NetworkBridgeListener networkBridgeListener;
private boolean createdByDuplex;
+ private BrokerInfo localBrokerInfo;
+ private BrokerInfo remoteBrokerInfo;
+
+ final AtomicLong enqueueCounter = new AtomicLong();
+ final AtomicLong dequeueCounter = new AtomicLong();
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
this.configuration=configuration;
@@ -194,6 +199,12 @@
}catch(IOException e){
log.warn("Caught exception from remote start",e);
}
+
+ NetworkBridgeListener l = this.networkBridgeListener;
+ if (l!=null) {
+ l.onStart(this);
+ }
+
}
protected void triggerLocalStartBridge() throws IOException {
@@ -308,6 +319,11 @@
log.debug(" stopping "+configuration.getBrokerName()+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
boolean wasDisposedAlready=disposed;
if(!disposed){
+ NetworkBridgeListener l = this.networkBridgeListener;
+ if (l!=null) {
+ l.onStop(this);
+ }
+
try{
disposed=true;
remoteBridgeStarted.set(false);
@@ -364,6 +380,8 @@
}else if(command.isBrokerInfo()){
lastConnectSucceeded.set(true);
+ remoteBrokerInfo = ((BrokerInfo)command);
+
serviceRemoteBrokerInfo(command);
// Let the local broker know the remote broker's ID.
localBroker.oneway(command);
@@ -507,6 +525,7 @@
final boolean trace=log.isTraceEnabled();
try{
if(command.isMessageDispatch()){
+ enqueueCounter.incrementAndGet();
waitStarted();
final MessageDispatch md=(MessageDispatch) command;
DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
@@ -523,6 +542,7 @@
// by bridging it using an async send (small chance of message loss).
remoteBroker.oneway(message);
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+ dequeueCounter.incrementAndGet();
} else {
@@ -537,6 +557,7 @@
serviceLocalException(er.getException());
} else {
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+ dequeueCounter.incrementAndGet();
}
} catch (IOException e) {
serviceLocalException(e);
@@ -551,6 +572,7 @@
if (trace)log.trace("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
}
}else if(command.isBrokerInfo()){
+ localBrokerInfo = ((BrokerInfo)command);
serviceLocalBrokerInfo(command);
}else if(command.isShutdownInfo()){
log.info(configuration.getBrokerName()+" Shutting down");
@@ -812,14 +834,39 @@
protected abstract BrokerId[] getRemoteBrokerPath();
- public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){
- this.bridgeFailedListener=listener;
+ public void setNetworkBridgeListener(NetworkBridgeListener listener){
+ this.networkBridgeListener=listener;
}
private void fireBridgeFailed() {
- NetworkBridgeFailedListener l = this.bridgeFailedListener;
+ NetworkBridgeListener l = this.networkBridgeListener;
if (l!=null) {
l.bridgeFailed();
}
}
+
+ public String getRemoteAddress() {
+ return remoteBroker.getRemoteAddress();
+ }
+
+ public String getLocalAddress() {
+ return localBroker.getRemoteAddress();
+ }
+
+ public String getRemoteBrokerName() {
+ return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
+ }
+
+ public String getLocalBrokerName() {
+ return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
+ }
+
+ public long getDequeueCounter() {
+ return dequeueCounter.get();
+ }
+
+ public long getEnqueueCounter() {
+ return enqueueCounter.get();
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Wed Jun 27 21:34:57 2007
@@ -124,7 +124,7 @@
}
}
}
-
+
public void onServiceRemove(DiscoveryEvent event) {
String url = event.getServiceName();
if (url != null) {
@@ -186,7 +186,7 @@
}
protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
- NetworkBridgeFailedListener listener = new NetworkBridgeFailedListener() {
+ NetworkBridgeListener listener = new NetworkBridgeListener() {
public void bridgeFailed(){
if( !serviceSupport.isStopped() ) {
@@ -197,6 +197,15 @@
}
}
+
+ public void onStart(NetworkBridge bridge) {
+ registerNetworkBridgeMBean(bridge);
+ }
+
+ public void onStop(NetworkBridge bridge) {
+ unregisterNetworkBridgeMBean(bridge);
+ }
+
};
DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this,localTransport,remoteTransport,listener);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Wed Jun 27 21:34:57 2007
@@ -18,6 +18,7 @@
package org.apache.activemq.network;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.Service;
import org.apache.activemq.command.ActiveMQQueue;
@@ -46,6 +47,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
/**
* Forwards all messages from the local broker to the remote broker.
*
@@ -74,7 +76,13 @@
BrokerId localBrokerId;
BrokerId remoteBrokerId;
- private NetworkBridgeFailedListener bridgeFailedListener;
+ private NetworkBridgeListener bridgeFailedListener;
+
+ BrokerInfo localBrokerInfo;
+ BrokerInfo remoteBrokerInfo;
+
+ final AtomicLong enqueueCounter = new AtomicLong();
+ final AtomicLong dequeueCounter = new AtomicLong();
public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
this.localBroker = localBroker;
@@ -187,7 +195,8 @@
try {
if(command.isBrokerInfo() ) {
synchronized( this ) {
- remoteBrokerId = ((BrokerInfo)command).getBrokerId();
+ remoteBrokerInfo = ((BrokerInfo)command);
+ remoteBrokerId = remoteBrokerInfo.getBrokerId();
if( localBrokerId !=null) {
if( localBrokerId.equals(remoteBrokerId) ) {
log.info("Disconnecting loop back connection.");
@@ -213,6 +222,9 @@
protected void serviceLocalCommand(Command command) {
try {
if( command.isMessageDispatch() ) {
+
+ enqueueCounter.incrementAndGet();
+
final MessageDispatch md = (MessageDispatch) command;
Message message = md.getMessage();
message.setProducerId(producerInfo.getProducerId());
@@ -223,10 +235,10 @@
if( !message.isResponseRequired() ) {
-
// If the message was originally sent using async send, we will preserve that QOS
// by bridging it using an async send (small chance of message loss).
remoteBroker.oneway(message);
+ dequeueCounter.incrementAndGet();
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
} else {
@@ -241,6 +253,7 @@
ExceptionResponse er=(ExceptionResponse) response;
serviceLocalException(er.getException());
} else {
+ dequeueCounter.incrementAndGet();
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
}
} catch (IOException e) {
@@ -273,7 +286,8 @@
// }
} else if(command.isBrokerInfo() ) {
synchronized( this ) {
- localBrokerId = ((BrokerInfo)command).getBrokerId();
+ localBrokerInfo = ((BrokerInfo)command);
+ localBrokerId = localBrokerInfo.getBrokerId();
if( remoteBrokerId !=null) {
if( remoteBrokerId.equals(localBrokerId) ) {
log.info("Disconnecting loop back connection.");
@@ -320,14 +334,39 @@
}
- public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){
+ public void setNetworkBridgeFailedListener(NetworkBridgeListener listener){
this.bridgeFailedListener=listener;
}
private void fireBridgeFailed() {
- NetworkBridgeFailedListener l = this.bridgeFailedListener;
+ NetworkBridgeListener l = this.bridgeFailedListener;
if (l!=null) {
l.bridgeFailed();
}
}
+
+ public String getRemoteAddress() {
+ return remoteBroker.getRemoteAddress();
+ }
+
+ public String getLocalAddress() {
+ return localBroker.getRemoteAddress();
+ }
+
+ public String getLocalBrokerName() {
+ return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
+ }
+
+ public String getRemoteBrokerName() {
+ return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
+ }
+
+ public long getDequeueCounter() {
+ return dequeueCounter.get();
+ }
+
+ public long getEnqueueCounter() {
+ return enqueueCounter.get();
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java Wed Jun 27 21:34:57 2007
@@ -43,5 +43,18 @@
* Set the NetworkBridgeFailedListener
* @param listener
*/
- public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener);
+ public void setNetworkBridgeListener(NetworkBridgeListener listener);
+
+
+ public String getRemoteAddress();
+
+ public String getRemoteBrokerName();
+
+ public String getLocalAddress();
+
+ public String getLocalBrokerName();
+
+ public long getEnqueueCounter();
+
+ public long getDequeueCounter();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java Wed Jun 27 21:34:57 2007
@@ -21,45 +21,50 @@
*
* @version $Revision: 1.1 $
*/
-public class NetworkBridgeFactory{
+public class NetworkBridgeFactory {
- /**
- * Create a network bridge
- *
- * @param config
- * @param localTransport
- * @param remoteTransport
- * @return the NetworkBridge
- */
- public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration config,Transport localTransport,
- Transport remoteTransport){
- return createBridge(config,localTransport,remoteTransport,null);
- }
+ /**
+ * Create a network bridge
+ *
+ * @param config
+ * @param localTransport
+ * @param remoteTransport
+ * @return the NetworkBridge
+ */
+ public static DemandForwardingBridge createBridge(
+ NetworkBridgeConfiguration config, Transport localTransport,
+ Transport remoteTransport) {
+ return createBridge(config, localTransport, remoteTransport, null);
+ }
- /**
- * create a network bridge
- *
- * @param configuration
- * @param localTransport
- * @param remoteTransport
- * @param listener
- * @return the NetworkBridge
- */
- public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration,Transport localTransport,
- Transport remoteTransport,NetworkBridgeFailedListener listener){
- DemandForwardingBridge result=null;
- if(configuration.isConduitSubscriptions()){
- if(configuration.isDynamicOnly()){
- result=new ConduitBridge(configuration,localTransport,remoteTransport);
- }else{
- result=new DurableConduitBridge(configuration,localTransport,remoteTransport);
- }
- }else{
- result=new DemandForwardingBridge(configuration,localTransport,remoteTransport);
- }
- if(listener!=null){
- result.setNetworkBridgeFailedListener(listener);
- }
- return result;
- }
+ /**
+ * create a network bridge
+ *
+ * @param configuration
+ * @param localTransport
+ * @param remoteTransport
+ * @param listener
+ * @return the NetworkBridge
+ */
+ public static DemandForwardingBridge createBridge(
+ NetworkBridgeConfiguration configuration, Transport localTransport,
+ Transport remoteTransport, final NetworkBridgeListener listener) {
+ DemandForwardingBridge result = null;
+ if (configuration.isConduitSubscriptions()) {
+ if (configuration.isDynamicOnly()) {
+ result = new ConduitBridge(configuration, localTransport,
+ remoteTransport);
+ } else {
+ result = new DurableConduitBridge(configuration,
+ localTransport, remoteTransport);
+ }
+ } else {
+ result = new DemandForwardingBridge(configuration, localTransport,
+ remoteTransport);
+ }
+ if (listener != null) {
+ result.setNetworkBridgeListener(listener);
+ }
+ return result;
+ }
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java?view=auto&rev=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java Wed Jun 27 21:34:57 2007
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+
+
+/**
+ * Callback interface notified when a network bridge fails, after it is started, and before it is stopped.
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface NetworkBridgeListener{
+
+ /**
+ * called when the transport fails
+ *
+ */
+ public void bridgeFailed();
+
+ /**
+ * called after the bridge is started.
+ *
+ */
+ public void onStart(NetworkBridge bridge);
+
+ /**
+ * called before the bridge is stopped.
+ *
+ */
+ public void onStop(NetworkBridge bridge);
+
+}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?view=diff&rev=551443&r1=551442&r2=551443
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Wed Jun 27 21:34:57 2007
@@ -16,13 +16,23 @@
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Hashtable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
import org.apache.activemq.Service;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.NetworkBridgeView;
+import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
@@ -40,6 +50,9 @@
private List dynamicallyIncludedDestinations=new CopyOnWriteArrayList();
private List staticallyIncludedDestinations=new CopyOnWriteArrayList();
protected ConnectionFilter connectionFilter;
+ private BrokerService brokerService;
+ private ObjectName objectName;
+
protected ServiceSupport serviceSupport=new ServiceSupport(){
protected void doStart() throws Exception{
@@ -188,4 +201,76 @@
protected void handleStop(ServiceStopper stopper) throws Exception{
log.info("Network Connector "+getName()+" Stopped");
}
+
+ public ObjectName getObjectName() {
+ return objectName;
+ }
+
+ public void setObjectName(ObjectName objectName) {
+ this.objectName = objectName;
+ }
+
+ public BrokerService getBrokerService() {
+ return brokerService;
+ }
+
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
+
+ protected void registerNetworkBridgeMBean(NetworkBridge bridge) {
+ if (!getBrokerService().isUseJmx())
+ return;
+
+ MBeanServer mbeanServer = getBrokerService().getManagementContext()
+ .getMBeanServer();
+ if (mbeanServer != null) {
+ NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
+ try {
+ ObjectName objectName = createNetworkBridgeObjectName(bridge);
+ mbeanServer.registerMBean(view, objectName);
+ } catch (Throwable e) {
+ log.debug("Network bridge could not be registered in JMX: "
+ + e.getMessage(), e);
+ }
+ }
+ }
+
+ protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
+ if (!getBrokerService().isUseJmx())
+ return;
+
+ MBeanServer mbeanServer = getBrokerService().getManagementContext()
+ .getMBeanServer();
+ if (mbeanServer != null) {
+ try {
+ ObjectName objectName = createNetworkBridgeObjectName(bridge);
+ mbeanServer.unregisterMBean(objectName);
+ } catch (Throwable e) {
+ log.debug("Network bridge could not be unregistered in JMX: "
+ + e.getMessage(), e);
+ }
+ }
+ }
+
+ protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge)
+ throws MalformedObjectNameException {
+ ObjectName connectorName = getObjectName();
+ Hashtable map = connectorName.getKeyPropertyList();
+ return new ObjectName(connectorName.getDomain()
+ + ":"
+ + "BrokerName="
+ + JMXSupport.encodeObjectNamePart((String) map
+ .get("BrokerName"))
+ + ","
+ + "Type=NetworkBridge,"
+ + "NetworkConnectorName="
+ + JMXSupport.encodeObjectNamePart((String) map
+ .get("NetworkConnectorName"))
+ + ","
+ + "Name="
+ + JMXSupport.encodeObjectNamePart(JMXSupport
+ .encodeObjectNamePart(bridge.getRemoteAddress())));
+ }
+
}