You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/08 17:34:00 UTC
svn commit: r1480325 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/broker/
activemq-broker/src/main/java/org/apache/activemq/broker/jmx/
activemq-broker/src/main/java/org/apache/activemq/util/
activemq-console/src/main/java/or...
Author: chirino
Date: Wed May 8 15:34:00 2013
New Revision: 1480325
URL: http://svn.apache.org/r1480325
Log:
Implements AMQ-4526: ActiveMQ should automatically restart if a Locker looses it's lock.
* Adds a new broker config option 'restartAllowed'. Set it to false to revert to the preserve behavior.
* Adds a new 'restart' JMX operation on the broker
* The default IO exception handler will trigger a broker restart instead of a broker stop.
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1480325&r1=1480324&r2=1480325&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Wed May 8 15:34:00 2013
@@ -236,6 +236,9 @@ public class BrokerService implements Se
private Date startDate;
private boolean slave = true;
+ private boolean restartAllowed = true;
+ private boolean restartRequested = false;
+
static {
try {
@@ -2846,4 +2849,35 @@ public class BrokerService implements Se
public boolean isStopping() {
return this.stopping.get();
}
+
+ /**
+ * @return true if the broker allowed to restart on shutdown.
+ */
+ public boolean isRestartAllowed() {
+ return restartAllowed;
+ }
+
+ /**
+ * Sets if the broker allowed to restart on shutdown.
+ * @return
+ */
+ public void setRestartAllowed(boolean restartAllowed) {
+ this.restartAllowed = restartAllowed;
+ }
+
+ /**
+ * A lifecycle manager of the BrokerService should
+ * inspect this property after a broker shutdown has occurred
+ * to find out if the broker needs to be re-created and started
+ * again.
+ *
+ * @return true if the broker wants to be restarted after it shuts down.
+ */
+ public boolean isRestartRequested() {
+ return restartRequested;
+ }
+
+ public void requestRestart() {
+ this.restartRequested = true;
+ }
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java?rev=1480325&r1=1480324&r2=1480325&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java Wed May 8 15:34:00 2013
@@ -133,6 +133,9 @@ public abstract class LockableServiceSup
// we can no longer keep the lock so lets fail
LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master");
try {
+ if( brokerService.isRestartAllowed() ) {
+ brokerService.requestRestart();
+ }
brokerService.stop();
} catch (Exception e) {
LOG.warn("Failure occurred while stopping broker");
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1480325&r1=1480324&r2=1480325&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Wed May 8 15:34:00 2013
@@ -109,6 +109,12 @@ public class BrokerView implements Broke
}
@Override
+ public void restart() throws Exception {
+ brokerService.requestRestart();
+ brokerService.stop();
+ }
+
+ @Override
public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
throws Exception {
brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval);
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=1480325&r1=1480324&r2=1480325&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Wed May 8 15:34:00 2013
@@ -140,6 +140,13 @@ public interface BrokerViewMBean extends
*/
@MBeanInfo("Stop the broker and all its components.")
void stop() throws Exception;
+
+ /**
+ * Restart the broker and all it's components.
+ */
+ @MBeanInfo("Restart the broker and all its components.")
+ void restart() throws Exception;
+
@MBeanInfo("Poll for queues matching queueName are empty before stopping")
void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception;
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java?rev=1480325&r1=1480324&r2=1480325&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java Wed May 8 15:34:00 2013
@@ -128,6 +128,9 @@ import org.slf4j.LoggerFactory;
new Thread("Stopping the broker due to IO exception") {
public void run() {
try {
+ if( broker.isRestartAllowed() ) {
+ broker.requestRestart();
+ }
broker.stop();
} catch (Exception e) {
LOG.warn("Failure occurred while stopping broker", e);
Modified: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java?rev=1480325&r1=1480324&r2=1480325&view=diff
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java (original)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java Wed May 8 15:34:00 2013
@@ -18,11 +18,8 @@
package org.apache.activemq.console.command;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.CountDownLatch;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
@@ -58,9 +55,6 @@ public class StartCommand extends Abstra
""
};
- private URI configURI;
- private List<BrokerService> brokers = new ArrayList<BrokerService>(5);
-
@Override
public String getName() {
return "start";
@@ -77,124 +71,57 @@ public class StartCommand extends Abstra
* @param brokerURIs
*/
protected void runTask(List<String> brokerURIs) throws Exception {
- try {
- // If no config uri, use default setting
- if (brokerURIs.isEmpty()) {
- setConfigUri(new URI(DEFAULT_CONFIG_URI));
- startBroker(getConfigUri());
-
- // Set configuration data, if available, which in this case
- // would be the config URI
- } else {
- String strConfigURI;
-
- while (!brokerURIs.isEmpty()) {
- strConfigURI = (String)brokerURIs.remove(0);
-
- try {
- setConfigUri(new URI(strConfigURI));
- } catch (URISyntaxException e) {
- context.printException(e);
- return;
- }
+ URI configURI;
- startBroker(getConfigUri());
+ while( true ) {
+ final BrokerService broker;
+ try {
+ // If no config uri, use default setting
+ if (brokerURIs.isEmpty()) {
+ configURI = new URI(DEFAULT_CONFIG_URI);
+ } else {
+ configURI = new URI(brokerURIs.get(0));
}
- }
- // Prevent the main thread from exiting unless it is terminated
- // elsewhere
- } catch (Exception e) {
- context.printException(new RuntimeException("Failed to execute start task. Reason: " + e, e));
- throw new Exception(e);
- }
-
- // The broker start up fine. If this unblocks it's cause they were stopped
- // and this would occur because of an internal error (like the DB going offline)
- waitForShutdown();
- }
+ System.out.println("Loading message broker from: " + configURI);
+ broker = BrokerFactory.createBroker(configURI);
+ broker.start();
+
+ } catch (Exception e) {
+ context.printException(new RuntimeException("Failed to execute start task. Reason: " + e, e));
+ throw e;
+ }
- /**
- * Create and run a broker specified by the given configuration URI
- *
- * @param configURI
- * @throws Exception
- */
- public void startBroker(URI configURI) throws Exception {
- System.out.println("Loading message broker from: " + configURI);
- BrokerService broker = BrokerFactory.createBroker(configURI);
- brokers.add(broker);
- broker.start();
- if (!broker.waitUntilStarted()) {
- throw new Exception(broker.getStartException());
- }
- }
+ if (!broker.waitUntilStarted()) {
+ throw new Exception(broker.getStartException());
+ }
- /**
- * Wait for a shutdown invocation elsewhere
- *
- * @throws Exception
- */
- protected void waitForShutdown() throws Exception {
- final boolean[] shutdown = new boolean[] {
- false
- };
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- for (Iterator<BrokerService> i = brokers.iterator(); i.hasNext();) {
+ // The broker started up fine. Now lets wait for it to stop...
+ final CountDownLatch shutdownLatch = new CountDownLatch(1);
+ final Thread jvmShutdownHook = new Thread() {
+ public void run() {
try {
- BrokerService broker = i.next();
broker.stop();
} catch (Exception e) {
}
}
- }
- });
-
- final AtomicInteger brokerCounter = new AtomicInteger(brokers.size());
- for (BrokerService bs : brokers) {
- bs.addShutdownHook(new Runnable() {
+ };
+
+ Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
+ broker.addShutdownHook(new Runnable() {
public void run() {
- // When the last broker lets us know he is closed....
- if( brokerCounter.decrementAndGet() == 0 ) {
- synchronized (shutdown) {
- shutdown[0] = true;
- shutdown.notify();
- }
- }
+ shutdownLatch.countDown();
}
});
- }
- // Wait for any shutdown event
- synchronized (shutdown) {
- while (!shutdown[0]) {
- try {
- shutdown.wait();
- } catch (InterruptedException e) {
- }
+ // The broker has stopped..
+ shutdownLatch.await();
+ Runtime.getRuntime().removeShutdownHook(jvmShutdownHook);
+ if( !broker.isRestartRequested() ) {
+ break;
}
+ System.out.println("Restarting broker");
}
-
- }
-
- /**
- * Sets the current configuration URI used by the start task
- *
- * @param uri
- */
- public void setConfigUri(URI uri) {
- configURI = uri;
- }
-
- /**
- * Gets the current configuration URI used by the start task
- *
- * @return current configuration URI
- */
- public URI getConfigUri() {
- return configURI;
}
/**