You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/08 17:34:00 UTC

svn commit: r1480325 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ activemq-broker/src/main/java/org/apache/activemq/util/ activemq-console/src/main/java/or...

Author: chirino
Date: Wed May  8 15:34:00 2013
New Revision: 1480325

URL: http://svn.apache.org/r1480325
Log:
Implements AMQ-4526: ActiveMQ should automatically restart if a Locker looses it's lock.

* Adds a new broker config option 'restartAllowed'.  Set it to false to revert to the preserve behavior.
* Adds a new 'restart' JMX operation on the broker
* The default IO exception handler will trigger a broker restart instead of a broker stop.

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
    activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1480325&r1=1480324&r2=1480325&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Wed May  8 15:34:00 2013
@@ -236,6 +236,9 @@ public class BrokerService implements Se
     private Date startDate;
     private boolean slave = true;
 
+    private boolean restartAllowed = true;
+    private boolean restartRequested = false;
+
     static {
 
         try {
@@ -2846,4 +2849,35 @@ public class BrokerService implements Se
     public boolean isStopping() {
         return this.stopping.get();
     }
+
+    /**
+     * @return true if the broker allowed to restart on shutdown.
+     */
+    public boolean isRestartAllowed() {
+        return restartAllowed;
+    }
+
+    /**
+     * Sets if the broker allowed to restart on shutdown.
+     * @return
+     */
+    public void setRestartAllowed(boolean restartAllowed) {
+        this.restartAllowed = restartAllowed;
+    }
+
+    /**
+     * A lifecycle manager of the BrokerService should
+     * inspect this property after a broker shutdown has occurred
+     * to find out if the broker needs to be re-created and started
+     * again.
+     *
+     * @return true if the broker wants to be restarted after it shuts down.
+     */
+    public boolean isRestartRequested() {
+        return restartRequested;
+    }
+
+    public void requestRestart() {
+        this.restartRequested = true;
+    }
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java?rev=1480325&r1=1480324&r2=1480325&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java Wed May  8 15:34:00 2013
@@ -133,6 +133,9 @@ public abstract class LockableServiceSup
         // we can no longer keep the lock so lets fail
         LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master");
         try {
+            if( brokerService.isRestartAllowed() ) {
+                brokerService.requestRestart();
+            }
             brokerService.stop();
         } catch (Exception e) {
             LOG.warn("Failure occurred while stopping broker");

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1480325&r1=1480324&r2=1480325&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Wed May  8 15:34:00 2013
@@ -109,6 +109,12 @@ public class BrokerView implements Broke
     }
 
     @Override
+    public void restart() throws Exception {
+        brokerService.requestRestart();
+        brokerService.stop();
+    }
+
+    @Override
     public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
             throws Exception {
         brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval);

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=1480325&r1=1480324&r2=1480325&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Wed May  8 15:34:00 2013
@@ -140,6 +140,13 @@ public interface BrokerViewMBean extends
      */
     @MBeanInfo("Stop the broker and all its components.")
     void stop() throws Exception;
+
+    /**
+     * Restart the broker and all it's components.
+     */
+    @MBeanInfo("Restart the broker and all its components.")
+    void restart() throws Exception;
+
     @MBeanInfo("Poll for queues matching queueName are empty before stopping")
     void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception;
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java?rev=1480325&r1=1480324&r2=1480325&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java Wed May  8 15:34:00 2013
@@ -128,6 +128,9 @@ import org.slf4j.LoggerFactory;
         new Thread("Stopping the broker due to IO exception") {
             public void run() {
                 try {
+                    if( broker.isRestartAllowed() ) {
+                        broker.requestRestart();
+                    }
                     broker.stop();
                 } catch (Exception e) {
                     LOG.warn("Failure occurred while stopping broker", e);

Modified: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java?rev=1480325&r1=1480324&r2=1480325&view=diff
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java (original)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java Wed May  8 15:34:00 2013
@@ -18,11 +18,8 @@
 package org.apache.activemq.console.command;
 
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
@@ -58,9 +55,6 @@ public class StartCommand extends Abstra
         ""
     };
 
-    private URI configURI;
-    private List<BrokerService> brokers = new ArrayList<BrokerService>(5);
-
     @Override
     public String getName() {
         return "start";
@@ -77,124 +71,57 @@ public class StartCommand extends Abstra
      * @param brokerURIs
      */
     protected void runTask(List<String> brokerURIs) throws Exception {
-        try {
-            // If no config uri, use default setting
-            if (brokerURIs.isEmpty()) {
-                setConfigUri(new URI(DEFAULT_CONFIG_URI));
-                startBroker(getConfigUri());
-
-                // Set configuration data, if available, which in this case
-                // would be the config URI
-            } else {
-                String strConfigURI;
-
-                while (!brokerURIs.isEmpty()) {
-                    strConfigURI = (String)brokerURIs.remove(0);
-
-                    try {
-                        setConfigUri(new URI(strConfigURI));
-                    } catch (URISyntaxException e) {
-                        context.printException(e);
-                        return;
-                    }
+        URI configURI;
 
-                    startBroker(getConfigUri());
+        while( true ) {
+            final BrokerService broker;
+            try {
+                // If no config uri, use default setting
+                if (brokerURIs.isEmpty()) {
+                    configURI = new URI(DEFAULT_CONFIG_URI);
+                } else {
+                    configURI = new URI(brokerURIs.get(0));
                 }
-            }
 
-            // Prevent the main thread from exiting unless it is terminated
-            // elsewhere
-        } catch (Exception e) {
-            context.printException(new RuntimeException("Failed to execute start task. Reason: " + e, e));
-            throw new Exception(e);
-        }
-        
-        // The broker start up fine.  If this unblocks it's cause they were stopped
-        // and this would occur because of an internal error (like the DB going offline)
-        waitForShutdown();
-    }
+                System.out.println("Loading message broker from: " + configURI);
+                broker = BrokerFactory.createBroker(configURI);
+                broker.start();
+
+            } catch (Exception e) {
+                context.printException(new RuntimeException("Failed to execute start task. Reason: " + e, e));
+                throw e;
+            }
 
-    /**
-     * Create and run a broker specified by the given configuration URI
-     * 
-     * @param configURI
-     * @throws Exception
-     */
-    public void startBroker(URI configURI) throws Exception {
-        System.out.println("Loading message broker from: " + configURI);
-        BrokerService broker = BrokerFactory.createBroker(configURI);
-        brokers.add(broker);
-        broker.start();
-        if (!broker.waitUntilStarted()) {
-            throw new Exception(broker.getStartException());
-        }
-    }
+            if (!broker.waitUntilStarted()) {
+                throw new Exception(broker.getStartException());
+            }
 
-    /**
-     * Wait for a shutdown invocation elsewhere
-     * 
-     * @throws Exception
-     */
-    protected void waitForShutdown() throws Exception {
-        final boolean[] shutdown = new boolean[] {
-            false
-        };
-        
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            public void run() {
-                for (Iterator<BrokerService> i = brokers.iterator(); i.hasNext();) {
+            // The broker started up fine.  Now lets wait for it to stop...
+            final CountDownLatch shutdownLatch = new CountDownLatch(1);
+            final Thread jvmShutdownHook = new Thread() {
+                public void run() {
                     try {
-                        BrokerService broker = i.next();
                         broker.stop();
                     } catch (Exception e) {
                     }
                 }
-            }
-        });
-        
-        final AtomicInteger brokerCounter = new AtomicInteger(brokers.size());
-        for (BrokerService bs : brokers) {
-            bs.addShutdownHook(new Runnable() {
+            };
+
+            Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
+            broker.addShutdownHook(new Runnable() {
                 public void run() {
-                    // When the last broker lets us know he is closed....
-                    if( brokerCounter.decrementAndGet() == 0 ) {
-                        synchronized (shutdown) {
-                            shutdown[0] = true;
-                            shutdown.notify();
-                        }
-                    }
+                    shutdownLatch.countDown();
                 }
             });
-        }
 
-        // Wait for any shutdown event
-        synchronized (shutdown) {
-            while (!shutdown[0]) {
-                try {
-                    shutdown.wait();
-                } catch (InterruptedException e) {
-                }
+            // The broker has stopped..
+            shutdownLatch.await();
+            Runtime.getRuntime().removeShutdownHook(jvmShutdownHook);
+            if( !broker.isRestartRequested() ) {
+                break;
             }
+            System.out.println("Restarting broker");
         }
-
-    }
-
-    /**
-     * Sets the current configuration URI used by the start task
-     * 
-     * @param uri
-     */
-    public void setConfigUri(URI uri) {
-        configURI = uri;
-    }
-
-    /**
-     * Gets the current configuration URI used by the start task
-     * 
-     * @return current configuration URI
-     */
-    public URI getConfigUri() {
-        return configURI;
     }
 
     /**