You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/11/05 16:30:56 UTC
svn commit: r1405836 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/command/DiscoveryEvent.java
main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
test/java/org/apache/activemq/bugs/AMQ4159Test.java
Author: tabish
Date: Mon Nov 5 15:30:55 2012
New Revision: 1405836
URL: http://svn.apache.org/viewvc?rev=1405836&view=rev
Log:
Apply patch for: https://issues.apache.org/jira/browse/AMQ-4159
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4159Test.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/DiscoveryEvent.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/DiscoveryEvent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/DiscoveryEvent.java?rev=1405836&r1=1405835&r2=1405836&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/DiscoveryEvent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/DiscoveryEvent.java Mon Nov 5 15:30:55 2012
@@ -36,6 +36,11 @@ public class DiscoveryEvent implements D
this.serviceName = serviceName;
}
+ protected DiscoveryEvent(DiscoveryEvent copy) {
+ serviceName = copy.serviceName;
+ brokerName = copy.brokerName;
+ }
+
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?rev=1405836&r1=1405835&r2=1405836&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java Mon Nov 5 15:30:55 2012
@@ -59,6 +59,14 @@ public class SimpleDiscoveryAgent implem
super(service);
}
+ public SimpleDiscoveryEvent(SimpleDiscoveryEvent copy) {
+ super(copy);
+ connectFailures = copy.connectFailures;
+ reconnectDelay = copy.reconnectDelay;
+ connectTime = copy.connectTime;
+ failed.set(copy.failed.get());
+ }
+
@Override
public String toString() {
return "[" + serviceName + ", failed:" + failed + ", connectionFailures:" + connectFailures + "]";
@@ -115,16 +123,16 @@ public class SimpleDiscoveryAgent implem
public void serviceFailed(DiscoveryEvent devent) throws IOException {
- final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
- if (event.failed.compareAndSet(false, true)) {
+ final SimpleDiscoveryEvent sevent = (SimpleDiscoveryEvent)devent;
+ if (sevent.failed.compareAndSet(false, true)) {
- listener.onServiceRemove(event);
+ listener.onServiceRemove(sevent);
taskRunner.execute(new Runnable() {
public void run() {
-
+ SimpleDiscoveryEvent event = new SimpleDiscoveryEvent(sevent);
+
// We detect a failed connection attempt because the service
- // fails right
- // away.
+ // fails right away.
if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
LOG.debug("Failure occurred soon after the discovery event was generated. It will be classified as a connection failure: "+event);
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4159Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4159Test.java?rev=1405836&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4159Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4159Test.java Mon Nov 5 15:30:55 2012
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.DiscoveryEvent;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
+import org.junit.Assert;
+
+/**
+ * This unit test demonstrates a bug in {@link SimpleDiscoveryAgent} that
+ * results from a lack of thread safety when handling bridge failures. This bug
+ * allows a single discovery event (which should result in a single bridge
+ * connection attempt) to multiply into several concurrent bridge connection
+ * attempts, which can lead to additional bugs (see related) due to a lack of
+ * thread safety in {@link DiscoveryNetworkConnector}.
+ */
+public class AMQ4159Test extends JmsMultipleBrokersTestSupport {
+ /**
+ * This test is expected to pass since the reconnect delay preserves the
+ * discovery event's failed flag long enough for the multiple bridge failure
+ * events to be ignored.
+ */
+ public void testWithReconnectDelay() throws Exception {
+ doTest(1000, 5000);
+ }
+
+ /**
+ * This test is expected to fail since the lack of reconnect delay allows
+ * the discovery event's failed flag to be reset while there are still
+ * pending failure events, thus allowing (unexpectedly) concurrent bridge
+ * reconnect attempts.
+ */
+ public void testWithoutReconnectDelay() throws Exception {
+ doTest(0, 0);
+ }
+
+ private void doTest(long reconnectDelay, long minConnectTime)
+ throws Exception {
+ // Start two brokers with a bridge from broker1 to broker2.
+ BrokerService broker1 = createBroker(new URI(
+ "broker:(vm://broker1)/broker1?persistent=false"));
+ BrokerService broker2 = createBroker(new URI(
+ "broker:(vm://broker2)/broker2?persistent=false"));
+
+ // Prevent broker2 from removing its (inbound) bridge connection.
+ BrokerPlugin ignoreRemoveConnectionPlugin = new BrokerPlugin() {
+ @Override
+ public Broker installPlugin(Broker broker) throws Exception {
+ return new BrokerFilter(broker) {
+ @Override
+ public void removeConnection(ConnectionContext context,
+ ConnectionInfo info, Throwable error)
+ throws Exception {
+ // Ignore, leaving behind clientId and connection.
+ }
+ };
+ }
+ };
+
+ broker2.setPlugins(new BrokerPlugin[] { ignoreRemoveConnectionPlugin });
+
+ startAllBrokers();
+
+ // Start a bridge from broker1 to broker2 that tracks information about
+ // the connection attemps.
+ final AtomicInteger numAttempts = new AtomicInteger(0);
+ final AtomicInteger concurrency = new AtomicInteger(0);
+ final AtomicInteger concurrencyAttempt = new AtomicInteger(-1);
+ final CountDownLatch attemptLatch = new CountDownLatch(10);
+
+ DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() {
+ @Override
+ public void onServiceAdd(DiscoveryEvent event) {
+ int attempt = numAttempts.incrementAndGet();
+ if (concurrency.incrementAndGet() > 1) {
+ concurrencyAttempt.compareAndSet(-1, attempt);
+ }
+ try {
+ super.onServiceAdd(event);
+ } finally {
+ concurrency.decrementAndGet();
+ }
+ attemptLatch.countDown();
+ }
+ };
+
+ SimpleDiscoveryAgent da = new SimpleDiscoveryAgent();
+ da.setInitialReconnectDelay(reconnectDelay);
+ da.setMinConnectTime(minConnectTime);
+ da.setUseExponentialBackOff(false);
+ da.setServices(new URI[] { broker2.getVmConnectorURI() });
+
+ nc.setDiscoveryAgent(da);
+ broker1.addNetworkConnector(nc);
+ nc.start();
+
+ waitForBridgeFormation();
+
+ // Verify that only one attempt and thread was used to establish the
+ // bridge.
+ Assert.assertEquals(1, numAttempts.get());
+ Assert.assertEquals(-1, concurrencyAttempt.get());
+
+ // Stop the bridge; this will leave the connections in the broker and
+ // prevent a new bridge from being established.
+ nc.stop();
+
+ // Attempt to re-establish the bridge; this will cause repeated
+ // "connection already exists" exceptions.
+ numAttempts.set(0);
+ nc.start();
+
+ // Wait for several attempts and verify that only a single thread was
+ // used to (attempt) to establish the bridge.
+ attemptLatch.await();
+ Assert.assertEquals(-1, concurrencyAttempt.get());
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4159Test.java
------------------------------------------------------------------------------
svn:eol-style = native