You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/11/14 01:46:28 UTC
svn commit: r1409045 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/network/
activemq-core/src/test/java/org/apache/activemq/
activemq-core/src/test/java/org/apache/activemq/bugs/
Author: tabish
Date: Wed Nov 14 00:46:27 2012
New Revision: 1409045
URL: http://svn.apache.org/viewvc?rev=1409045&view=rev
Log:
apply patch: https://issues.apache.org/jira/browse/AMQ-4160
Additional fixes and test updates.
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=1409045&r1=1409044&r2=1409045&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Wed Nov 14 00:46:27 2012
@@ -227,6 +227,7 @@ public class DiscoveryNetworkConnector e
}
}
bridges.clear();
+ activeEvents.clear();
try {
this.discoveryAgent.stop();
} catch (Exception e) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1409045&r1=1409044&r2=1409045&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Wed Nov 14 00:46:27 2012
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
@@ -53,6 +54,7 @@ import org.apache.activemq.broker.region
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.network.NetworkConnector;
@@ -202,6 +204,68 @@ public class JmsMultipleBrokersTestSuppo
}));
}
+ /**
+ * Timed wait for {@link #hasBridge(String, String)}.
+ *
+ * @see #hasBridge(String, String)
+ *
+ * @param localBrokerName
+ * - the name of the broker on the "local" side of the bridge
+ * @param remoteBrokerName
+ * - the name of the broker on the "remote" side of the bridge
+ * @param time
+ * - the maximum time to wait for the bridge to be established
+ * @param units
+ * - the units for {@code time}
+ * @throws InterruptedException
+ * - if the calling thread is interrupted
+ * @throws TimeoutException
+ * - if the bridge is not established within the time limit
+ * @throws Exception
+ * - some other unknown error occurs
+ */
+ protected void waitForBridge(final String localBrokerName,
+ final String remoteBrokerName, long time, TimeUnit units)
+ throws InterruptedException, TimeoutException, Exception {
+ if (!Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() {
+ return hasBridge(localBrokerName, remoteBrokerName);
+ }
+ }, units.toMillis(time))) {
+ throw new TimeoutException("Bridge not established from broker "
+ + localBrokerName + " to " + remoteBrokerName + " within "
+ + units.toMillis(time) + " milliseconds.");
+ }
+ }
+
+ /**
+ * Determines whether a bridge has been established between the specified
+ * brokers. Establishment means that connections have been created and broker
+ * info has been exchanged. Due to the asynchronous nature of the
+ * connections, there is still a possibility that the bridge may fail
+ * shortly after establishment.
+ *
+ * @param localBrokerName
+ * - the name of the broker on the "local" side of the bridge
+ * @param remoteBrokerName
+ * - the name of the broker on the "remote" side of the bridge
+ */
+ protected boolean hasBridge(String localBrokerName, String remoteBrokerName) {
+ final BrokerItem fromBroker = brokers.get(localBrokerName);
+ if (fromBroker == null) {
+ throw new IllegalArgumentException("Unknown broker: "
+ + localBrokerName);
+ }
+
+ for (BrokerInfo peerInfo : fromBroker.broker.getRegionBroker()
+ .getPeerBrokerInfos()) {
+ if (peerInfo.getBrokerName().equals(remoteBrokerName)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
protected void waitForBridgeFormation() throws Exception {
waitForBridgeFormation(1);
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java?rev=1409045&r1=1409044&r2=1409045&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java Wed Nov 14 00:46:27 2012
@@ -35,6 +35,7 @@ import org.apache.activemq.command.Disco
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.network.NetworkBridgeListener;
+import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
@@ -49,6 +50,18 @@ import org.junit.Assert;
* being reported as active.
*/
public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
+ final long MAX_TEST_TIME = TimeUnit.MINUTES.toMillis(2);
+
+ /**
+ * Since these tests involve wait conditions, protect against indefinite
+ * waits (due to unanticipated issues).
+ */
+ public void setUp() throws Exception {
+ setAutoFail(true);
+ setMaxTestTime(MAX_TEST_TIME);
+ super.setUp();
+ }
+
/**
* This test demonstrates how concurrent attempts to establish a bridge to
* the same remote broker are allowed to occur. Connection uniqueness will
@@ -57,7 +70,9 @@ public class AMQ4160Test extends JmsMult
* {@link DiscoveryNetworkConnector#activeBridges()} that represents the
* successful first bridge creation attempt.
*/
- public void x_testLostActiveBridge() throws Exception {
+ public void testLostActiveBridge() throws Exception {
+ final long ATTEMPT_TO_CREATE_DELAY = TimeUnit.SECONDS.toMillis(15);
+
// Start two brokers with a bridge from broker1 to broker2.
BrokerService broker1 = createBroker(new URI(
"broker:(vm://broker1)/broker1?persistent=false"));
@@ -87,21 +102,41 @@ public class AMQ4160Test extends JmsMult
// Start a bridge from broker1 to broker2. The discovery agent attempts
// to create the bridge concurrently with two threads, and the
- // synchronization in createBridge ensures that both threads actually
- // attempt to start bridges.
+ // synchronization in createBridge ensures that pre-patch both threads
+ // actually attempt to start bridges. Post-patch, only one thread is
+ // allowed to start the bridge.
+ final CountDownLatch attemptLatch = new CountDownLatch(2);
final CountDownLatch createLatch = new CountDownLatch(2);
DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() {
@Override
+ public void onServiceAdd(DiscoveryEvent event) {
+ // Both pre- and post-patch, two threads attempt to establish a bridge
+ // to the same remote broker.
+ attemptLatch.countDown();
+ super.onServiceAdd(event);
+ }
+
+ @Override
protected NetworkBridge createBridge(Transport localTransport,
Transport remoteTransport, final DiscoveryEvent event) {
- createLatch.countDown();
+ // Pre-patch, the two threads are allowed to create the bridge.
+ // Post-patch, only the first thread is allowed. Wait a
+ // reasonable delay once both attempts are detected to allow
+ // the two bridge creations to occur concurrently (pre-patch).
+ // Post-patch, the wait will time out and allow the first (and
+ // only) bridge creation to occur.
try {
- createLatch.await();
+ attemptLatch.await();
+ createLatch.countDown();
+ createLatch.await(ATTEMPT_TO_CREATE_DELAY,
+ TimeUnit.MILLISECONDS);
+ return super.createBridge(localTransport, remoteTransport,
+ event);
} catch (InterruptedException e) {
+ Thread.interrupted();
+ return null;
}
- return super.createBridge(localTransport, remoteTransport,
- event);
}
};
@@ -151,12 +186,16 @@ public class AMQ4160Test extends JmsMult
broker1.addNetworkConnector(nc);
nc.start();
- // The bridge should be formed by the second creation attempt, but the
- // wait will time out because the active bridge entry from the second
- // (successful) bridge creation attempt is removed by the first
- // (unsuccessful) bridge creation attempt.
- waitForBridgeFormation();
-
+ // Wait for the bridge to be formed by the first attempt.
+ waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(),
+ MAX_TEST_TIME, TimeUnit.MILLISECONDS);
+
+ // Pre-patch, the second bridge creation attempt fails and removes the
+ // first (successful) bridge creation attempt from the
+ // list of active bridges. Post-patch, the second bridge creation
+ // attempt is prevented, so the first bridge creation attempt
+ // remains "active". This assertion is expected to fail pre-patch and
+ // pass post-patch.
Assert.assertFalse(nc.activeBridges().isEmpty());
}
@@ -164,7 +203,7 @@ public class AMQ4160Test extends JmsMult
* This test demonstrates a race condition where a failed bridge can be
* removed from the list of active bridges in
* {@link DiscoveryNetworkConnector} before it has been added. Eventually,
- * the failed bridge is added, but never removed, which prevents subsequent
+ * the failed bridge is added, but never removed, which causes subsequent
* bridge creation attempts to be ignored. The result is a network connector
* that thinks it has an active bridge, when in fact it doesn't.
*/
@@ -306,4 +345,40 @@ public class AMQ4160Test extends JmsMult
// Therefore, this wait will time out and cause the test to fail.
Assert.assertTrue(attemptLatch.await(30, TimeUnit.SECONDS));
}
+
+ /**
+ * This test verifies that when a network connector is restarted, any
+ * bridges that were active at the time of the stop are allowed to be
+ * re-established (i.e., the "active events" data structure in
+ * {@link DiscoveryNetworkConnector} is reset).
+ */
+ public void testAllowAttemptsAfterRestart() throws Exception {
+ final long STOP_DELAY = TimeUnit.SECONDS.toMillis(10);
+
+ // Start two brokers with a bridge from broker1 to broker2.
+ BrokerService broker1 = createBroker(new URI(
+ "broker:(vm://broker1)/broker1?persistent=false"));
+ final BrokerService broker2 = createBroker(new URI(
+ "broker:(vm://broker2)/broker2?persistent=false"));
+
+ startAllBrokers();
+
+ // Start a bridge from broker1 to broker2.
+ NetworkConnector nc = bridgeBrokers(broker1.getBrokerName(),
+ broker2.getBrokerName());
+ nc.start();
+
+ waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(),
+ MAX_TEST_TIME, TimeUnit.MILLISECONDS);
+
+ // Restart the network connector and verify that the bridge is
+ // re-established. The pause between stop and start is to account for the
+ // asynchronous closure.
+ nc.stop();
+ Thread.sleep(STOP_DELAY);
+ nc.start();
+
+ waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(),
+ MAX_TEST_TIME, TimeUnit.MILLISECONDS);
+ }
}