You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/05/11 16:15:58 UTC
svn commit: r773569 - in /activemq/trunk/activemq-core: ./
src/main/java/org/apache/activemq/broker/
src/main/java/org/apache/activemq/broker/jmx/
src/main/java/org/apache/activemq/broker/region/
src/main/java/org/apache/activemq/kaha/impl/async/ src/m...
Author: dejanb
Date: Mon May 11 14:15:58 2009
New Revision: 773569
URL: http://svn.apache.org/viewvc?rev=773569&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2245 - broker restart
Modified:
activemq/trunk/activemq-core/ (props changed)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java
Propchange: activemq/trunk/activemq-core/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon May 11 14:15:58 2009
@@ -1,4 +1,3 @@
-
target
foo
activemq-data
@@ -16,3 +15,6 @@
derbydb
testJdbcConfig
amqstore
+broker1
+kahadir
+data
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=773569&r1=773568&r2=773569&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon May 11 14:15:58 2009
@@ -31,12 +31,13 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.Service;
-import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
import org.apache.activemq.broker.ft.MasterConnector;
@@ -72,6 +73,7 @@
import org.apache.activemq.proxy.ProxyConnector;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.security.SecurityContext;
+import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
@@ -178,6 +180,8 @@
private int systemExitOnShutdownExitCode;
private SslContext sslContext;
+ private boolean forceStart = false;
+
static {
String localHostName = "localhost";
try {
@@ -418,6 +422,11 @@
return started.get();
}
+ public void start(boolean force) throws Exception {
+ forceStart = force;
+ start();
+ }
+
// Service interface
// -------------------------------------------------------------------------
public void start() throws Exception {
@@ -456,12 +465,22 @@
startDestinations();
addShutdownHook();
-
+
+ getBroker().start();
+
if (isUseJmx()) {
- getManagementContext().start();
+ getManagementContext().start();
+ ManagedRegionBroker managedBroker = (ManagedRegionBroker)regionBroker;
+ managedBroker.setContextBroker(broker);
+ adminView = new BrokerView(this, managedBroker);
+ MBeanServer mbeanServer = getManagementContext().getMBeanServer();
+ if (mbeanServer != null) {
+ ObjectName objectName = getBrokerObjectName();
+ mbeanServer.registerMBean(adminView, objectName);
+ registeredMBeanNames.add(objectName);
+ }
}
- getBroker().start();
BrokerRegistry.getInstance().bind(getBrokerName(), this);
// see if there is a MasterBroker service and if so, configure
@@ -532,6 +551,7 @@
}
}
}
+ registeredMBeanNames.clear();
stopper.stop(getManagementContext());
}
// Clear SelectorParser cache to free memory
@@ -1585,30 +1605,27 @@
// Add a filter that will stop access to the broker once stopped
broker = new MutableBrokerFilter(broker) {
- public void stop() throws Exception {
- Broker old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
+ Broker old;
+
+ public void stop() throws Exception {
+ old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
// Just ignore additional stop actions.
public void stop() throws Exception {
}
+
});
old.stop();
}
+
+ public void start() throws Exception {
+ if (forceStart && old != null) {
+ this.next.set(old);
+ }
+ getNext().start();
+ }
+
};
-// RegionBroker rBroker = (RegionBroker)regionBroker;
-
- if (isUseJmx()) {
- ManagedRegionBroker managedBroker = (ManagedRegionBroker)regionBroker;
- managedBroker.setContextBroker(broker);
- adminView = new BrokerView(this, managedBroker);
- MBeanServer mbeanServer = getManagementContext().getMBeanServer();
- if (mbeanServer != null) {
- ObjectName objectName = getBrokerObjectName();
- mbeanServer.registerMBean(adminView, objectName);
- registeredMBeanNames.add(objectName);
- }
- }
-
return broker;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java?rev=773569&r1=773568&r2=773569&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java Mon May 11 14:15:58 2009
@@ -20,6 +20,7 @@
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,6 +63,7 @@
private AtomicBoolean started = new AtomicBoolean(false);
private JMXConnectorServer connectorServer;
private ObjectName namingServiceObjectName;
+ private Registry registry;
public ManagementContext() {
this(null);
@@ -121,6 +123,7 @@
MBeanServerFactory.releaseMBeanServer(beanServer);
}
}
+ beanServer = null;
}
}
@@ -361,7 +364,9 @@
private void createConnector(MBeanServer mbeanServer) throws MalformedObjectNameException, MalformedURLException, IOException {
// Create the NamingService, needed by JSR 160
try {
- LocateRegistry.createRegistry(connectorPort);
+ if (registry == null) {
+ registry = LocateRegistry.createRegistry(connectorPort);
+ }
namingServiceObjectName = ObjectName.getInstance("naming:type=rmiregistry");
// Do not use the createMBean as the mx4j jar may not be in the
// same class loader than the server
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=773569&r1=773568&r2=773569&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Mon May 11 14:15:58 2009
@@ -184,6 +184,11 @@
ServiceStopper ss = new ServiceStopper();
doStop(ss);
ss.throwFirstException();
+ // clear the state
+ clientIdSet.clear();
+ connections.clear();
+ destinations.clear();
+ brokerInfos.clear();
}
public PolicyMap getDestinationPolicy() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=773569&r1=773568&r2=773569&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Mon May 11 14:15:58 2009
@@ -90,7 +90,7 @@
protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
protected DataFileAppender appender;
- protected DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
+ protected DataFileAccessorPool accessorPool;
protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
@@ -120,6 +120,7 @@
preferedFileLength=Math.max(PREFERED_DIFF, getMaxFileLength()-PREFERED_DIFF);
lock();
+ accessorPool = new DataFileAccessorPool(this);
ByteSequence sequence = controlFile.load();
if (sequence != null && sequence.getLength() > 0) {
unmarshallState(sequence);
@@ -197,7 +198,7 @@
public void lock() throws IOException {
synchronized (this) {
- if (controlFile == null) {
+ if (controlFile == null || controlFile.isDisposed()) {
IOHelper.mkdirs(directory);
controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java?rev=773569&r1=773568&r2=773569&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java Mon May 11 14:15:58 2009
@@ -179,4 +179,8 @@
}
}
+ public boolean isDisposed() {
+ return disposed;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=773569&r1=773568&r2=773569&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Mon May 11 14:15:58 2009
@@ -203,7 +203,7 @@
asyncDataManager.lock();
break;
} catch (IOException e) {
- LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.");
+ LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e);
try {
Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
} catch (InterruptedException e1) {
@@ -325,6 +325,7 @@
topics.clear();
IOException firstException = null;
referenceStoreAdapter.stop();
+ referenceStoreAdapter = null;
try {
LOG.debug("Journal close");
asyncDataManager.close();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java?rev=773569&r1=773568&r2=773569&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java Mon May 11 14:15:58 2009
@@ -19,8 +19,16 @@
import java.net.URI;
import java.util.List;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
@@ -52,6 +60,39 @@
assertEquals(new ActiveMQTopic("include.test.bar"), dynamicallyIncludedDestinations.get(1));
}
+
+ public void testBrokerRestartFails() throws Exception {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+
+ try {
+ brokerService.start();
+ } catch (Exception e) {
+ return;
+ }
+ fail("Error broker should have prevented us from starting it again");
+ }
+
+ public void testForceBrokerRestart() throws Exception {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+
+ brokerService.start(true); // force restart
+ brokerService.waitUntilStarted();
+
+ //send and receive a message from a restarted broker
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61636");
+ Connection conn = factory.createConnection();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn.start();
+ Destination dest = new ActiveMQQueue("test");
+ MessageProducer producer = sess.createProducer(dest);
+ MessageConsumer consumer = sess.createConsumer(dest);
+ producer.send(sess.createTextMessage("test"));
+ TextMessage msg = (TextMessage)consumer.receive(1000);
+ assertEquals("test", msg.getText());
+ }
+
protected void setUp() throws Exception {
brokerService = createBroker();