You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/11/05 21:21:51 UTC

svn commit: r1405934 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java test/java/org/apache/activemq/bugs/AMQ4160Test.java

Author: tabish
Date: Mon Nov  5 20:21:51 2012
New Revision: 1405934

URL: http://svn.apache.org/viewvc?rev=1405934&view=rev
Log:
Apply patch for: https://issues.apache.org/jira/browse/AMQ-4160

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=1405934&r1=1405933&r2=1405934&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Mon Nov  5 20:21:51 2012
@@ -21,6 +21,8 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import javax.management.ObjectName;
 
@@ -51,7 +53,7 @@ public class DiscoveryNetworkConnector e
 
     private DiscoveryAgent discoveryAgent;
     private Map<String, String> parameters;
-
+    private ConcurrentMap<URI, DiscoveryEvent> activeEvents = new ConcurrentHashMap<URI, DiscoveryEvent>();
     public DiscoveryNetworkConnector() {
     }
 
@@ -85,15 +87,6 @@ public class DiscoveryNetworkConnector e
                 return;
             }
 
-            // Should we try to connect to that URI?
-            synchronized (bridges) {
-                if( bridges.containsKey(uri) ) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri );
-                    }
-                    return;
-                }
-            }
             if (localURI.equals(uri)) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("not connecting loopback: " + uri);
@@ -108,6 +101,11 @@ public class DiscoveryNetworkConnector e
                 return;
             }
 
+            // Should we try to connect to that URI?
+            if (activeEvents.putIfAbsent(uri, event) != null) {
+                LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri );
+            }
+
             URI connectUri = uri;
             try {
                 connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX);
@@ -131,6 +129,7 @@ public class DiscoveryNetworkConnector e
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Connection failure exception: " + e, e);
                     }
+                    activeEvents.remove(url);
                     return;
                 }
                 try {
@@ -141,6 +140,7 @@ public class DiscoveryNetworkConnector e
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Connection failure exception: " + e, e);
                     }
+                    activeEvents.remove(url);
                     return;
                 }
             } finally {
@@ -148,10 +148,10 @@ public class DiscoveryNetworkConnector e
             }
             NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
             try {
-                bridge.start();
                 synchronized (bridges) {
                     bridges.put(uri, bridge);
                 }
+                bridge.start();
             } catch (Exception e) {
                 ServiceSupport.dispose(localTransport);
                 ServiceSupport.dispose(remoteTransport);
@@ -160,6 +160,7 @@ public class DiscoveryNetworkConnector e
                     LOG.debug("Start failure exception: " + e, e);
                 }
                 try {
+                    // Will remove bridge and active event.
                     discoveryAgent.serviceFailed(event);
                 } catch (IOException e1) {
                     if (LOG.isDebugEnabled()) {
@@ -181,8 +182,11 @@ public class DiscoveryNetworkConnector e
                 return;
             }
 
-            synchronized (bridges) {
-                bridges.remove(uri);
+            // Only remove bridge if this is the active discovery event for the URL.
+            if (activeEvents.remove(url, event)) {
+                synchronized (bridges) {
+                    bridges.remove(uri);
+                }
             }
         }
     }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java?rev=1405934&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java Mon Nov  5 20:21:51 2012
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.ObjectName;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.DiscoveryEvent;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkBridge;
+import org.apache.activemq.network.NetworkBridgeListener;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.discovery.DiscoveryAgent;
+import org.apache.activemq.transport.discovery.DiscoveryListener;
+import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
+import org.junit.Assert;
+
+/**
+ * This test demonstrates a number of race conditions in
+ * {@link DiscoveryNetworkConnector} that can result in an active bridge no
+ * longer being reported as active and vice-versa, an inactive bridge still
+ * being reported as active.
+ */
+public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
+    /**
+     * This test demonstrates how concurrent attempts to establish a bridge to
+     * the same remote broker are allowed to occur. Connection uniqueness will
+     * cause whichever bridge creation attempt is second to fail. However, this
+     * failure erases the entry in
+     * {@link DiscoveryNetworkConnector#activeBridges()} that represents the
+     * successful first bridge creation attempt.
+     */
+    public void testLostActiveBridge() throws Exception {
+        // Start two brokers with a bridge from broker1 to broker2.
+        BrokerService broker1 = createBroker(new URI(
+                "broker:(vm://broker1)/broker1?persistent=false"));
+        final BrokerService broker2 = createBroker(new URI(
+                "broker:(vm://broker2)/broker2?persistent=false"));
+
+        // Allow the concurrent local bridge connections to be made even though
+        // they are duplicated; this prevents both of the bridge attempts from
+        // failing in the case that the local and remote bridges are established
+        // out-of-order.
+        BrokerPlugin ignoreAddConnectionPlugin = new BrokerPlugin() {
+            @Override
+            public Broker installPlugin(Broker broker) throws Exception {
+                return new BrokerFilter(broker) {
+                    @Override
+                    public void addConnection(ConnectionContext context,
+                            ConnectionInfo info) throws Exception {
+                        // ignore
+                    }
+                };
+            }
+        };
+
+        broker1.setPlugins(new BrokerPlugin[] { ignoreAddConnectionPlugin });
+
+        startAllBrokers();
+
+        // Start a bridge from broker1 to broker2. The discovery agent attempts
+        // to create the bridge concurrently with two threads, and the
+        // synchronization in createBridge ensures that both threads actually
+        // attempt to start bridges.
+        final CountDownLatch createLatch = new CountDownLatch(2);
+
+        DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() {
+            @Override
+            protected NetworkBridge createBridge(Transport localTransport,
+                    Transport remoteTransport, final DiscoveryEvent event) {
+                createLatch.countDown();
+                try {
+                    createLatch.await();
+                } catch (InterruptedException e) {
+                }
+                return super.createBridge(localTransport, remoteTransport,
+                        event);
+            }
+        };
+
+        nc.setDiscoveryAgent(new DiscoveryAgent() {
+            TaskRunnerFactory taskRunner = new TaskRunnerFactory();
+            DiscoveryListener listener;
+
+            @Override
+            public void start() throws Exception {
+                taskRunner.init();
+                taskRunner.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        listener.onServiceAdd(new DiscoveryEvent(broker2
+                                .getVmConnectorURI().toString()));
+                    }
+                });
+                taskRunner.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        listener.onServiceAdd(new DiscoveryEvent(broker2
+                                .getVmConnectorURI().toString()));
+                    }
+                });
+            }
+
+            @Override
+            public void stop() throws Exception {
+                taskRunner.shutdown();
+            }
+
+            @Override
+            public void setDiscoveryListener(DiscoveryListener listener) {
+                this.listener = listener;
+            }
+
+            @Override
+            public void registerService(String name) throws IOException {
+            }
+
+            @Override
+            public void serviceFailed(DiscoveryEvent event) throws IOException {
+                listener.onServiceRemove(event);
+            }
+        });
+
+        broker1.addNetworkConnector(nc);
+        nc.start();
+
+        // The bridge should be formed by the second creation attempt, but the
+        // wait will time out because the active bridge entry from the second
+        // (successful) bridge creation attempt is removed by the first
+        // (unsuccessful) bridge creation attempt.
+        waitForBridgeFormation();
+
+        Assert.assertFalse(nc.activeBridges().isEmpty());
+    }
+
+    /**
+     * This test demonstrates a race condition where a failed bridge can be
+     * removed from the list of active bridges in
+     * {@link DiscoveryNetworkConnector} before it has been added. Eventually,
+     * the failed bridge is added, but never removed, which prevents subsequent
+     * bridge creation attempts to be ignored. The result is a network connector
+     * that thinks it has an active bridge, when in fact it doesn't.
+     */
+    public void testInactiveBridgStillActive() throws Exception {
+        // Start two brokers with a bridge from broker1 to broker2.
+        BrokerService broker1 = createBroker(new URI(
+                "broker:(vm://broker1)/broker1?persistent=false"));
+        final BrokerService broker2 = createBroker(new URI(
+                "broker:(vm://broker2)/broker2?persistent=false"));
+
+        // Force bridge failure by having broker1 disallow connections.
+        BrokerPlugin disallowAddConnectionPlugin = new BrokerPlugin() {
+            @Override
+            public Broker installPlugin(Broker broker) throws Exception {
+                return new BrokerFilter(broker) {
+                    @Override
+                    public void addConnection(ConnectionContext context,
+                            ConnectionInfo info) throws Exception {
+                        throw new Exception(
+                                "Test exception to force bridge failure");
+                    }
+                };
+            }
+        };
+
+        broker1.setPlugins(new BrokerPlugin[] { disallowAddConnectionPlugin });
+
+        startAllBrokers();
+
+        // Start a bridge from broker1 to broker2. The bridge delays returning
+        // from start until after the bridge failure has been processed;
+        // this leaves the first bridge creation attempt recorded as active,
+        // even though it failed.
+        final SimpleDiscoveryAgent da = new SimpleDiscoveryAgent();
+        da.setServices(new URI[] { broker2.getVmConnectorURI() });
+
+        final CountDownLatch attemptLatch = new CountDownLatch(3);
+        final CountDownLatch removedLatch = new CountDownLatch(1);
+
+        DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() {
+            @Override
+            public void onServiceAdd(DiscoveryEvent event) {
+                attemptLatch.countDown();
+                super.onServiceAdd(event);
+            }
+
+            @Override
+            public void onServiceRemove(DiscoveryEvent event) {
+                super.onServiceRemove(event);
+                removedLatch.countDown();
+            }
+
+            @Override
+            protected NetworkBridge createBridge(Transport localTransport,
+                    Transport remoteTransport, final DiscoveryEvent event) {
+                final NetworkBridge next = super.createBridge(localTransport,
+                        remoteTransport, event);
+                return new NetworkBridge() {
+
+                    @Override
+                    public void start() throws Exception {
+                        next.start();
+                        // Delay returning until the failed service has been
+                        // removed.
+                        removedLatch.await();
+                    }
+
+                    @Override
+                    public void stop() throws Exception {
+                        next.stop();
+                    }
+
+                    @Override
+                    public void serviceRemoteException(Throwable error) {
+                        next.serviceRemoteException(error);
+                    }
+
+                    @Override
+                    public void serviceLocalException(Throwable error) {
+                        next.serviceLocalException(error);
+                    }
+
+                    @Override
+                    public void setNetworkBridgeListener(
+                            NetworkBridgeListener listener) {
+                        next.setNetworkBridgeListener(listener);
+                    }
+
+                    @Override
+                    public String getRemoteAddress() {
+                        return next.getRemoteAddress();
+                    }
+
+                    @Override
+                    public String getRemoteBrokerName() {
+                        return next.getRemoteBrokerName();
+                    }
+
+                    @Override
+                    public String getLocalAddress() {
+                        return next.getLocalAddress();
+                    }
+
+                    @Override
+                    public String getLocalBrokerName() {
+                        return next.getLocalBrokerName();
+                    }
+
+                    @Override
+                    public long getEnqueueCounter() {
+                        return next.getEnqueueCounter();
+                    }
+
+                    @Override
+                    public long getDequeueCounter() {
+                        return next.getDequeueCounter();
+                    }
+
+                    @Override
+                    public void setMbeanObjectName(ObjectName objectName) {
+                        next.setMbeanObjectName(objectName);
+                    }
+
+                    @Override
+                    public ObjectName getMbeanObjectName() {
+                        return next.getMbeanObjectName();
+                    }
+                };
+            }
+        };
+        nc.setDiscoveryAgent(da);
+
+        broker1.addNetworkConnector(nc);
+        nc.start();
+
+        // All bridge attempts should fail, so the attempt latch should get
+        // triggered. However, because of the race condition, the first attempt
+        // is considered successful and causes further attempts to stop.
+        // Therefore, this wait will time out and cause the test to fail.
+        Assert.assertTrue(attemptLatch.await(30, TimeUnit.SECONDS));
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
------------------------------------------------------------------------------
    svn:eol-style = native