You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/11/14 01:46:28 UTC

svn commit: r1409045 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/network/ activemq-core/src/test/java/org/apache/activemq/ activemq-core/src/test/java/org/apache/activemq/bugs/

Author: tabish
Date: Wed Nov 14 00:46:27 2012
New Revision: 1409045

URL: http://svn.apache.org/viewvc?rev=1409045&view=rev
Log:
apply patch: https://issues.apache.org/jira/browse/AMQ-4160

Additional fixes and test updates.

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=1409045&r1=1409044&r2=1409045&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Wed Nov 14 00:46:27 2012
@@ -227,6 +227,7 @@ public class DiscoveryNetworkConnector e
             }
         }
         bridges.clear();
+        activeEvents.clear();
         try {
             this.discoveryAgent.stop();
         } catch (Exception e) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1409045&r1=1409044&r2=1409045&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Wed Nov 14 00:46:27 2012
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Connection;
@@ -53,6 +54,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkBridge;
 import org.apache.activemq.network.NetworkConnector;
@@ -202,6 +204,68 @@ public class JmsMultipleBrokersTestSuppo
         }));
     }
 
+    /**
+     * Timed wait for {@link #hasBridge(String, String)}.
+     * 
+     * @see #hasBridge(String, String)
+     * 
+     * @param localBrokerName
+     *            - the name of the broker on the "local" side of the bridge
+     * @param remoteBrokerName
+     *            - the name of the broker on the "remote" side of the bridge
+     * @param time
+     *            - the maximum time to wait for the bridge to be established
+     * @param units
+     *            - the units for <param>time</param>
+     * @throws InterruptedException
+     *             - if the calling thread is interrupted
+     * @throws TimeoutException
+     *             - if the bridge is not established within the time limit
+     * @throws Exception
+     *             - some other unknown error occurs
+     */
+    protected void waitForBridge(final String localBrokerName,
+            final String remoteBrokerName, long time, TimeUnit units)
+            throws InterruptedException, TimeoutException, Exception {
+        if (!Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() {
+                return hasBridge(localBrokerName, remoteBrokerName);
+            }
+        }, units.toMillis(time))) {
+            throw new TimeoutException("Bridge not established from broker "
+                    + localBrokerName + " to " + remoteBrokerName + " within "
+                    + units.toMillis(time) + " milliseconds.");
+        }
+    }
+
+    /**
+     * Determines whether a bridge has been established between the specified
+     * brokers.Establishment means that connections have been created and broker
+     * info has been exchanged. Due to the asynchronous nature of the
+     * connections, there is still a possibility that the bridge may fail
+     * shortly after establishment.
+     * 
+     * @param localBrokerName
+     *            - the name of the broker on the "local" side of the bridge
+     * @param remoteBrokerName
+     *            - the name of the broker on the "remote" side of the bridge
+     */
+    protected boolean hasBridge(String localBrokerName, String remoteBrokerName) {
+        final BrokerItem fromBroker = brokers.get(localBrokerName);
+        if (fromBroker == null) {
+            throw new IllegalArgumentException("Unknown broker: "
+                    + localBrokerName);
+        }
+
+        for (BrokerInfo peerInfo : fromBroker.broker.getRegionBroker()
+                .getPeerBrokerInfos()) {
+            if (peerInfo.getBrokerName().equals(remoteBrokerName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+    
     protected void waitForBridgeFormation() throws Exception {
         waitForBridgeFormation(1);
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java?rev=1409045&r1=1409044&r2=1409045&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java Wed Nov 14 00:46:27 2012
@@ -35,6 +35,7 @@ import org.apache.activemq.command.Disco
 import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkBridge;
 import org.apache.activemq.network.NetworkBridgeListener;
+import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.discovery.DiscoveryAgent;
@@ -49,6 +50,18 @@ import org.junit.Assert;
  * being reported as active.
  */
 public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
+    final long MAX_TEST_TIME = TimeUnit.MINUTES.toMillis(2);
+
+    /**
+     * Since these tests involve wait conditions, protect against indefinite
+     * waits (due to unanticipated issues).
+     */
+    public void setUp() throws Exception {
+        setAutoFail(true);
+        setMaxTestTime(MAX_TEST_TIME);
+        super.setUp();
+    }
+
     /**
      * This test demonstrates how concurrent attempts to establish a bridge to
      * the same remote broker are allowed to occur. Connection uniqueness will
@@ -57,7 +70,9 @@ public class AMQ4160Test extends JmsMult
      * {@link DiscoveryNetworkConnector#activeBridges()} that represents the
      * successful first bridge creation attempt.
      */
-    public void x_testLostActiveBridge() throws Exception {
+    public void testLostActiveBridge() throws Exception {
+        final long ATTEMPT_TO_CREATE_DELAY = TimeUnit.SECONDS.toMillis(15);
+
         // Start two brokers with a bridge from broker1 to broker2.
         BrokerService broker1 = createBroker(new URI(
                 "broker:(vm://broker1)/broker1?persistent=false"));
@@ -87,21 +102,41 @@ public class AMQ4160Test extends JmsMult
 
         // Start a bridge from broker1 to broker2. The discovery agent attempts
         // to create the bridge concurrently with two threads, and the
-        // synchronization in createBridge ensures that both threads actually
-        // attempt to start bridges.
+        // synchronization in createBridge ensures that pre-patch both threads
+        // actually attempt to start bridges. Post-patch, only one thread is
+        // allowed to start the bridge.
+        final CountDownLatch attemptLatch = new CountDownLatch(2);
         final CountDownLatch createLatch = new CountDownLatch(2);
 
         DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() {
             @Override
+            public void onServiceAdd(DiscoveryEvent event) {
+                // Pre-and-post patch, two threads attempt to establish a bridge
+                // to the same remote broker.
+                attemptLatch.countDown();
+                super.onServiceAdd(event);
+            }
+
+            @Override
             protected NetworkBridge createBridge(Transport localTransport,
                     Transport remoteTransport, final DiscoveryEvent event) {
-                createLatch.countDown();
+                // Pre-patch, the two threads are allowed to create the bridge.
+                // Post-patch, only the first thread is allowed. Wait a
+                // reasonable delay once both attempts are detected to allow
+                // the two bridge creations to occur concurrently (pre-patch).
+                // Post-patch, the wait will timeout and allow the first (and
+                // only) bridge creation to occur.
                 try {
-                    createLatch.await();
+                    attemptLatch.await();
+                    createLatch.countDown();
+                    createLatch.await(ATTEMPT_TO_CREATE_DELAY,
+                            TimeUnit.MILLISECONDS);
+                    return super.createBridge(localTransport, remoteTransport,
+                            event);
                 } catch (InterruptedException e) {
+                    Thread.interrupted();
+                    return null;
                 }
-                return super.createBridge(localTransport, remoteTransport,
-                        event);
             }
         };
 
@@ -151,12 +186,16 @@ public class AMQ4160Test extends JmsMult
         broker1.addNetworkConnector(nc);
         nc.start();
 
-        // The bridge should be formed by the second creation attempt, but the
-        // wait will time out because the active bridge entry from the second
-        // (successful) bridge creation attempt is removed by the first
-        // (unsuccessful) bridge creation attempt.
-        waitForBridgeFormation();
-
+        // Wait for the bridge to be formed by the first attempt.
+        waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(),
+                MAX_TEST_TIME, TimeUnit.MILLISECONDS);
+
+        // Pre-patch, the second bridge creation attempt fails and removes the
+        // first (successful) bridge creation attempt from the
+        // list of active bridges. Post-patch, the second bridge creation
+        // attempt is prevented, so the first bridge creation attempt
+        // remains "active". This assertion is expected to fail pre-patch and
+        // pass post-patch.
         Assert.assertFalse(nc.activeBridges().isEmpty());
     }
 
@@ -164,7 +203,7 @@ public class AMQ4160Test extends JmsMult
      * This test demonstrates a race condition where a failed bridge can be
      * removed from the list of active bridges in
      * {@link DiscoveryNetworkConnector} before it has been added. Eventually,
-     * the failed bridge is added, but never removed, which prevents subsequent
+     * the failed bridge is added, but never removed, which causes subsequent
      * bridge creation attempts to be ignored. The result is a network connector
      * that thinks it has an active bridge, when in fact it doesn't.
      */
@@ -306,4 +345,40 @@ public class AMQ4160Test extends JmsMult
         // Therefore, this wait will time out and cause the test to fail.
         Assert.assertTrue(attemptLatch.await(30, TimeUnit.SECONDS));
     }
+
+    /**
+     * This test verifies that when a network connector is restarted, any
+     * bridges that were active at the time of the stop are allowed to be
+     * re-established (i.e., the "active events" data structure in
+     * {@link DiscoveryNetworkConnector} is reset.
+     */
+    public void testAllowAttemptsAfterRestart() throws Exception {
+        final long STOP_DELAY = TimeUnit.SECONDS.toMillis(10);
+
+        // Start two brokers with a bridge from broker1 to broker2.
+        BrokerService broker1 = createBroker(new URI(
+                "broker:(vm://broker1)/broker1?persistent=false"));
+        final BrokerService broker2 = createBroker(new URI(
+                "broker:(vm://broker2)/broker2?persistent=false"));
+
+        startAllBrokers();
+
+        // Start a bridge from broker1 to broker2.
+        NetworkConnector nc = bridgeBrokers(broker1.getBrokerName(),
+                broker2.getBrokerName());
+        nc.start();
+
+        waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(),
+                MAX_TEST_TIME, TimeUnit.MILLISECONDS);
+
+        // Restart the network connector and verify that the bridge is
+        // re-established. The pause between start/stop is to account for the
+        // asynchronous closure.
+        nc.stop();
+        Thread.sleep(STOP_DELAY);
+        nc.start();
+
+        waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(),
+                MAX_TEST_TIME, TimeUnit.MILLISECONDS);
+    }
 }