You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/07/18 15:43:09 UTC

svn commit: r1362950 - in /activemq/trunk: activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java

Author: dejanb
Date: Wed Jul 18 13:43:09 2012
New Revision: 1362950

URL: http://svn.apache.org/viewvc?rev=1362950&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3696 - start the broker asynchronously, since hanging in the start() method leads to problems with stopping slaves in OSGi

Modified:
    activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java

Modified: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java?rev=1362950&r1=1362949&r2=1362950&view=diff
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java (original)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java Wed Jul 18 13:43:09 2012
@@ -115,6 +115,9 @@ public class StartCommand extends Abstra
         BrokerService broker = BrokerFactory.createBroker(configURI);
         brokers.add(broker);
         broker.start();
+        if (!broker.waitUntilStarted()) {
+            throw broker.getStartException();
+        }
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1362950&r1=1362949&r2=1362950&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Jul 18 13:43:09 2012
@@ -221,6 +221,10 @@ public class BrokerService implements Se
     private int offlineDurableSubscriberTaskSchedule = 300000;
     private DestinationFilter virtualConsumerDestinationFilter;
 
+    private final Object persistenceAdapterLock = new Object();
+    private boolean persistenceAdapterStarted = false;
+    private Exception startException = null;
+
     static {
         String localHostName = "localhost";
         try {
@@ -517,54 +521,8 @@ public class BrokerService implements Se
                 startManagementContext();
             }
 
-            getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
-            getPersistenceAdapter().setBrokerName(getBrokerName());
-            LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
-            if (deleteAllMessagesOnStartup) {
-                deleteAllMessages();
-            }
-            getPersistenceAdapter().start();
-            slave = false;
-            startDestinations();
-            addShutdownHook();
-            getBroker().start();
-            if (isUseJmx()) {
-                if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
-                    // try to restart management context
-                    // typical for slaves that use the same ports as master
-                    managementContext.stop();
-                    startManagementContext();
-                }
-                ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
-                managedBroker.setContextBroker(broker);
-                adminView.setBroker(managedBroker);
-            }
-            BrokerRegistry.getInstance().bind(getBrokerName(), this);
-            // see if there is a MasterBroker service and if so, configure
-            // it and start it.
-            for (Service service : services) {
-                if (service instanceof MasterConnector) {
-                    configureService(service);
-                    service.start();
-                }
-            }
-            if (!isSlave() && (this.masterConnector == null || isShutdownOnMasterFailure() == false)) {
-                startAllConnectors();
-            }
-            if (!stopped.get()) {
-                if (isUseJmx() && masterConnector != null) {
-                    registerFTConnectorMBean(masterConnector);
-                }
-            }
-            if (brokerId == null) {
-                brokerId = broker.getBrokerId();
-            }
-            if (ioExceptionHandler == null) {
-                setIoExceptionHandler(new DefaultIOExceptionHandler());
-            }
-            LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
-            getBroker().brokerServiceStarted();
-            checkSystemUsageLimits();
+            startPersistenceAdapter();
+            startBroker();
             startedLatch.countDown();
         } catch (Exception e) {
             LOG.error("Failed to start ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + "). Reason: " + e, e);
@@ -581,6 +539,88 @@ public class BrokerService implements Se
         }
     }
 
+    private void startPersistenceAdapter() throws Exception {
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
+                    getPersistenceAdapter().setBrokerName(getBrokerName());
+                    LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
+                    if (deleteAllMessagesOnStartup) {
+                        deleteAllMessages();
+                    }
+                    getPersistenceAdapter().start();
+                } catch (Exception e) {
+                    startException = e;
+                } finally {
+                    synchronized (persistenceAdapterLock) {
+                        persistenceAdapterLock.notifyAll();
+                    }
+                }
+            }
+        }.start();
+    }
+
+    private void startBroker() throws Exception {
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    synchronized (persistenceAdapterLock) {
+                        persistenceAdapterLock.wait();
+                    }
+                    if (startException != null) {
+                        return;
+                    }
+                    slave = false;
+                    startDestinations();
+                    addShutdownHook();
+                    getBroker().start();
+                    if (isUseJmx()) {
+                        if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
+                            // try to restart management context
+                            // typical for slaves that use the same ports as master
+                            managementContext.stop();
+                            startManagementContext();
+                        }
+                        ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
+                        managedBroker.setContextBroker(broker);
+                        adminView.setBroker(managedBroker);
+                    }
+                    BrokerRegistry.getInstance().bind(getBrokerName(), BrokerService.this);
+                    // see if there is a MasterBroker service and if so, configure
+                    // it and start it.
+                    for (Service service : services) {
+                        if (service instanceof MasterConnector) {
+                            configureService(service);
+                            service.start();
+                        }
+                    }
+                    if (!isSlave() && (masterConnector == null || isShutdownOnMasterFailure() == false)) {
+                        startAllConnectors();
+                    }
+                    if (!stopped.get()) {
+                        if (isUseJmx() && masterConnector != null) {
+                            registerFTConnectorMBean(masterConnector);
+                        }
+                    }
+                    if (brokerId == null) {
+                        brokerId = broker.getBrokerId();
+                    }
+                    if (ioExceptionHandler == null) {
+                        setIoExceptionHandler(new DefaultIOExceptionHandler());
+                    }
+                    LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
+                    getBroker().brokerServiceStarted();
+                    checkSystemUsageLimits();
+                } catch (Exception e) {
+                    startException = e;
+                }
+            }
+        }.start();
+    }
+
     /**
      *
      * @throws Exception
@@ -783,6 +823,9 @@ public class BrokerService implements Se
         boolean waitSucceeded = false;
         while (isStarted() && !stopped.get() && !waitSucceeded) {
             try {
+                if (startException != null) {
+                    return waitSucceeded;
+                }
                 waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
             } catch (InterruptedException ignore) {
             }
@@ -2718,4 +2761,8 @@ public class BrokerService implements Se
         return isUseVirtualTopics() && destination.isQueue() &&
                 getVirtualTopicConsumerDestinationFilter().matches(destination);
     }
+
+    public Exception getStartException() {
+        return startException;
+    }
 }