You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/07 10:25:07 UTC
svn commit: r383828 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/jmx/
main/java/org/apache/activemq/broker/region/
test/java/org/apache/activemq/broker/
Author: jstrachan
Date: Tue Mar 7 01:25:05 2006
New Revision: 383828
URL: http://svn.apache.org/viewcvs?rev=383828&view=rev
Log:
Added an extra check to make sure all MBeans are unregistered on a stop() call, to ensure that AMQ-585 is fixed.
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java (with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=383828&r1=383827&r2=383828&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Tue Mar 7 01:25:05 2006
@@ -13,25 +13,8 @@
*/
package org.apache.activemq.broker.jmx;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Map.Entry;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
@@ -56,11 +39,34 @@
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.JMXSupport;
+import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
public class ManagedRegionBroker extends RegionBroker{
private static final Log log=LogFactory.getLog(ManagedRegionBroker.class);
private final MBeanServer mbeanServer;
@@ -77,6 +83,7 @@
private final Map temporaryTopicSubscribers=new ConcurrentHashMap();
private final Map subscriptionKeys = new ConcurrentHashMap();
private final Map subscriptionMap = new ConcurrentHashMap();
+ private final Set registeredMBeans = new CopyOnWriteArraySet();
/* This is the first broker in the broker interceptor chain. */
private Broker contextBroker;
@@ -95,6 +102,23 @@
}
+
+ protected void doStop(ServiceStopper stopper) { // Runs the superclass stop logic, then unregisters every ObjectName still held in registeredMBeans.
+ super.doStop(stopper);
+
+ // let's remove any MBeans not yet removed
+ for (Iterator iter = registeredMBeans.iterator(); iter.hasNext();) {
+ ObjectName name = (ObjectName) iter.next();
+ try {
+ mbeanServer.unregisterMBean(name); // NOTE(review): names are added to registeredMBeans before registerMBean is called, so a name whose registration failed may still be attempted here - confirm
+ }
+ catch (Exception e) {
+ stopper.onException(this, e); // hand the failure to the stopper instead of aborting the loop here
+ }
+ }
+ registeredMBeans.clear(); // drop all tracked names, including any whose unregistration failed above
+ }
+
protected Region createQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter adapter){
return new ManagedQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,adapter);
@@ -114,15 +138,8 @@
}
public void register(ActiveMQDestination destName,Destination destination){
- // Build the object name for the destination
- Hashtable map=brokerObjectName.getKeyPropertyList();
try{
- ObjectName objectName = new ObjectName(
- brokerObjectName.getDomain()+":"+
- "BrokerName="+map.get("BrokerName")+","+
- "Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+
- "Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName())
- );
+ ObjectName objectName = createObjectName(destName);
DestinationView view;
if(destination instanceof Queue){
view=new QueueView(this, (Queue) destination);
@@ -136,15 +153,8 @@
}
public void unregister(ActiveMQDestination destName){
- // Build the object name for the destination
- Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
try{
- ObjectName objectName = new ObjectName(
- brokerObjectName.getDomain()+":"+
- "BrokerName="+map.get("BrokerName")+","+
- "Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+
- "Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName())
- );
+ ObjectName objectName = createObjectName(destName);
unregisterDestination(objectName);
}catch(Exception e){
log.error("Failed to unregister "+destName,e);
@@ -208,6 +218,7 @@
topics.put(key,view);
}
}
+ registeredMBeans.add(key);
mbeanServer.registerMBean(view,key);
}
@@ -216,6 +227,7 @@
queues.remove(key);
temporaryQueues.remove(key);
temporaryTopics.remove(key);
+ registeredMBeans.remove(key);
mbeanServer.unregisterMBean(key);
}
@@ -238,6 +250,7 @@
ObjectName inactiveName = (ObjectName) subscriptionKeys.get(subscriptionKey);
if (inactiveName != null){
inactiveDurableTopicSubscribers.remove(inactiveName);
+ registeredMBeans.remove(inactiveName);
mbeanServer.unregisterMBean(inactiveName);
}
}catch(Exception e){
@@ -248,6 +261,7 @@
}
}
}
+ registeredMBeans.add(key);
mbeanServer.registerMBean(view,key);
}
@@ -257,6 +271,7 @@
inactiveDurableTopicSubscribers.remove(key);
temporaryQueueSubscribers.remove(key);
temporaryTopicSubscribers.remove(key);
+ registeredMBeans.remove(key);
mbeanServer.unregisterMBean(key);
DurableSubscriptionView view = (DurableSubscriptionView) durableTopicSubscribers.remove(key);
if (view != null){
@@ -313,6 +328,7 @@
);
SubscriptionView view = new InactiveDurableSubscriptionView(this,key.getClientId(),info);
+ registeredMBeans.add(objectName);
mbeanServer.registerMBean(view,objectName);
inactiveDurableTopicSubscribers.put(objectName,view);
subscriptionKeys.put(key, objectName);
@@ -417,5 +433,17 @@
public void setContextBroker(Broker contextBroker) {
this.contextBroker = contextBroker;
+ }
+
+ protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException { // Builds the JMX ObjectName for destName from the broker's domain and BrokerName; called by register() and unregister().
+ // Build the object name for the destination
+ Hashtable map=brokerObjectName.getKeyPropertyList(); // only the "BrokerName" entry is read from this map
+ ObjectName objectName = new ObjectName(
+ brokerObjectName.getDomain()+":"+
+ "BrokerName="+map.get("BrokerName")+","+
+ "Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+
+ "Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName())
+ );
+ return objectName;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=383828&r1=383827&r2=383828&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Mar 7 01:25:05 2006
@@ -133,10 +133,7 @@
public void stop() throws Exception {
stopped = true;
ServiceStopper ss = new ServiceStopper();
- ss.stop(queueRegion);
- ss.stop(topicRegion);
- ss.stop(tempQueueRegion);
- ss.stop(tempTopicRegion);
+ doStop(ss); // the four region stops now live in the protected doStop(ServiceStopper) method, which ManagedRegionBroker overrides
ss.throwFirstException();
}
@@ -461,6 +458,13 @@
return adaptor != null ? adaptor.getDestinations() : Collections.EMPTY_SET;
}
-
+
+ protected void doStop(ServiceStopper ss) { // Stops the queue, topic, temp-queue and temp-topic regions through ss; called from stop() and overridden by ManagedRegionBroker.
+ ss.stop(queueRegion);
+ ss.stop(topicRegion);
+ ss.stop(tempQueueRegion);
+ ss.stop(tempTopicRegion);
+ }
+
}
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java?rev=383828&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java Tue Mar 7 01:25:05 2006
@@ -0,0 +1,84 @@
+/**
+ *
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.apache.activemq.broker;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ *
+ * @version $Revision$
+ */
+public class ReconnectWithJMXEnabledTest extends EmbeddedBrokerTestSupport { // Checks that a JMX-enabled broker can be stopped, recreated and used again within the same JVM.
+
+ protected Connection connection;
+ protected boolean transacted; // passed to createSession; never assigned in this class
+ protected int authMode = Session.AUTO_ACKNOWLEDGE; // acknowledgement mode passed to createSession
+
+ public void testTestUseConnectionCloseBrokerThenRestartInSameJVM() throws Exception { // Uses a connection, stops the broker, creates and starts a new one, then uses a second connection.
+ connection = connectionFactory.createConnection();
+ useConnection(connection);
+ connection.close();
+
+ broker.stop();
+ broker = createBroker(); // replacement broker comes from createBroker(), so it has JMX enabled as well
+ startBroker();
+
+ connection = connectionFactory.createConnection(); // left open here; tearDown() closes it
+ useConnection(connection);
+ }
+
+ protected void setUp() throws Exception { // Sets the bind address before the superclass setup runs.
+ bindAddress = "tcp://localhost:61616";
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception { // Closes any connection still held, then delegates to the superclass teardown.
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
+ super.tearDown();
+ }
+
+ protected BrokerService createBroker() throws Exception { // Builds a BrokerService with JMX switched on and a connector on bindAddress.
+ BrokerService answer = new BrokerService();
+ answer.setUseJmx(true); // JMX must be on for this test to exercise MBean registration
+ answer.setPersistent(isPersistent());
+ answer.addConnector(bindAddress);
+ return answer;
+ }
+
+ protected void useConnection(Connection connection) throws Exception { // Starts the connection under client ID "foo", creates a consumer and producer on the test destination, and sends one text message.
+ connection.setClientID("foo"); // the same client ID is used for both connections in the test
+ connection.start();
+ Session session = connection.createSession(transacted, authMode);
+ Destination destination = createDestination();
+ MessageConsumer consumer = session.createConsumer(destination); // created but never read from in this method
+ MessageProducer producer = session.createProducer(destination);
+ Message message = session.createTextMessage("Hello World");
+ producer.send(message);
+ Thread.sleep(1000); // NOTE(review): presumably gives the broker time to handle the send before the caller continues - confirm
+ }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain