You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2009/07/31 21:19:07 UTC
svn commit: r799706 [1/2] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jmx/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/view/ main/java/org/apac...
Author: rajdavies
Date: Fri Jul 31 19:19:07 2009
New Revision: 799706
URL: http://svn.apache.org/viewvc?rev=799706&view=rev
Log:
Refactored ManagementContext to improve encapsulation - so all registrations of MBeans happen in one place
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerView.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ManagementContextXBeanConfigTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=799706&r1=799705&r2=799706&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Jul 31 19:19:07 2009
@@ -31,11 +31,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
@@ -90,7 +88,6 @@
import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
/**
* Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
* number of transport connectors, network connectors and a bunch of properties
@@ -99,14 +96,12 @@
* @version $Revision: 1.1 $
*/
public class BrokerService implements Service {
- protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
+ protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
public static final String DEFAULT_PORT = "61616";
public static final String LOCAL_HOST_NAME;
public static final String DEFAULT_BROKER_NAME = "localhost";
-
private static final Log LOG = LogFactory.getLog(BrokerService.class);
private static final long serialVersionUID = 7353129142305630237L;
-
private boolean useJmx = true;
private boolean enableStatistics = true;
private boolean persistent = true;
@@ -135,7 +130,6 @@
private List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
private List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
private List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
- private List<ObjectName> registeredMBeanNames = new CopyOnWriteArrayList<ObjectName>();
private List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
private List<Service> services = new ArrayList<Service>();
private MasterConnector masterConnector;
@@ -156,7 +150,7 @@
private boolean keepDurableSubsActive = true;
private boolean useVirtualTopics = true;
private boolean useMirroredQueues = false;
- private boolean useTempMirroredQueues=true;
+ private boolean useTempMirroredQueues = true;
private BrokerId brokerId;
private DestinationInterceptor[] destinationInterceptors;
private ActiveMQDestination[] destinations;
@@ -170,18 +164,16 @@
private int producerSystemUsagePortion = 60;
private int consumerSystemUsagePortion = 40;
private boolean splitSystemUsageForProducersConsumers;
- private boolean monitorConnectionSplits=false;
+ private boolean monitorConnectionSplits = false;
private int taskRunnerPriority = Thread.NORM_PRIORITY;
private boolean dedicatedTaskRunner;
- private boolean cacheTempDestinations=false;//useful for failover
+ private boolean cacheTempDestinations = false;// useful for failover
private int timeBeforePurgeTempDestinations = 5000;
- private List<Runnable> shutdownHooks= new ArrayList<Runnable>();
+ private List<Runnable> shutdownHooks = new ArrayList<Runnable>();
private boolean systemExitOnShutdown;
private int systemExitOnShutdownExitCode;
private SslContext sslContext;
-
private boolean forceStart = false;
-
static {
String localHostName = "localhost";
try {
@@ -234,9 +226,7 @@
* @throws Exception
*/
public TransportConnector addConnector(TransportConnector connector) throws Exception {
-
transportConnectors.add(connector);
-
return connector;
}
@@ -253,7 +243,6 @@
unregisterConnectorMBean(connector);
}
return rc;
-
}
/**
@@ -284,7 +273,8 @@
*/
public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
if (!isAdvisorySupport()) {
- throw new javax.jms.IllegalStateException("Networks require advisory messages to function - advisories are currently disabled");
+ throw new javax.jms.IllegalStateException(
+ "Networks require advisory messages to function - advisories are currently disabled");
}
NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
return addNetworkConnector(connector);
@@ -314,7 +304,6 @@
map.put("network", "true");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
connector.setLocalUri(uri);
-
// Set a connection filter so that the connector does not establish loop
// back connections.
connector.setConnectionFilter(new ConnectionFilter() {
@@ -332,7 +321,6 @@
return true;
}
});
-
networkConnectors.add(connector);
if (isUseJmx()) {
registerNetworkConnectorMBean(connector);
@@ -386,7 +374,8 @@
}
/**
- * @param masterConnectorURI The masterConnectorURI to set.
+ * @param masterConnectorURI
+ * The masterConnectorURI to set.
*/
public void setMasterConnectorURI(String masterConnectorURI) {
this.masterConnectorURI = masterConnectorURI;
@@ -423,10 +412,10 @@
}
public void start(boolean force) throws Exception {
- forceStart = force;
- start();
+ forceStart = force;
+ start();
}
-
+
// Service interface
// -------------------------------------------------------------------------
public void start() throws Exception {
@@ -435,25 +424,18 @@
// as it's way too easy to not be completely sure if start() has been
// called or not with the gazillion of different configuration
// mechanisms
-
// throw new IllegalStateException("Already started.");
return;
}
-
try {
-
- if( systemExitOnShutdown ) {
- addShutdownHook(new Runnable(){
+ if (systemExitOnShutdown) {
+ addShutdownHook(new Runnable() {
public void run() {
System.exit(systemExitOnShutdownExitCode);
}
});
}
-
processHelperProperties();
-
-
-
getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
getPersistenceAdapter().setBrokerName(getBrokerName());
LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
@@ -461,54 +443,42 @@
deleteAllMessages();
}
getPersistenceAdapter().start();
-
startDestinations();
-
addShutdownHook();
-
getBroker().start();
-
if (isUseJmx()) {
- getManagementContext().start();
- ManagedRegionBroker managedBroker = (ManagedRegionBroker)regionBroker;
+ getManagementContext().start();
+ ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
managedBroker.setContextBroker(broker);
adminView = new BrokerView(this, managedBroker);
- MBeanServer mbeanServer = getManagementContext().getMBeanServer();
- if (mbeanServer != null) {
- ObjectName objectName = getBrokerObjectName();
- mbeanServer.registerMBean(adminView, objectName);
- registeredMBeanNames.add(objectName);
- }
+ ObjectName objectName = getBrokerObjectName();
+ getManagementContext().registerMBean(adminView, objectName);
}
-
BrokerRegistry.getInstance().bind(getBrokerName(), this);
-
- // see if there is a MasterBroker service and if so, configure
+ // see if there is a MasterBroker service and if so, configure
// it and start it.
for (Service service : services) {
if (service instanceof MasterConnector) {
configureService(service);
service.start();
}
- }
+ }
if (!isSlave()) {
startAllConnectors();
}
-
if (isUseJmx() && masterConnector != null) {
registerFTConnectorMBean(masterConnector);
}
-
brokerId = broker.getBrokerId();
LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
getBroker().brokerServiceStarted();
startedLatch.countDown();
} catch (Exception e) {
LOG.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e);
- try{
+ try {
stop();
- }catch(Exception ex) {
- LOG.warn("Failed to stop broker after failure in start ",ex);
+ } catch (Exception ex) {
+ LOG.warn("Failed to stop broker after failure in start ", ex);
}
throw e;
}
@@ -522,7 +492,7 @@
removeShutdownHook();
ServiceStopper stopper = new ServiceStopper();
if (services != null) {
- for (Service service: services) {
+ for (Service service : services) {
stopper.stop(service);
}
}
@@ -531,7 +501,7 @@
// this has to be done after services are stopped,
// to avoid timing issue with discovery (spinning up a new instance)
BrokerRegistry.getInstance().unbind(getBrokerName());
- VMTransportFactory.stopped(getBrokerName());
+ VMTransportFactory.stopped(getBrokerName());
if (broker != null) {
stopper.stop(broker);
}
@@ -540,18 +510,6 @@
}
stopper.stop(persistenceAdapter);
if (isUseJmx()) {
- MBeanServer mbeanServer = getManagementContext().getMBeanServer();
- if (mbeanServer != null) {
- for (Iterator<ObjectName> iter = registeredMBeanNames.iterator(); iter.hasNext();) {
- ObjectName name = iter.next();
- try {
- mbeanServer.unregisterMBean(name);
- } catch (Exception e) {
- stopper.onException(mbeanServer, e);
- }
- }
- }
- registeredMBeanNames.clear();
stopper.stop(getManagementContext());
}
// Clear SelectorParser cache to free memory
@@ -559,11 +517,11 @@
stopped.set(true);
stoppedLatch.countDown();
LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped");
- synchronized(shutdownHooks) {
+ synchronized (shutdownHooks) {
for (Runnable hook : shutdownHooks) {
try {
hook.run();
- } catch ( Throwable e ) {
+ } catch (Throwable e) {
stopper.onException(hook, e);
}
}
@@ -585,7 +543,6 @@
}
}
-
/**
* A helper method to block the caller thread until the broker has been
* started
@@ -602,13 +559,13 @@
// Properties
// -------------------------------------------------------------------------
-
/**
* Returns the message broker
*/
public Broker getBroker() throws Exception {
if (broker == null) {
- LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker (" + getBrokerName() + ") is starting");
+ LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker ("
+ + getBrokerName() + ") is starting");
LOG.info("For help or more information please see: http://activemq.apache.org/");
broker = createBroker();
}
@@ -650,7 +607,6 @@
LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str);
}
this.brokerName = str.trim();
-
}
public PersistenceAdapterFactory getPersistenceFactory() {
@@ -676,7 +632,8 @@
* Sets the directory in which the data files will be stored by default for
* the JDBC and Journal persistence adaptors.
*
- * @param dataDirectory the directory to store data files
+ * @param dataDirectory
+ * the directory to store data files
*/
public void setDataDirectory(String dataDirectory) {
setDataDirectoryFile(new File(dataDirectory));
@@ -686,7 +643,8 @@
* Sets the directory in which the data files will be stored by default for
* the JDBC and Journal persistence adaptors.
*
- * @param dataDirectoryFile the directory to store data files
+ * @param dataDirectoryFile
+ * the directory to store data files
*/
public void setDataDirectoryFile(File dataDirectoryFile) {
this.dataDirectoryFile = dataDirectoryFile;
@@ -703,7 +661,8 @@
}
/**
- * @param tmpDataDirectory the tmpDataDirectory to set
+ * @param tmpDataDirectory
+ * the tmpDataDirectory to set
*/
public void setTmpDataDirectory(File tmpDataDirectory) {
this.tmpDataDirectory = tmpDataDirectory;
@@ -743,9 +702,13 @@
try {
if (systemUsage == null) {
systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
- systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default 64 Meg
- systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 100); // 10 Gb
- systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
+ systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default
+ // 64
+ // Meg
+ systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 100); // 100
+ // GB
+ systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100
+ // GB
addService(this.systemUsage);
}
return systemUsage;
@@ -765,24 +728,25 @@
/**
* @return the consumerUsageManager
- * @throws IOException
+ * @throws IOException
*/
public SystemUsage getConsumerSystemUsage() throws IOException {
if (this.consumerSystemUsaage == null) {
- if(splitSystemUsageForProducersConsumers) {
+ if (splitSystemUsageForProducersConsumers) {
this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
- float portion = consumerSystemUsagePortion/100f;
+ float portion = consumerSystemUsagePortion / 100f;
this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
addService(this.consumerSystemUsaage);
- }else {
- consumerSystemUsaage=getSystemUsage();
+ } else {
+ consumerSystemUsaage = getSystemUsage();
}
}
return this.consumerSystemUsaage;
}
/**
- * @param consumerSystemUsaage the storeSystemUsage to set
+ * @param consumerSystemUsaage
+ * the storeSystemUsage to set
*/
public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
if (this.consumerSystemUsaage != null) {
@@ -794,24 +758,25 @@
/**
* @return the producerUsageManager
- * @throws IOException
+ * @throws IOException
*/
public SystemUsage getProducerSystemUsage() throws IOException {
- if (producerSystemUsage == null ) {
+ if (producerSystemUsage == null) {
if (splitSystemUsageForProducersConsumers) {
producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
- float portion = producerSystemUsagePortion/100f;
+ float portion = producerSystemUsagePortion / 100f;
producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
addService(producerSystemUsage);
- }else {
- producerSystemUsage=getSystemUsage();
+ } else {
+ producerSystemUsage = getSystemUsage();
}
}
return producerSystemUsage;
}
/**
- * @param producerUsageManager the producerUsageManager to set
+ * @param producerUsageManager
+ * the producerUsageManager to set
*/
public void setProducerSystemUsage(SystemUsage producerUsageManager) {
if (this.producerSystemUsage != null) {
@@ -832,18 +797,19 @@
/**
* Sets the persistence adaptor implementation to use for this broker
- * @throws IOException
+ *
+ * @throws IOException
*/
public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
this.persistenceAdapter = persistenceAdapter;
configureService(this.persistenceAdapter);
this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
-
}
public TaskRunnerFactory getTaskRunnerFactory() {
if (taskRunnerFactory == null) {
- taskRunnerFactory = new TaskRunnerFactory("BrokerService",getTaskRunnerPriority(),true,1000,isDedicatedTaskRunner());
+ taskRunnerFactory = new TaskRunnerFactory("BrokerService", getTaskRunnerPriority(), true, 1000,
+ isDedicatedTaskRunner());
}
return taskRunnerFactory;
}
@@ -854,7 +820,8 @@
public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
if (taskRunnerFactory == null) {
- persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, true, 1000);
+ persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
+ true, 1000);
}
return persistenceTaskRunnerFactory;
}
@@ -912,8 +879,8 @@
}
public NetworkConnector getNetworkConnectorByName(String connectorName) {
- for(NetworkConnector connector : networkConnectors) {
- if(connector.getName().equals(connectorName)) {
+ for (NetworkConnector connector : networkConnectors) {
+ if (connector.getName().equals(connectorName)) {
return connector;
}
}
@@ -929,8 +896,8 @@
}
public TransportConnector getConnectorByName(String connectorName) {
- for(TransportConnector connector : transportConnectors) {
- if(connector.getName().equals(connectorName)) {
+ for (TransportConnector connector : transportConnectors) {
+ if (connector.getName().equals(connectorName)) {
return connector;
}
}
@@ -953,7 +920,8 @@
}
/**
- * @param jmsConnectors The jmsBridgeConnectors to set.
+ * @param jmsConnectors
+ * The jmsBridgeConnectors to set.
*/
public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
this.jmsBridgeConnectors = jmsConnectors;
@@ -970,7 +938,7 @@
public void setServices(Service[] services) {
this.services.clear();
if (services != null) {
- for (int i=0; i < services.length;i++) {
+ for (int i = 0; i < services.length; i++) {
this.services.add(services[i]);
}
}
@@ -983,7 +951,7 @@
public void addService(Service service) {
services.add(service);
}
-
+
public void removeService(Service service) {
services.remove(service);
}
@@ -1033,7 +1001,8 @@
* Sets the transport connectors which this broker will listen on for new
* clients
*
- * @org.apache.xbean.Property nestedType="org.apache.activemq.broker.TransportConnector"
+ * @org.apache.xbean.Property
+ * nestedType="org.apache.activemq.broker.TransportConnector"
*/
public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
@@ -1054,11 +1023,12 @@
* Sets the network connectors which this broker will use to connect to
* other brokers in a federated network
*
- * @org.apache.xbean.Property nestedType="org.apache.activemq.network.NetworkConnector"
+ * @org.apache.xbean.Property
+ * nestedType="org.apache.activemq.network.NetworkConnector"
*/
public void setNetworkConnectors(List networkConnectors) throws Exception {
for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) {
- NetworkConnector connector = (NetworkConnector)iter.next();
+ NetworkConnector connector = (NetworkConnector) iter.next();
addNetworkConnector(connector);
}
}
@@ -1069,7 +1039,7 @@
*/
public void setProxyConnectors(List proxyConnectors) throws Exception {
for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) {
- ProxyConnector connector = (ProxyConnector)iter.next();
+ ProxyConnector connector = (ProxyConnector) iter.next();
addProxyConnector(connector);
}
}
@@ -1154,7 +1124,8 @@
}
/**
- * @param shutdownOnMasterFailure The shutdownOnMasterFailure to set.
+ * @param shutdownOnMasterFailure
+ * The shutdownOnMasterFailure to set.
*/
public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
this.shutdownOnMasterFailure = shutdownOnMasterFailure;
@@ -1223,11 +1194,9 @@
*/
public synchronized Store getTempDataStore() {
if (tempDataStore == null) {
-
if (!isPersistent()) {
return null;
}
-
boolean result = true;
boolean empty = true;
try {
@@ -1257,7 +1226,8 @@
}
/**
- * @param tempDataStore the tempDataStore to set
+ * @param tempDataStore
+ * the tempDataStore to set
*/
public void setTempDataStore(Store tempDataStore) {
this.tempDataStore = tempDataStore;
@@ -1279,7 +1249,8 @@
}
/**
- * @param useLocalHostBrokerName the useLocalHostBrokerName to set
+ * @param useLocalHostBrokerName
+ * the useLocalHostBrokerName to set
*/
public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
this.useLocalHostBrokerName = useLocalHostBrokerName;
@@ -1296,23 +1267,25 @@
}
/**
- * @param supportFailOver the supportFailOver to set
+ * @param supportFailOver
+ * the supportFailOver to set
*/
public void setSupportFailOver(boolean supportFailOver) {
this.supportFailOver = supportFailOver;
}
/**
- * Looks up and lazily creates if necessary the destination for the given JMS name
+ * Looks up and lazily creates if necessary the destination for the given
+ * JMS name
*/
public Destination getDestination(ActiveMQDestination destination) throws Exception {
return getBroker().addDestination(getAdminConnectionContext(), destination);
}
-
+
public void removeDestination(ActiveMQDestination destination) throws Exception {
- getBroker().removeDestination(getAdminConnectionContext(), destination,0);
+ getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
}
-
+
public int getProducerSystemUsagePortion() {
return producerSystemUsagePortion;
}
@@ -1333,35 +1306,35 @@
return splitSystemUsageForProducersConsumers;
}
- public void setSplitSystemUsageForProducersConsumers(
- boolean splitSystemUsageForProducersConsumers) {
+ public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
}
-
+
public boolean isMonitorConnectionSplits() {
- return monitorConnectionSplits;
- }
+ return monitorConnectionSplits;
+ }
+
+ public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
+ this.monitorConnectionSplits = monitorConnectionSplits;
+ }
+
+ public int getTaskRunnerPriority() {
+ return taskRunnerPriority;
+ }
+
+ public void setTaskRunnerPriority(int taskRunnerPriority) {
+ this.taskRunnerPriority = taskRunnerPriority;
+ }
+
+ public boolean isDedicatedTaskRunner() {
+ return dedicatedTaskRunner;
+ }
- public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
- this.monitorConnectionSplits = monitorConnectionSplits;
- }
- public int getTaskRunnerPriority() {
- return taskRunnerPriority;
- }
-
- public void setTaskRunnerPriority(int taskRunnerPriority) {
- this.taskRunnerPriority = taskRunnerPriority;
- }
-
- public boolean isDedicatedTaskRunner() {
- return dedicatedTaskRunner;
- }
-
- public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
- this.dedicatedTaskRunner = dedicatedTaskRunner;
- }
-
- public boolean isCacheTempDestinations() {
+ public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
+ this.dedicatedTaskRunner = dedicatedTaskRunner;
+ }
+
+ public boolean isCacheTempDestinations() {
return cacheTempDestinations;
}
@@ -1373,11 +1346,10 @@
return timeBeforePurgeTempDestinations;
}
- public void setTimeBeforePurgeTempDestinations(
- int timeBeforePurgeTempDestinations) {
+ public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
}
-
+
public boolean isUseTempMirroredQueues() {
return useTempMirroredQueues;
}
@@ -1385,7 +1357,8 @@
public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
this.useTempMirroredQueues = useTempMirroredQueues;
}
- //
+
+ //
// Implementation methods
// -------------------------------------------------------------------------
/**
@@ -1408,7 +1381,6 @@
addNetworkConnector(uri);
}
}
-
if (jmsBridgeConnectors != null) {
for (int i = 0; i < jmsBridgeConnectors.length; i++) {
addJmsConnector(jmsBridgeConnectors[i]);
@@ -1422,7 +1394,8 @@
}
if (masterConnectorURI != null) {
if (masterServiceExists) {
- throw new IllegalStateException("Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
+ throw new IllegalStateException(
+ "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
} else {
addService(new MasterConnector(masterConnectorURI));
}
@@ -1430,23 +1403,19 @@
}
protected void stopAllConnectors(ServiceStopper stopper) {
-
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
NetworkConnector connector = iter.next();
unregisterNetworkConnectorMBean(connector);
stopper.stop(connector);
}
-
for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
ProxyConnector connector = iter.next();
stopper.stop(connector);
}
-
for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
JmsConnector connector = iter.next();
stopper.stop(connector);
}
-
for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
TransportConnector connector = iter.next();
stopper.stop(connector);
@@ -1454,141 +1423,105 @@
}
protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
- MBeanServer mbeanServer = getManagementContext().getMBeanServer();
- if (mbeanServer != null) {
-
- try {
- ObjectName objectName = createConnectorObjectName(connector);
- connector = connector.asManagedConnector(getManagementContext().getMBeanServer(), objectName);
- ConnectorViewMBean view = new ConnectorView(connector);
- mbeanServer.registerMBean(view, objectName);
- registeredMBeanNames.add(objectName);
- return connector;
- } catch (Throwable e) {
- throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
- }
+ try {
+ ObjectName objectName = createConnectorObjectName(connector);
+ connector = connector.asManagedConnector(getManagementContext(), objectName);
+ ConnectorViewMBean view = new ConnectorView(connector);
+ getManagementContext().registerMBean(view, objectName);
+ return connector;
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
}
- return connector;
}
protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
if (isUseJmx()) {
- MBeanServer mbeanServer = getManagementContext().getMBeanServer();
- if (mbeanServer != null) {
- try {
- ObjectName objectName = createConnectorObjectName(connector);
-
- if (registeredMBeanNames.remove(objectName)) {
- mbeanServer.unregisterMBean(objectName);
- }
- } catch (Throwable e) {
- throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
- }
+ try {
+ ObjectName objectName = createConnectorObjectName(connector);
+ getManagementContext().unregisterMBean(objectName);
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create(
+ "Transport Connector could not be registered in JMX: " + e.getMessage(), e);
}
}
}
-
+
protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
-// MBeanServer mbeanServer = getManagementContext().getMBeanServer();
-// if (mbeanServer != null) {
-//
-//
-// }
return adaptor;
}
protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
if (isUseJmx()) {
- MBeanServer mbeanServer = getManagementContext().getMBeanServer();
- if (mbeanServer != null) {
-
- }
}
}
private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
- return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector,"
- + "ConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
+ return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," + "ConnectorName="
+ + JMXSupport.encodeObjectNamePart(connector.getName()));
}
protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
- MBeanServer mbeanServer = getManagementContext().getMBeanServer();
- if (mbeanServer != null) {
- NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
- try {
- ObjectName objectName = createNetworkConnectorObjectName(connector);
- connector.setObjectName(objectName);
- mbeanServer.registerMBean(view, objectName);
- registeredMBeanNames.add(objectName);
- } catch (Throwable e) {
- throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
- }
+ NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
+ try {
+ ObjectName objectName = createNetworkConnectorObjectName(connector);
+ connector.setObjectName(objectName);
+ getManagementContext().registerMBean(view, objectName);
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
}
}
- protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
- return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
- + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
+ protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector)
+ throws MalformedObjectNameException {
+ return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
+ + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
}
protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
if (isUseJmx()) {
- MBeanServer mbeanServer = getManagementContext().getMBeanServer();
- if (mbeanServer != null) {
- try {
- ObjectName objectName = createNetworkConnectorObjectName(connector);
- if (registeredMBeanNames.remove(objectName)) {
- mbeanServer.unregisterMBean(objectName);
- }
- } catch (Exception e) {
- LOG.error("Network Connector could not be unregistered from JMX: " + e, e);
- }
+ try {
+ ObjectName objectName = createNetworkConnectorObjectName(connector);
+ getManagementContext().unregisterMBean(objectName);
+ } catch (Exception e) {
+ LOG.error("Network Connector could not be unregistered from JMX: " + e, e);
}
}
}
protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
- MBeanServer mbeanServer = getManagementContext().getMBeanServer();
- if (mbeanServer != null) {
- ProxyConnectorView view = new ProxyConnectorView(connector);
- try {
- ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
- + "Type=ProxyConnector," + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
- mbeanServer.registerMBean(view, objectName);
- registeredMBeanNames.add(objectName);
- } catch (Throwable e) {
- throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
- }
+ ProxyConnectorView view = new ProxyConnectorView(connector);
+ try {
+ ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector,"
+ + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
+ getManagementContext().registerMBean(view, objectName);
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
}
}
protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
- MBeanServer mbeanServer = getManagementContext().getMBeanServer();
- if (mbeanServer != null) {
- FTConnectorView view = new FTConnectorView(connector);
- try {
- ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
- + "Type=MasterConnector");
- mbeanServer.registerMBean(view, objectName);
- registeredMBeanNames.add(objectName);
- } catch (Throwable e) {
- throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
- }
+ FTConnectorView view = new FTConnectorView(connector);
+ try {
+ ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
+ getManagementContext().registerMBean(view, objectName);
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
}
}
protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
- MBeanServer mbeanServer = getManagementContext().getMBeanServer();
- if (mbeanServer != null) {
- JmsConnectorView view = new JmsConnectorView(connector);
- try {
- ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
- + "Type=JmsConnector," + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
- mbeanServer.registerMBean(view, objectName);
- registeredMBeanNames.add(objectName);
- } catch (Throwable e) {
- throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
- }
+ JmsConnectorView view = new JmsConnectorView(connector);
+ try {
+ ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector,"
+ + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
+ getManagementContext().registerMBean(view, objectName);
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
}
}
@@ -1602,32 +1535,27 @@
protected Broker createBroker() throws Exception {
regionBroker = createRegionBroker();
Broker broker = addInterceptors(regionBroker);
-
// Add a filter that will stop access to the broker once stopped
broker = new MutableBrokerFilter(broker) {
- Broker old;
-
- public void stop() throws Exception {
+ Broker old;
+
+ public void stop() throws Exception {
old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
// Just ignore additional stop actions.
public void stop() throws Exception {
}
-
});
old.stop();
}
-
- public void start() throws Exception {
- if (forceStart && old != null) {
- this.next.set(old);
- }
- getNext().start();
- }
-
- };
+ public void start() throws Exception {
+ if (forceStart && old != null) {
+ this.next.set(old);
+ }
+ getNext().start();
+ }
+ };
return broker;
-
}
/**
@@ -1641,31 +1569,28 @@
destinationInterceptors = createDefaultDestinationInterceptor();
}
configureServices(destinationInterceptors);
-
DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
if (destinationFactory == null) {
destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
}
return createRegionBroker(destinationInterceptor);
}
-
+
protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
- RegionBroker regionBroker;
+ RegionBroker regionBroker;
if (isUseJmx()) {
- MBeanServer mbeanServer = getManagementContext().getMBeanServer();
- regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
- destinationInterceptor);
+ regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
+ getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor);
} else {
- regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor);
+ regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
+ destinationInterceptor);
}
destinationFactory.setRegionBroker(regionBroker);
-
regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
regionBroker.setBrokerName(getBrokerName());
regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
-
- return regionBroker;
- }
+ return regionBroker;
+ }
/**
* Create the default destination interceptor
@@ -1676,12 +1601,12 @@
VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
VirtualTopic virtualTopic = new VirtualTopic();
virtualTopic.setName("VirtualTopic.>");
- VirtualDestination[] virtualDestinations = {virtualTopic};
+ VirtualDestination[] virtualDestinations = { virtualTopic };
interceptor.setVirtualDestinations(virtualDestinations);
answer.add(interceptor);
}
if (isUseMirroredQueues()) {
- MirroredQueue interceptor = new MirroredQueue();
+ MirroredQueue interceptor = new MirroredQueue();
answer.add(interceptor);
}
DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
@@ -1703,8 +1628,8 @@
if (isPopulateJMSXUserID()) {
broker = new UserIDBroker(broker);
}
- if (isMonitorConnectionSplits()){
- broker = new ConnectionSplitBroker(broker);
+ if (isMonitorConnectionSplits()) {
+ broker = new ConnectionSplitBroker(broker);
}
if (plugins != null) {
for (int i = 0; i < plugins.length; i++) {
@@ -1733,14 +1658,15 @@
protected ObjectName createBrokerObjectName() throws IOException {
try {
- return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
+ return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
} catch (Throwable e) {
throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);
}
}
protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
- TransportServer transport = TransportFactory.bind(this, brokerURI);
+ TransportServer transport = TransportFactory.bind(this, brokerURI);
return new TransportConnector(transport);
}
@@ -1812,7 +1738,6 @@
protected void startDestinations() throws Exception {
if (destinations != null) {
ConnectionContext adminConnectionContext = getAdminConnectionContext();
-
for (int i = 0; i < destinations.length; i++) {
ActiveMQDestination destination = destinations[i];
getBroker().addDestination(adminConnectionContext, destination);
@@ -1846,19 +1771,18 @@
return context;
}
- protected void waitForSlave(){
+ protected void waitForSlave() {
try {
- slaveStartSignal.await();
- }catch(InterruptedException e){
- LOG.error("Exception waiting for slave:"+e);
+ slaveStartSignal.await();
+ } catch (InterruptedException e) {
+ LOG.error("Exception waiting for slave:" + e);
}
}
-
- protected void slaveConnectionEstablished(){
- slaveStartSignal.countDown();
+
+ protected void slaveConnectionEstablished() {
+ slaveStartSignal.countDown();
}
-
-
+
/**
* Start all transport and network connections, proxies and bridges
*
@@ -1866,15 +1790,13 @@
*/
protected void startAllConnectors() throws Exception {
if (!isSlave()) {
- Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
+ Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
List<TransportConnector> al = new ArrayList<TransportConnector>();
-
for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
TransportConnector connector = iter.next();
connector.setBrokerService(this);
al.add(startTransportConnector(connector));
}
-
if (al.size() > 0) {
// let's clear the transportConnectors list and replace it with
// the started transportConnector instances
@@ -1886,8 +1808,8 @@
map.put("network", "true");
map.put("async", "false");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
- if(isWaitForSlave()){
- waitForSlave();
+ if (isWaitForSlave()) {
+ waitForSlave();
}
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
NetworkConnector connector = iter.next();
@@ -1896,17 +1818,15 @@
connector.setDurableDestinations(durableDestinations);
connector.start();
}
-
for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
ProxyConnector connector = iter.next();
connector.start();
}
-
for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
JmsConnector connector = iter.next();
connector.start();
}
- for (Service service:services) {
+ for (Service service : services) {
configureService(service);
service.start();
}
@@ -1919,15 +1839,11 @@
if (policy != null) {
connector.setMessageAuthorizationPolicy(policy);
}
-
if (isUseJmx()) {
connector = registerConnectorMBean(connector);
}
-
connector.getStatistics().setEnabled(enableStatistics);
-
connector.start();
-
return connector;
}
@@ -1964,7 +1880,6 @@
Set destinations = destinationFactory.getDestinations();
if (destinations != null) {
Iterator iter = destinations.iterator();
-
ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
if (adminConnectionContext == null) {
ConnectionContext context = new ConnectionContext();
@@ -1972,9 +1887,8 @@
adminConnectionContext = context;
broker.setAdminConnectionContext(adminConnectionContext);
}
-
while (iter.hasNext()) {
- ActiveMQDestination destination = (ActiveMQDestination)iter.next();
+ ActiveMQDestination destination = (ActiveMQDestination) iter.next();
broker.addDestination(adminConnectionContext, destination);
}
}
@@ -1988,15 +1902,14 @@
this.regionBroker = regionBroker;
}
-
public void addShutdownHook(Runnable hook) {
- synchronized(shutdownHooks) {
+ synchronized (shutdownHooks) {
shutdownHooks.add(hook);
}
}
-
+
public void removeShutdownHook(Runnable hook) {
- synchronized(shutdownHooks) {
+ synchronized (shutdownHooks) {
shutdownHooks.remove(hook);
}
}
@@ -2025,24 +1938,23 @@
this.sslContext = sslContext;
}
- public boolean isShutdownOnSlaveFailure() {
- return shutdownOnSlaveFailure;
- }
-
- public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
- this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
- }
-
- public boolean isWaitForSlave() {
- return waitForSlave;
- }
-
- public void setWaitForSlave(boolean waitForSlave) {
- this.waitForSlave = waitForSlave;
- }
-
- public CountDownLatch getSlaveStartSignal() {
- return slaveStartSignal;
- }
+ public boolean isShutdownOnSlaveFailure() {
+ return shutdownOnSlaveFailure;
+ }
+
+ public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
+ this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
+ }
+
+ public boolean isWaitForSlave() {
+ return waitForSlave;
+ }
+
+ public void setWaitForSlave(boolean waitForSlave) {
+ this.waitForSlave = waitForSlave;
+ }
+ public CountDownLatch getSlaveStartSignal() {
+ return slaveStartSignal;
+ }
}
\ No newline at end of file
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=799706&r1=799705&r2=799706&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Fri Jul 31 19:19:07 2009
@@ -16,16 +16,8 @@
*/
package org.apache.activemq.broker;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Iterator;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
import org.apache.activemq.broker.jmx.ManagedTransportConnector;
+import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.ConnectorStatistics;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.security.MessageAuthorizationPolicy;
@@ -40,6 +32,12 @@
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.management.ObjectName;
/**
* @org.apache.xbean.XBean
@@ -94,8 +92,8 @@
* Factory method to create a JMX managed version of this transport
* connector
*/
- public ManagedTransportConnector asManagedConnector(MBeanServer mbeanServer, ObjectName connectorName) throws IOException, URISyntaxException {
- ManagedTransportConnector rc = new ManagedTransportConnector(mbeanServer, connectorName, getServer());
+ public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException {
+ ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
rc.setBrokerInfo(getBrokerInfo());
rc.setConnectUri(getConnectUri());
rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=799706&r1=799705&r2=799706&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Fri Jul 31 19:19:07 2009
@@ -16,30 +16,6 @@
*/
package org.apache.activemq.broker.jmx;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
@@ -71,10 +47,31 @@
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import javax.management.InstanceNotFoundException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
public class ManagedRegionBroker extends RegionBroker {
private static final Log LOG = LogFactory.getLog(ManagedRegionBroker.class);
- private final MBeanServer mbeanServer;
+ private final ManagementContext managementContext;
private final ObjectName brokerObjectName;
private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
@@ -92,10 +89,10 @@
/* This is the first broker in the broker interceptor chain. */
private Broker contextBroker;
- public ManagedRegionBroker(BrokerService brokerService, MBeanServer mbeanServer, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
+ public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor);
- this.mbeanServer = mbeanServer;
+ this.managementContext = context;
this.brokerObjectName = brokerObjectName;
}
@@ -111,7 +108,7 @@
for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
ObjectName name = iter.next();
try {
- mbeanServer.unregisterMBean(name);
+ managementContext.unregisterMBean(name);
} catch (InstanceNotFoundException e) {
LOG.warn("The MBean: " + name + " is no longer registered with JMX");
} catch (Exception e) {
@@ -245,7 +242,7 @@
}
}
try {
- mbeanServer.registerMBean(view, key);
+ managementContext.registerMBean(view, key);
registeredMBeans.add(key);
} catch (Throwable e) {
LOG.warn("Failed to register MBean: " + key);
@@ -260,7 +257,7 @@
temporaryTopics.remove(key);
if (registeredMBeans.remove(key)) {
try {
- mbeanServer.unregisterMBean(key);
+ managementContext.unregisterMBean(key);
} catch (Throwable e) {
LOG.warn("Failed to unregister MBean: " + key);
LOG.debug("Failure reason: " + e, e);
@@ -288,7 +285,7 @@
if (inactiveName != null) {
inactiveDurableTopicSubscribers.remove(inactiveName);
registeredMBeans.remove(inactiveName);
- mbeanServer.unregisterMBean(inactiveName);
+ managementContext.unregisterMBean(inactiveName);
}
} catch (Throwable e) {
LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
@@ -300,7 +297,7 @@
}
try {
- mbeanServer.registerMBean(view, key);
+ managementContext.registerMBean(view, key);
registeredMBeans.add(key);
} catch (Throwable e) {
LOG.warn("Failed to register MBean: " + key);
@@ -317,7 +314,7 @@
temporaryTopicSubscribers.remove(key);
if (registeredMBeans.remove(key)) {
try {
- mbeanServer.unregisterMBean(key);
+ managementContext.unregisterMBean(key);
} catch (Throwable e) {
LOG.warn("Failed to unregister MBean: " + key);
LOG.debug("Failure reason: " + e, e);
@@ -370,7 +367,7 @@
SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info);
try {
- mbeanServer.registerMBean(view, objectName);
+ managementContext.registerMBean(view, objectName);
registeredMBeans.add(objectName);
} catch (Throwable e) {
LOG.warn("Failed to register MBean: " + key);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java?rev=799706&r1=799705&r2=799706&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java Fri Jul 31 19:19:07 2009
@@ -16,12 +16,6 @@
*/
package org.apache.activemq.broker.jmx;
-import java.io.IOException;
-import java.util.Hashtable;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
@@ -33,6 +27,9 @@
import org.apache.activemq.util.JMXSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.Hashtable;
+import javax.management.ObjectName;
/**
* A managed transport connection
@@ -42,7 +39,7 @@
public class ManagedTransportConnection extends TransportConnection {
private static final Log LOG = LogFactory.getLog(ManagedTransportConnection.class);
- private final MBeanServer server;
+ private final ManagementContext managementContext;
private final ObjectName connectorName;
private ConnectionViewMBean mbean;
@@ -50,10 +47,10 @@
private ObjectName byAddressName;
public ManagedTransportConnection(TransportConnector connector, Transport transport, Broker broker,
- TaskRunnerFactory factory, MBeanServer server, ObjectName connectorName)
+ TaskRunnerFactory factory, ManagementContext context, ObjectName connectorName)
throws IOException {
super(connector, transport, broker, factory);
- this.server = server;
+ this.managementContext = context;
this.connectorName = connectorName;
this.mbean = new ConnectionView(this);
byAddressName = createByAddressObjectName("address", transport.getRemoteAddress());
@@ -99,7 +96,7 @@
protected void registerMBean(ObjectName name) {
if (name != null) {
try {
- server.registerMBean(mbean, name);
+ managementContext.registerMBean(mbean, name);
} catch (Throwable e) {
LOG.warn("Failed to register MBean: " + name);
LOG.debug("Failure reason: " + e, e);
@@ -110,7 +107,7 @@
protected void unregisterMBean(ObjectName name) {
if (name != null) {
try {
- server.unregisterMBean(name);
+ managementContext.unregisterMBean(name);
} catch (Throwable e) {
LOG.warn("Failed to unregister mbean: " + name);
LOG.debug("Failure reason: " + e, e);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java?rev=799706&r1=799705&r2=799706&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java Fri Jul 31 19:19:07 2009
@@ -16,17 +16,14 @@
*/
package org.apache.activemq.broker.jmx;
-import java.io.IOException;
-import java.net.URISyntaxException;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
/**
* A managed transport connector which can create multiple managed connections
@@ -38,12 +35,12 @@
static long nextConnectionId = 1;
- private final MBeanServer mbeanServer;
+ private final ManagementContext managementContext;
private final ObjectName connectorName;
- public ManagedTransportConnector(MBeanServer mbeanServer, ObjectName connectorName, TransportServer server) {
+ public ManagedTransportConnector(ManagementContext context, ObjectName connectorName, TransportServer server) {
super(server);
- this.mbeanServer = mbeanServer;
+ this.managementContext = context;
this.connectorName = connectorName;
}
@@ -52,7 +49,7 @@
}
protected Connection createConnection(Transport transport) throws IOException {
- return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(), mbeanServer, connectorName);
+ return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(), managementContext, connectorName);
}
protected static synchronized long getNextConnectionId() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java?rev=799706&r1=799705&r2=799706&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java Fri Jul 31 19:19:07 2009
@@ -21,14 +21,23 @@
import java.net.MalformedURLException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
+import java.util.Iterator;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.Attribute;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
import javax.management.JMException;
+import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
+import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
@@ -37,6 +46,7 @@
import org.apache.activemq.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import sun.security.action.GetBooleanAction;
/**
* A Flow provides different dispatch policies within the NMR
@@ -64,6 +74,7 @@
private JMXConnectorServer connectorServer;
private ObjectName namingServiceObjectName;
private Registry registry;
+ private List<ObjectName> registeredMBeanNames = new CopyOnWriteArrayList<ObjectName>();
public ManagementContext() {
this(null);
@@ -101,8 +112,18 @@
}
}
- public void stop() throws IOException {
+ public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
+ MBeanServer mbeanServer = getMBeanServer();
+ if (mbeanServer != null) {
+ for (Iterator<ObjectName> iter = registeredMBeanNames.iterator(); iter.hasNext();) {
+ ObjectName name = iter.next();
+
+ mbeanServer.unregisterMBean(name);
+
+ }
+ }
+ registeredMBeanNames.clear();
JMXConnectorServer server = connectorServer;
connectorServer = null;
if (server != null) {
@@ -146,7 +167,7 @@
*
* @return the MBeanServer
*/
- public MBeanServer getMBeanServer() {
+ protected MBeanServer getMBeanServer() {
if (this.beanServer == null) {
this.beanServer = findMBeanServer();
}
@@ -258,7 +279,24 @@
}
return containerName + "." + name;
}
-
+
+ public Object newProxyInstance( ObjectName objectName,
+ Class interfaceClass,
+ boolean notificationBroadcaster){
+ return MBeanServerInvocationHandler.newProxyInstance(getMBeanServer(), objectName, interfaceClass, notificationBroadcaster);
+
+ }
+
+ public Object getAttribute(ObjectName name, String attribute) throws Exception{
+ return getMBeanServer().getAttribute(name, attribute);
+ }
+
+ public ObjectInstance registerMBean(Object bean, ObjectName name) throws Exception{
+ ObjectInstance result = getMBeanServer().registerMBean(bean, name);
+ this.registeredMBeanNames.add(name);
+ return result;
+ }
+
/**
* Unregister an MBean
*
@@ -266,7 +304,7 @@
* @throws JMException
*/
public void unregisterMBean(ObjectName name) throws JMException {
- if (beanServer != null && beanServer.isRegistered(name)) {
+ if (beanServer != null && beanServer.isRegistered(name) && this.registeredMBeanNames.remove(name)) {
beanServer.unregisterMBean(name);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=799706&r1=799705&r2=799706&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri Jul 31 19:19:07 2009
@@ -520,9 +520,6 @@
public BrokerId getBrokerId() {
if (brokerId == null) {
- // TODO: this should persist the broker id so that subsequent
- // startup
- // uses the same broker id.
brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
}
return brokerId;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java?rev=799706&r1=799705&r2=799706&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java Fri Jul 31 19:19:07 2009
@@ -16,25 +16,10 @@
*/
package org.apache.activemq.broker.view;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import javax.management.MBeanServer;
-import javax.management.MBeanServerInvocationHandler;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
-import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
@@ -43,6 +28,15 @@
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.DestinationMapNode;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import javax.management.ObjectName;
/**
* @version $Revision: $
@@ -55,7 +49,6 @@
private boolean clearProducerCacheAfterRender;
private String domain = "org.apache.activemq";
private BrokerViewMBean brokerView;
- private MBeanServer mbeanServer;
// until we have some MBeans for producers, lets do it all ourselves
private Map<ProducerId, ProducerInfo> producers = new HashMap<ProducerId, ProducerInfo>();
@@ -65,10 +58,7 @@
public ConnectionDotFileInterceptor(Broker next, String file, boolean redrawOnRemove) throws IOException {
super(next, file);
this.redrawOnRemove = redrawOnRemove;
-
- mbeanServer = new ManagementContext().getMBeanServer();
- ObjectName brokerName = next.getBrokerService().getBrokerObjectName();
- brokerView = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
+
}
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
@@ -123,7 +113,7 @@
writer.println("digraph \"ActiveMQ Connections\" {");
writer.println();
- writer.println("label=\"ActiveMQ Broker: " + brokerView.getBrokerId() + "\"];");
+ writer.println("label=\"ActiveMQ Broker: " + getBrokerView().getBrokerId() + "\"];");
writer.println();
writer.println("node [style = \"rounded,filled\", fillcolor = yellow, fontname=\"Helvetica-Oblique\"];");
writer.println();
@@ -132,10 +122,10 @@
Map<String, String> queues = new HashMap<String, String>();
Map<String, String> topics = new HashMap<String, String>();
- printSubscribers(writer, clients, queues, "queue_", brokerView.getQueueSubscribers());
+ printSubscribers(writer, clients, queues, "queue_", getBrokerView().getQueueSubscribers());
writer.println();
- printSubscribers(writer, clients, topics, "topic_", brokerView.getTopicSubscribers());
+ printSubscribers(writer, clients, topics, "topic_", getBrokerView().getTopicSubscribers());
writer.println();
printProducers(writer, clients, queues, topics);
@@ -210,7 +200,7 @@
protected void printSubscribers(PrintWriter writer, Map<String, String> clients, Map<String, String> destinations, String type, ObjectName[] subscribers) {
for (int i = 0; i < subscribers.length; i++) {
ObjectName name = subscribers[i];
- SubscriptionViewMBean subscriber = (SubscriptionViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true);
+ SubscriptionViewMBean subscriber = (SubscriptionViewMBean)getBrokerService().getManagementContext().newProxyInstance(name, SubscriptionViewMBean.class, true);
String clientId = subscriber.getClientId();
String safeClientId = asID(clientId);
@@ -332,4 +322,13 @@
}
return path;
}
+
+ BrokerViewMBean getBrokerView() throws Exception {
+ if (this.brokerView == null) {
+ ObjectName brokerName = getBrokerService().getBrokerObjectName();
+ this.brokerView = (BrokerViewMBean) getBrokerService().getManagementContext().newProxyInstance(brokerName,
+ BrokerViewMBean.class, true);
+ }
+ return this.brokerView;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=799706&r1=799705&r2=799706&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Fri Jul 31 19:19:07 2009
@@ -16,22 +16,6 @@
*/
package org.apache.activemq.network;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.NetworkBridgeView;
@@ -45,6 +29,17 @@
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
/**
* @version $Revision$
@@ -238,16 +233,12 @@
if (!getBrokerService().isUseJmx()) {
return;
}
-
- MBeanServer mbeanServer = getBrokerService().getManagementContext().getMBeanServer();
- if (mbeanServer != null) {
- NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
- try {
- ObjectName objectName = createNetworkBridgeObjectName(bridge);
- mbeanServer.registerMBean(view, objectName);
- } catch (Throwable e) {
- LOG.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e);
- }
+ NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
+ try {
+ ObjectName objectName = createNetworkBridgeObjectName(bridge);
+ getBrokerService().getManagementContext().registerMBean(view, objectName);
+ } catch (Throwable e) {
+ LOG.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e);
}
}
@@ -255,15 +246,11 @@
if (!getBrokerService().isUseJmx()) {
return;
}
-
- MBeanServer mbeanServer = getBrokerService().getManagementContext().getMBeanServer();
- if (mbeanServer != null) {
- try {
- ObjectName objectName = createNetworkBridgeObjectName(bridge);
- mbeanServer.unregisterMBean(objectName);
- } catch (Throwable e) {
- LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e);
- }
+ try {
+ ObjectName objectName = createNetworkBridgeObjectName(bridge);
+ getBrokerService().getManagementContext().unregisterMBean(objectName);
+ } catch (Throwable e) {
+ LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java?rev=799706&r1=799705&r2=799706&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java Fri Jul 31 19:19:07 2009
@@ -16,15 +16,13 @@
*/
package org.apache.activemq.transport;
-import java.io.IOException;
-
-import javax.management.ObjectName;
-
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.LogWriterFinder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import javax.management.ObjectName;
/**
* Singleton class to create TransportLogger objects.
@@ -201,7 +199,7 @@
try {
this.objectName = new ObjectName(this.managementContext.getJmxDomainName()+":"+ "Type=TransportLoggerControl");
- this.managementContext.getMBeanServer().registerMBean(new TransportLoggerControl(this.managementContext),this.objectName);
+ this.managementContext.registerMBean(new TransportLoggerControl(this.managementContext),this.objectName);
this.transportLoggerControlCreated = true;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerView.java?rev=799706&r1=799705&r2=799706&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerView.java Fri Jul 31 19:19:07 2009
@@ -119,7 +119,7 @@
*/
private void register() {
try {
- this.managementContext.getMBeanServer().registerMBean(this, this.name);
+ this.managementContext.registerMBean(this, this.name);
} catch (Exception e) {
log.error("Could not register MBean for TransportLoggerView " + id + "with name " + this.name.toString() + ", reason: " + e, e);
}
@@ -138,7 +138,7 @@
TransportLoggerView.transportLoggerViews.remove(this);
try {
- this.managementContext.getMBeanServer().unregisterMBean(this.name);
+ this.managementContext.unregisterMBean(this.name);
} catch (Exception e) {
log.error("Could not unregister MBean for TransportLoggerView " + id + "with name " + this.name.toString() + ", reason: " + e, e);
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=799706&r1=799705&r2=799706&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Fri Jul 31 19:19:07 2009
@@ -910,7 +910,7 @@
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
- MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
+
String domain = "org.apache.activemq";
ObjectName name;
if (destination.isQueue()) {
@@ -918,7 +918,7 @@
} else {
name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test");
}
- return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true);
+ return (DestinationViewMBean)broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java?rev=799706&r1=799705&r2=799706&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java Fri Jul 31 19:19:07 2009
@@ -74,9 +74,8 @@
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
+ ":Type=Queue,Destination=" + queue.getQueueName()
+ ",BrokerName=localhost");
- QueueViewMBean proxy = (QueueViewMBean) MBeanServerInvocationHandler
- .newProxyInstance(broker.getManagementContext()
- .getMBeanServer(), queueViewMBeanName,
+ QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+ .newProxyInstance(queueViewMBeanName,
QueueViewMBean.class, true);
return proxy;
}