You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/01/30 18:15:05 UTC
svn commit: r1440531 -
/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Author: tabish
Date: Wed Jan 30 17:15:05 2013
New Revision: 1440531
URL: http://svn.apache.org/viewvc?rev=1440531&view=rev
Log:
apply patch for: https://issues.apache.org/jira/browse/AMQ-4082
Reduces async work that's don't during network bridge startup.
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1440531&r1=1440530&r2=1440531&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Jan 30 17:15:05 2013
@@ -25,9 +25,12 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -112,7 +115,8 @@ public abstract class DemandForwardingBr
protected int demandConsumerDispatched;
protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
- protected AtomicBoolean disposed = new AtomicBoolean();
+ protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
+ protected final AtomicBoolean disposed = new AtomicBoolean();
protected BrokerId localBrokerId;
protected ActiveMQDestination[] excludedDestinations;
protected ActiveMQDestination[] dynamicallyIncludedDestinations;
@@ -128,7 +132,6 @@ public abstract class DemandForwardingBr
protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
- protected Object brokerInfoMutex = new Object();
protected BrokerId remoteBrokerId;
final AtomicLong enqueueCounter = new AtomicLong();
@@ -139,6 +142,9 @@ public abstract class DemandForwardingBr
private BrokerInfo localBrokerInfo;
private BrokerInfo remoteBrokerInfo;
+ private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
+ private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);
+
private final AtomicBoolean started = new AtomicBoolean();
private TransportConnection duplexInitiatingConnection;
private BrokerService brokerService = null;
@@ -197,9 +203,14 @@ public abstract class DemandForwardingBr
@Override
public void onException(IOException error) {
+ if (!futureLocalBrokerInfo.isDone()) {
+ futureLocalBrokerInfo.cancel(true);
+ return;
+ }
serviceLocalException(error);
}
});
+
remoteBroker.setTransportListener(new DefaultTransportListener() {
@Override
@@ -210,16 +221,20 @@ public abstract class DemandForwardingBr
@Override
public void onException(IOException error) {
+ if (!futureRemoteBrokerInfo.isDone()) {
+ futureRemoteBrokerInfo.cancel(true);
+ return;
+ }
serviceRemoteException(error);
}
-
});
- localBroker.start();
remoteBroker.start();
+ localBroker.start();
+
if (!disposed.get()) {
try {
- triggerRemoteStartBridge();
+ triggerStartAsyncNetworkBridgeCreation();
} catch (IOException e) {
LOG.warn("Caught exception from remote start", e);
}
@@ -230,33 +245,92 @@ public abstract class DemandForwardingBr
}
}
- protected void triggerLocalStartBridge() throws IOException {
- brokerService.getTaskRunnerFactory().execute(new Runnable() {
- @Override
- public void run() {
- final String originalName = Thread.currentThread().getName();
- Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
+ @Override
+ public void stop() throws Exception {
+ if (started.compareAndSet(true, false)) {
+ if (disposed.compareAndSet(false, true)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
+ }
+
+ futureRemoteBrokerInfo.cancel(true);
+ futureLocalBrokerInfo.cancel(true);
+
+ NetworkBridgeListener l = this.networkBridgeListener;
+ if (l != null) {
+ l.onStop(this);
+ }
try {
- startLocalBridge();
- } catch (Throwable e) {
- serviceLocalException(e);
+ remoteBridgeStarted.set(false);
+ final CountDownLatch sendShutdown = new CountDownLatch(1);
+
+ brokerService.getTaskRunnerFactory().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ serialExecutor.shutdown();
+ if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ List<Runnable> pendingTasks = serialExecutor.shutdownNow();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("pending tasks on stop" + pendingTasks);
+ }
+ }
+ localBroker.oneway(new ShutdownInfo());
+ remoteBroker.oneway(new ShutdownInfo());
+ } catch (Throwable e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Caught exception sending shutdown", e);
+ }
+ } finally {
+ sendShutdown.countDown();
+ }
+
+ }
+ }, "ActiveMQ ForwardingBridge StopTask");
+
+ if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
+ LOG.info("Network Could not shutdown in a timely manner");
+ }
} finally {
- Thread.currentThread().setName(originalName);
+ ServiceStopper ss = new ServiceStopper();
+ ss.stop(remoteBroker);
+ ss.stop(localBroker);
+ ss.stop(duplexInboundLocalBroker);
+ // Release the started Latch since another thread could be
+ // stuck waiting for it to start up.
+ startedLatch.countDown();
+ startedLatch.countDown();
+ localStartedLatch.countDown();
+
+ ss.throwFirstException();
}
}
- });
+
+ if (remoteBrokerInfo != null) {
+ brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
+ brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
+ if (LOG.isInfoEnabled()) {
+ LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
+ }
+ }
+ }
}
- protected void triggerRemoteStartBridge() throws IOException {
+ protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
final String originalName = Thread.currentThread().getName();
- Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker);
+ Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
+ "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
+
try {
- startRemoteBridge();
- } catch (Exception e) {
- serviceRemoteException(e);
+ // First we collect the info data from both the local and remote ends
+ collectBrokerInfos();
+
+ // Once we have all required broker info we can attempt to start
+ // the local and then remote sides of the bridge.
+ doStartLocalAndRemoteBridges();
} finally {
Thread.currentThread().setName(originalName);
}
@@ -264,6 +338,92 @@ public abstract class DemandForwardingBr
});
}
+ private void collectBrokerInfos() {
+
+ // First wait for the remote to feed us its BrokerInfo, then we can check on
+ // the LocalBrokerInfo and decide is this is a loop.
+ try {
+ remoteBrokerInfo = futureRemoteBrokerInfo.get();
+ if (remoteBrokerInfo == null) {
+ fireBridgeFailed();
+ }
+ } catch (Exception e) {
+ serviceRemoteException(e);
+ return;
+ }
+
+ try {
+ localBrokerInfo = futureLocalBrokerInfo.get();
+ if (localBrokerInfo == null) {
+ fireBridgeFailed();
+ }
+
+ // Before we try and build the bridge lets check if we are in a loop
+ // and if so just stop now before registering anything.
+ if (localBrokerId.equals(remoteBrokerId)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(configuration.getBrokerName() +
+ " disconnecting remote loop back connection for: " +
+ remoteBrokerName + ", with id:" + remoteBrokerId);
+ }
+ ServiceSupport.dispose(localBroker);
+ ServiceSupport.dispose(remoteBroker);
+ return;
+ }
+
+ // Fill in the remote broker's information now.
+ remoteBrokerId = remoteBrokerInfo.getBrokerId();
+ remoteBrokerPath[0] = remoteBrokerId;
+ remoteBrokerName = remoteBrokerInfo.getBrokerName();
+ } catch (Throwable e) {
+ serviceLocalException(e);
+ }
+ }
+
+ private void doStartLocalAndRemoteBridges() {
+ try {
+ startLocalBridge();
+ } catch (Throwable e) {
+ serviceLocalException(e);
+ return;
+ }
+
+ try {
+
+ if (disposed.get()) {
+ return;
+ }
+
+ Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
+ try {
+ IntrospectionSupport.getProperties(configuration, props, null);
+ if (configuration.getExcludedDestinations() != null) {
+ excludedDestinations = configuration.getExcludedDestinations().toArray(
+ new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
+ }
+ if (configuration.getStaticallyIncludedDestinations() != null) {
+ staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
+ new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
+ }
+ if (configuration.getDynamicallyIncludedDestinations() != null) {
+ dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
+ new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
+ }
+ } catch (Throwable t) {
+ LOG.error("Error mapping remote destinations", t);
+ }
+
+ // Let the local broker know the remote broker's ID.
+ localBroker.oneway(remoteBrokerInfo);
+ // new peer broker (a consumer can work with remote broker also)
+ brokerService.getBroker().addBroker(null, remoteBrokerInfo);
+
+ startRemoteBridge();
+ } catch (Throwable e) {
+ serviceRemoteException(e);
+ }
+ }
+
private void startLocalBridge() throws Throwable {
if (localBridgeStarted.compareAndSet(false, true)) {
synchronized (this) {
@@ -334,8 +494,6 @@ public abstract class DemandForwardingBr
localStartedLatch.countDown();
}
- safeWaitUntilStarted();
-
if (!disposed.get()) {
setupStaticDestinations();
} else {
@@ -400,73 +558,6 @@ public abstract class DemandForwardingBr
}
@Override
- public void stop() throws Exception {
- if (started.compareAndSet(true, false)) {
- if (disposed.compareAndSet(false, true)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
- }
- NetworkBridgeListener l = this.networkBridgeListener;
- if (l != null) {
- l.onStop(this);
- }
- try {
- remoteBridgeStarted.set(false);
- final CountDownLatch sendShutdown = new CountDownLatch(1);
-
- brokerService.getTaskRunnerFactory().execute(new Runnable() {
- @Override
- public void run() {
- try {
- serialExecutor.shutdown();
- if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
- List<Runnable> pendingTasks = serialExecutor.shutdownNow();
- if (LOG.isInfoEnabled()) {
- LOG.info("pending tasks on stop" + pendingTasks);
- }
- }
- localBroker.oneway(new ShutdownInfo());
- remoteBroker.oneway(new ShutdownInfo());
- } catch (Throwable e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Caught exception sending shutdown", e);
- }
- } finally {
- sendShutdown.countDown();
- }
-
- }
- }, "ActiveMQ ForwardingBridge StopTask");
-
- if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
- LOG.info("Network Could not shutdown in a timely manner");
- }
- } finally {
- ServiceStopper ss = new ServiceStopper();
- ss.stop(remoteBroker);
- ss.stop(localBroker);
- ss.stop(duplexInboundLocalBroker);
- // Release the started Latch since another thread could be
- // stuck waiting for it to start up.
- startedLatch.countDown();
- startedLatch.countDown();
- localStartedLatch.countDown();
-
- ss.throwFirstException();
- }
- }
-
- if (remoteBrokerInfo != null) {
- brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
- brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
- if (LOG.isInfoEnabled()) {
- LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
- }
- }
- }
- }
-
- @Override
public void serviceRemoteException(Throwable error) {
if (!disposed.get()) {
if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
@@ -494,31 +585,7 @@ public abstract class DemandForwardingBr
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
ackAdvisory(md.getMessage());
} else if (command.isBrokerInfo()) {
- lastConnectSucceeded.set(true);
- remoteBrokerInfo = (BrokerInfo) command;
- Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
- try {
- IntrospectionSupport.getProperties(configuration, props, null);
- if (configuration.getExcludedDestinations() != null) {
- excludedDestinations = configuration.getExcludedDestinations().toArray(
- new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
- }
- if (configuration.getStaticallyIncludedDestinations() != null) {
- staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
- new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
- }
- if (configuration.getDynamicallyIncludedDestinations() != null) {
- dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
- new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
- }
- } catch (Throwable t) {
- LOG.error("Error mapping remote destinations", t);
- }
- serviceRemoteBrokerInfo(command);
- // Let the local broker know the remote broker's ID.
- localBroker.oneway(command);
- // new peer broker (a consumer can work with remote broker also)
- brokerService.getBroker().addBroker(null, remoteBrokerInfo);
+ futureRemoteBrokerInfo.set((BrokerInfo) command);
} else if (command.getClass() == ConnectionError.class) {
ConnectionError ce = (ConnectionError) command;
serviceRemoteException(ce.getException());
@@ -944,8 +1011,7 @@ public abstract class DemandForwardingBr
}
}
} else if (command.isBrokerInfo()) {
- localBrokerInfo = (BrokerInfo) command;
- serviceLocalBrokerInfo(command);
+ futureLocalBrokerInfo.set((BrokerInfo) command);
} else if (command.isShutdownInfo()) {
LOG.info(configuration.getBrokerName() + " Shutting down");
stop();
@@ -967,42 +1033,6 @@ public abstract class DemandForwardingBr
}
}
- protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
- synchronized (brokerInfoMutex) {
- if (remoteBrokerId != null) {
- if (remoteBrokerId.equals(localBrokerId)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:"
- + remoteBrokerId);
- }
- safeWaitUntilStarted();
- ServiceSupport.dispose(this);
- }
- }
- }
- }
-
- protected void serviceRemoteBrokerInfo(Command command) throws IOException {
- synchronized (brokerInfoMutex) {
- BrokerInfo remoteBrokerInfo = (BrokerInfo) command;
- remoteBrokerId = remoteBrokerInfo.getBrokerId();
- remoteBrokerPath[0] = remoteBrokerId;
- remoteBrokerName = remoteBrokerInfo.getBrokerName();
- if (localBrokerId != null) {
- if (localBrokerId.equals(remoteBrokerId)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:"
- + remoteBrokerId);
- }
- ServiceSupport.dispose(this);
- }
- }
- if (!disposed.get()) {
- triggerLocalStartBridge();
- }
- }
- }
-
private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
boolean suppress = false;
// for durable subs, suppression via filter leaves dangling acks so we
@@ -1387,7 +1417,7 @@ public abstract class DemandForwardingBr
private void fireBridgeFailed() {
NetworkBridgeListener l = this.networkBridgeListener;
- if (l != null) {
+ if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
l.bridgeFailed();
}
}
@@ -1535,4 +1565,83 @@ public abstract class DemandForwardingBr
public ObjectName getMbeanObjectName() {
return mbeanObjectName;
}
+
+ /*
+ * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
+ * remote sides of the network bridge.
+ */
+ private static class FutureBrokerInfo implements Future<BrokerInfo> {
+
+ private final CountDownLatch slot = new CountDownLatch(1);
+ private final AtomicBoolean disposed;
+ private BrokerInfo info = null;
+
+ public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
+ this.info = info;
+ this.disposed = disposed;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ slot.countDown();
+ return true;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return slot.getCount() == 0 && info == null;
+ }
+
+ @Override
+ public boolean isDone() {
+ return info != null;
+ }
+
+ @Override
+ public BrokerInfo get() throws InterruptedException, ExecutionException {
+ try {
+ if (info == null) {
+ while (!disposed.get()) {
+ if (slot.await(1, TimeUnit.SECONDS)) {
+ break;
+ }
+ }
+ }
+ return info;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Operation interupted: " + e, e);
+ }
+ throw new InterruptedException("Interrupted.");
+ }
+ }
+
+ @Override
+ public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ try {
+ if (info == null) {
+ long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
+
+ while (!disposed.get() || System.currentTimeMillis() < deadline) {
+ if (slot.await(1, TimeUnit.MILLISECONDS)) {
+ break;
+ }
+ }
+ if (info == null) {
+ throw new TimeoutException();
+ }
+ }
+ return info;
+ } catch (InterruptedException e) {
+ throw new InterruptedException("Interrupted.");
+ }
+ }
+
+ public void set(BrokerInfo info) {
+ this.info = info;
+ this.slot.countDown();
+ }
+ }
+
}