You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/07/18 15:43:09 UTC
svn commit: r1362950 - in /activemq/trunk:
activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java
activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Author: dejanb
Date: Wed Jul 18 13:43:09 2012
New Revision: 1362950
URL: http://svn.apache.org/viewvc?rev=1362950&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3696 - start broker asynchronously since hanging in start() method leads to problems with stopping slaves in osgi
Modified:
activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Modified: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java?rev=1362950&r1=1362949&r2=1362950&view=diff
==============================================================================
--- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java (original)
+++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java Wed Jul 18 13:43:09 2012
@@ -115,6 +115,9 @@ public class StartCommand extends Abstra
BrokerService broker = BrokerFactory.createBroker(configURI);
brokers.add(broker);
broker.start();
+ if (!broker.waitUntilStarted()) {
+ throw broker.getStartException();
+ }
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1362950&r1=1362949&r2=1362950&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Jul 18 13:43:09 2012
@@ -221,6 +221,10 @@ public class BrokerService implements Se
private int offlineDurableSubscriberTaskSchedule = 300000;
private DestinationFilter virtualConsumerDestinationFilter;
+ private final Object persistenceAdapterLock = new Object();
+ private boolean persistenceAdapterStarted = false;
+ // Assigned from the threads spawned by startPersistenceAdapter()/startBroker() and read,
+ // without holding any lock, by getStartException() and by the start-wait loop; volatile
+ // so that those readers observe the assignment.
+ private volatile Exception startException = null;
+
static {
String localHostName = "localhost";
try {
@@ -517,54 +521,8 @@ public class BrokerService implements Se
startManagementContext();
}
- getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
- getPersistenceAdapter().setBrokerName(getBrokerName());
- LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
- if (deleteAllMessagesOnStartup) {
- deleteAllMessages();
- }
- getPersistenceAdapter().start();
- slave = false;
- startDestinations();
- addShutdownHook();
- getBroker().start();
- if (isUseJmx()) {
- if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
- // try to restart management context
- // typical for slaves that use the same ports as master
- managementContext.stop();
- startManagementContext();
- }
- ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
- managedBroker.setContextBroker(broker);
- adminView.setBroker(managedBroker);
- }
- BrokerRegistry.getInstance().bind(getBrokerName(), this);
- // see if there is a MasterBroker service and if so, configure
- // it and start it.
- for (Service service : services) {
- if (service instanceof MasterConnector) {
- configureService(service);
- service.start();
- }
- }
- if (!isSlave() && (this.masterConnector == null || isShutdownOnMasterFailure() == false)) {
- startAllConnectors();
- }
- if (!stopped.get()) {
- if (isUseJmx() && masterConnector != null) {
- registerFTConnectorMBean(masterConnector);
- }
- }
- if (brokerId == null) {
- brokerId = broker.getBrokerId();
- }
- if (ioExceptionHandler == null) {
- setIoExceptionHandler(new DefaultIOExceptionHandler());
- }
- LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
- getBroker().brokerServiceStarted();
- checkSystemUsageLimits();
+ startPersistenceAdapter();
+ startBroker();
startedLatch.countDown();
} catch (Exception e) {
LOG.error("Failed to start ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + "). Reason: " + e, e);
@@ -581,6 +539,88 @@ public class BrokerService implements Se
}
}
+ private void startPersistenceAdapter() throws Exception {
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
+ getPersistenceAdapter().setBrokerName(getBrokerName());
+ LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
+ if (deleteAllMessagesOnStartup) {
+ deleteAllMessages();
+ }
+ getPersistenceAdapter().start();
+ } catch (Exception e) {
+ startException = e;
+ } finally {
+ synchronized (persistenceAdapterLock) {
+ persistenceAdapterLock.notifyAll();
+ }
+ }
+ }
+ }.start();
+ }
+
+ private void startBroker() throws Exception {
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ synchronized (persistenceAdapterLock) {
+ persistenceAdapterLock.wait();
+ }
+ if (startException != null) {
+ return;
+ }
+ slave = false;
+ startDestinations();
+ addShutdownHook();
+ getBroker().start();
+ if (isUseJmx()) {
+ if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
+ // try to restart management context
+ // typical for slaves that use the same ports as master
+ managementContext.stop();
+ startManagementContext();
+ }
+ ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
+ managedBroker.setContextBroker(broker);
+ adminView.setBroker(managedBroker);
+ }
+ BrokerRegistry.getInstance().bind(getBrokerName(), BrokerService.this);
+ // see if there is a MasterBroker service and if so, configure
+ // it and start it.
+ for (Service service : services) {
+ if (service instanceof MasterConnector) {
+ configureService(service);
+ service.start();
+ }
+ }
+ if (!isSlave() && (masterConnector == null || isShutdownOnMasterFailure() == false)) {
+ startAllConnectors();
+ }
+ if (!stopped.get()) {
+ if (isUseJmx() && masterConnector != null) {
+ registerFTConnectorMBean(masterConnector);
+ }
+ }
+ if (brokerId == null) {
+ brokerId = broker.getBrokerId();
+ }
+ if (ioExceptionHandler == null) {
+ setIoExceptionHandler(new DefaultIOExceptionHandler());
+ }
+ LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
+ getBroker().brokerServiceStarted();
+ checkSystemUsageLimits();
+ } catch (Exception e) {
+ startException = e;
+ }
+ }
+ }.start();
+ }
+
/**
*
* @throws Exception
@@ -783,6 +823,9 @@ public class BrokerService implements Se
boolean waitSucceeded = false;
while (isStarted() && !stopped.get() && !waitSucceeded) {
try {
+ if (startException != null) {
+ return waitSucceeded;
+ }
waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
}
@@ -2718,4 +2761,8 @@ public class BrokerService implements Se
return isUseVirtualTopics() && destination.isQueue() &&
getVirtualTopicConsumerDestinationFilter().matches(destination);
}
+
+ /**
+  * Returns the exception recorded by the asynchronous start-up threads.
+  *
+  * @return the current value of {@code startException}; {@code null} when no
+  *         exception has been recorded
+  */
+ public Exception getStartException() {
+     return this.startException;
+ }
}