You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/01/30 18:15:05 UTC

svn commit: r1440531 - /activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Author: tabish
Date: Wed Jan 30 17:15:05 2013
New Revision: 1440531

URL: http://svn.apache.org/viewvc?rev=1440531&view=rev
Log:
apply patch for: https://issues.apache.org/jira/browse/AMQ-4082

Reduces async work that's don't during network bridge startup. 

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1440531&r1=1440530&r2=1440531&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Jan 30 17:15:05 2013
@@ -25,9 +25,12 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -112,7 +115,8 @@ public abstract class DemandForwardingBr
     protected int demandConsumerDispatched;
     protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
     protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
-    protected AtomicBoolean disposed = new AtomicBoolean();
+    protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
+    protected final AtomicBoolean disposed = new AtomicBoolean();
     protected BrokerId localBrokerId;
     protected ActiveMQDestination[] excludedDestinations;
     protected ActiveMQDestination[] dynamicallyIncludedDestinations;
@@ -128,7 +132,6 @@ public abstract class DemandForwardingBr
     protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
 
     protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
-    protected Object brokerInfoMutex = new Object();
     protected BrokerId remoteBrokerId;
 
     final AtomicLong enqueueCounter = new AtomicLong();
@@ -139,6 +142,9 @@ public abstract class DemandForwardingBr
     private BrokerInfo localBrokerInfo;
     private BrokerInfo remoteBrokerInfo;
 
+    private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
+    private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);
+
     private final AtomicBoolean started = new AtomicBoolean();
     private TransportConnection duplexInitiatingConnection;
     private BrokerService brokerService = null;
@@ -197,9 +203,14 @@ public abstract class DemandForwardingBr
 
                 @Override
                 public void onException(IOException error) {
+                    if (!futureLocalBrokerInfo.isDone()) {
+                        futureLocalBrokerInfo.cancel(true);
+                        return;
+                    }
                     serviceLocalException(error);
                 }
             });
+
             remoteBroker.setTransportListener(new DefaultTransportListener() {
 
                 @Override
@@ -210,16 +221,20 @@ public abstract class DemandForwardingBr
 
                 @Override
                 public void onException(IOException error) {
+                    if (!futureRemoteBrokerInfo.isDone()) {
+                        futureRemoteBrokerInfo.cancel(true);
+                        return;
+                    }
                     serviceRemoteException(error);
                 }
-
             });
 
-            localBroker.start();
             remoteBroker.start();
+            localBroker.start();
+
             if (!disposed.get()) {
                 try {
-                    triggerRemoteStartBridge();
+                    triggerStartAsyncNetworkBridgeCreation();
                 } catch (IOException e) {
                     LOG.warn("Caught exception from remote start", e);
                 }
@@ -230,33 +245,92 @@ public abstract class DemandForwardingBr
         }
     }
 
-    protected void triggerLocalStartBridge() throws IOException {
-        brokerService.getTaskRunnerFactory().execute(new Runnable() {
-            @Override
-            public void run() {
-                final String originalName = Thread.currentThread().getName();
-                Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
+    @Override
+    public void stop() throws Exception {
+        if (started.compareAndSet(true, false)) {
+            if (disposed.compareAndSet(false, true)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
+                }
+
+                futureRemoteBrokerInfo.cancel(true);
+                futureLocalBrokerInfo.cancel(true);
+
+                NetworkBridgeListener l = this.networkBridgeListener;
+                if (l != null) {
+                    l.onStop(this);
+                }
                 try {
-                    startLocalBridge();
-                } catch (Throwable e) {
-                    serviceLocalException(e);
+                    remoteBridgeStarted.set(false);
+                    final CountDownLatch sendShutdown = new CountDownLatch(1);
+
+                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                serialExecutor.shutdown();
+                                if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+                                    List<Runnable> pendingTasks = serialExecutor.shutdownNow();
+                                    if (LOG.isInfoEnabled()) {
+                                        LOG.info("pending tasks on stop" + pendingTasks);
+                                    }
+                                }
+                                localBroker.oneway(new ShutdownInfo());
+                                remoteBroker.oneway(new ShutdownInfo());
+                            } catch (Throwable e) {
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("Caught exception sending shutdown", e);
+                                }
+                            } finally {
+                                sendShutdown.countDown();
+                            }
+
+                        }
+                    }, "ActiveMQ ForwardingBridge StopTask");
+
+                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
+                        LOG.info("Network Could not shutdown in a timely manner");
+                    }
                 } finally {
-                    Thread.currentThread().setName(originalName);
+                    ServiceStopper ss = new ServiceStopper();
+                    ss.stop(remoteBroker);
+                    ss.stop(localBroker);
+                    ss.stop(duplexInboundLocalBroker);
+                    // Release the started Latch since another thread could be
+                    // stuck waiting for it to start up.
+                    startedLatch.countDown();
+                    startedLatch.countDown();
+                    localStartedLatch.countDown();
+
+                    ss.throwFirstException();
                 }
             }
-        });
+
+            if (remoteBrokerInfo != null) {
+                brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
+                brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
+                if (LOG.isInfoEnabled()) {
+                    LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
+                }
+            }
+        }
     }
 
-    protected void triggerRemoteStartBridge() throws IOException {
+    protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
         brokerService.getTaskRunnerFactory().execute(new Runnable() {
             @Override
             public void run() {
                 final String originalName = Thread.currentThread().getName();
-                Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker);
+                Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
+                    "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
+
                 try {
-                    startRemoteBridge();
-                } catch (Exception e) {
-                    serviceRemoteException(e);
+                    // First we collect the info data from both the local and remote ends
+                    collectBrokerInfos();
+
+                    // Once we have all required broker info we can attempt to start
+                    // the local and then remote sides of the bridge.
+                    doStartLocalAndRemoteBridges();
                 } finally {
                     Thread.currentThread().setName(originalName);
                 }
@@ -264,6 +338,92 @@ public abstract class DemandForwardingBr
         });
     }
 
+    private void collectBrokerInfos() {
+
+        // First wait for the remote to feed us its BrokerInfo, then we can check on
+        // the LocalBrokerInfo and decide is this is a loop.
+        try {
+            remoteBrokerInfo = futureRemoteBrokerInfo.get();
+            if (remoteBrokerInfo == null) {
+                fireBridgeFailed();
+            }
+        } catch (Exception e) {
+            serviceRemoteException(e);
+            return;
+        }
+
+        try {
+            localBrokerInfo = futureLocalBrokerInfo.get();
+            if (localBrokerInfo == null) {
+                fireBridgeFailed();
+            }
+
+            // Before we try and build the bridge lets check if we are in a loop
+            // and if so just stop now before registering anything.
+            if (localBrokerId.equals(remoteBrokerId)) {
+                 if (LOG.isTraceEnabled()) {
+                     LOG.trace(configuration.getBrokerName() +
+                         " disconnecting remote loop back connection for: " +
+                         remoteBrokerName + ", with id:" + remoteBrokerId);
+                 }
+                 ServiceSupport.dispose(localBroker);
+                 ServiceSupport.dispose(remoteBroker);
+                 return;
+            }
+
+            // Fill in the remote broker's information now.
+            remoteBrokerId = remoteBrokerInfo.getBrokerId();
+            remoteBrokerPath[0] = remoteBrokerId;
+            remoteBrokerName = remoteBrokerInfo.getBrokerName();
+        } catch (Throwable e) {
+            serviceLocalException(e);
+        }
+    }
+
+    private void doStartLocalAndRemoteBridges() {
+        try {
+            startLocalBridge();
+        } catch (Throwable e) {
+            serviceLocalException(e);
+            return;
+        }
+
+        try {
+
+            if (disposed.get()) {
+                return;
+            }
+
+            Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
+            try {
+                IntrospectionSupport.getProperties(configuration, props, null);
+                if (configuration.getExcludedDestinations() != null) {
+                    excludedDestinations = configuration.getExcludedDestinations().toArray(
+                        new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
+                }
+                if (configuration.getStaticallyIncludedDestinations() != null) {
+                    staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
+                        new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
+                }
+                if (configuration.getDynamicallyIncludedDestinations() != null) {
+                    dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
+                        new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
+                }
+            } catch (Throwable t) {
+                LOG.error("Error mapping remote destinations", t);
+            }
+
+            // Let the local broker know the remote broker's ID.
+            localBroker.oneway(remoteBrokerInfo);
+            // new peer broker (a consumer can work with remote broker also)
+            brokerService.getBroker().addBroker(null, remoteBrokerInfo);
+
+            startRemoteBridge();
+        } catch (Throwable e) {
+            serviceRemoteException(e);
+        }
+    }
+
     private void startLocalBridge() throws Throwable {
         if (localBridgeStarted.compareAndSet(false, true)) {
             synchronized (this) {
@@ -334,8 +494,6 @@ public abstract class DemandForwardingBr
                 localStartedLatch.countDown();
             }
 
-            safeWaitUntilStarted();
-
             if (!disposed.get()) {
                 setupStaticDestinations();
             } else {
@@ -400,73 +558,6 @@ public abstract class DemandForwardingBr
     }
 
     @Override
-    public void stop() throws Exception {
-        if (started.compareAndSet(true, false)) {
-            if (disposed.compareAndSet(false, true)) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
-                }
-                NetworkBridgeListener l = this.networkBridgeListener;
-                if (l != null) {
-                    l.onStop(this);
-                }
-                try {
-                    remoteBridgeStarted.set(false);
-                    final CountDownLatch sendShutdown = new CountDownLatch(1);
-
-                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
-                        @Override
-                        public void run() {
-                            try {
-                                serialExecutor.shutdown();
-                                if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
-                                    List<Runnable> pendingTasks = serialExecutor.shutdownNow();
-                                    if (LOG.isInfoEnabled()) {
-                                        LOG.info("pending tasks on stop" + pendingTasks);
-                                    }
-                                }
-                                localBroker.oneway(new ShutdownInfo());
-                                remoteBroker.oneway(new ShutdownInfo());
-                            } catch (Throwable e) {
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("Caught exception sending shutdown", e);
-                                }
-                            } finally {
-                                sendShutdown.countDown();
-                            }
-
-                        }
-                    }, "ActiveMQ ForwardingBridge StopTask");
-
-                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
-                        LOG.info("Network Could not shutdown in a timely manner");
-                    }
-                } finally {
-                    ServiceStopper ss = new ServiceStopper();
-                    ss.stop(remoteBroker);
-                    ss.stop(localBroker);
-                    ss.stop(duplexInboundLocalBroker);
-                    // Release the started Latch since another thread could be
-                    // stuck waiting for it to start up.
-                    startedLatch.countDown();
-                    startedLatch.countDown();
-                    localStartedLatch.countDown();
-
-                    ss.throwFirstException();
-                }
-            }
-
-            if (remoteBrokerInfo != null) {
-                brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
-                brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
-                if (LOG.isInfoEnabled()) {
-                    LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
-                }
-            }
-        }
-    }
-
-    @Override
     public void serviceRemoteException(Throwable error) {
         if (!disposed.get()) {
             if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
@@ -494,31 +585,7 @@ public abstract class DemandForwardingBr
                     serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
                     ackAdvisory(md.getMessage());
                 } else if (command.isBrokerInfo()) {
-                    lastConnectSucceeded.set(true);
-                    remoteBrokerInfo = (BrokerInfo) command;
-                    Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
-                    try {
-                        IntrospectionSupport.getProperties(configuration, props, null);
-                        if (configuration.getExcludedDestinations() != null) {
-                            excludedDestinations = configuration.getExcludedDestinations().toArray(
-                                new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
-                        }
-                        if (configuration.getStaticallyIncludedDestinations() != null) {
-                            staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
-                                new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
-                        }
-                        if (configuration.getDynamicallyIncludedDestinations() != null) {
-                            dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
-                                new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
-                        }
-                    } catch (Throwable t) {
-                        LOG.error("Error mapping remote destinations", t);
-                    }
-                    serviceRemoteBrokerInfo(command);
-                    // Let the local broker know the remote broker's ID.
-                    localBroker.oneway(command);
-                    // new peer broker (a consumer can work with remote broker also)
-                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
+                    futureRemoteBrokerInfo.set((BrokerInfo) command);
                 } else if (command.getClass() == ConnectionError.class) {
                     ConnectionError ce = (ConnectionError) command;
                     serviceRemoteException(ce.getException());
@@ -944,8 +1011,7 @@ public abstract class DemandForwardingBr
                         }
                     }
                 } else if (command.isBrokerInfo()) {
-                    localBrokerInfo = (BrokerInfo) command;
-                    serviceLocalBrokerInfo(command);
+                    futureLocalBrokerInfo.set((BrokerInfo) command);
                 } else if (command.isShutdownInfo()) {
                     LOG.info(configuration.getBrokerName() + " Shutting down");
                     stop();
@@ -967,42 +1033,6 @@ public abstract class DemandForwardingBr
         }
     }
 
-    protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
-        synchronized (brokerInfoMutex) {
-            if (remoteBrokerId != null) {
-                if (remoteBrokerId.equals(localBrokerId)) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:"
-                            + remoteBrokerId);
-                    }
-                    safeWaitUntilStarted();
-                    ServiceSupport.dispose(this);
-                }
-            }
-        }
-    }
-
-    protected void serviceRemoteBrokerInfo(Command command) throws IOException {
-        synchronized (brokerInfoMutex) {
-            BrokerInfo remoteBrokerInfo = (BrokerInfo) command;
-            remoteBrokerId = remoteBrokerInfo.getBrokerId();
-            remoteBrokerPath[0] = remoteBrokerId;
-            remoteBrokerName = remoteBrokerInfo.getBrokerName();
-            if (localBrokerId != null) {
-                if (localBrokerId.equals(remoteBrokerId)) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:"
-                            + remoteBrokerId);
-                    }
-                    ServiceSupport.dispose(this);
-                }
-            }
-            if (!disposed.get()) {
-                triggerLocalStartBridge();
-            }
-        }
-    }
-
     private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
         boolean suppress = false;
         // for durable subs, suppression via filter leaves dangling acks so we
@@ -1387,7 +1417,7 @@ public abstract class DemandForwardingBr
 
     private void fireBridgeFailed() {
         NetworkBridgeListener l = this.networkBridgeListener;
-        if (l != null) {
+        if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
             l.bridgeFailed();
         }
     }
@@ -1535,4 +1565,83 @@ public abstract class DemandForwardingBr
     public ObjectName getMbeanObjectName() {
         return mbeanObjectName;
     }
+
+    /*
+     * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
+     * remote sides of the network bridge.
+     */
+    private static class FutureBrokerInfo implements Future<BrokerInfo> {
+
+        private final CountDownLatch slot = new CountDownLatch(1);
+        private final AtomicBoolean disposed;
+        private BrokerInfo info = null;
+
+        public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
+            this.info = info;
+            this.disposed = disposed;
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            slot.countDown();
+            return true;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return slot.getCount() == 0 && info == null;
+        }
+
+        @Override
+        public boolean isDone() {
+            return info != null;
+        }
+
+        @Override
+        public BrokerInfo get() throws InterruptedException, ExecutionException {
+            try {
+                if (info == null) {
+                    while (!disposed.get()) {
+                        if (slot.await(1, TimeUnit.SECONDS)) {
+                            break;
+                        }
+                    }
+                }
+                return info;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Operation interupted: " + e, e);
+                }
+                throw new InterruptedException("Interrupted.");
+            }
+        }
+
+        @Override
+        public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            try {
+                if (info == null) {
+                    long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
+
+                    while (!disposed.get() || System.currentTimeMillis() < deadline) {
+                        if (slot.await(1, TimeUnit.MILLISECONDS)) {
+                            break;
+                        }
+                    }
+                    if (info == null) {
+                        throw new TimeoutException();
+                    }
+                }
+                return info;
+            } catch (InterruptedException e) {
+                throw new InterruptedException("Interrupted.");
+            }
+        }
+
+        public void set(BrokerInfo info) {
+            this.info = info;
+            this.slot.countDown();
+        }
+    }
+
 }