You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/10/13 17:30:50 UTC

svn commit: r824807 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network: DiscoveryNetworkConnector.java NetworkConnector.java

Author: gtully
Date: Tue Oct 13 15:30:50 2009
New Revision: 824807

URL: http://svn.apache.org/viewvc?rev=824807&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2448 - failed network bridge being incorrectly recorded in a list (and leaked), issue introduced by https://issues.apache.org/activemq/browse/AMQ-2298 now resolved by reusing exsiting correctly managed list of bridges

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=824807&r1=824806&r2=824807&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Tue Oct 13 15:30:50 2009
@@ -48,7 +48,7 @@
     private static final Log LOG = LogFactory.getLog(DiscoveryNetworkConnector.class);
 
     private DiscoveryAgent discoveryAgent;
-    private ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
+    
     private Map<String, String> parameters;
     
     public DiscoveryNetworkConnector() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=824807&r1=824806&r2=824807&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Tue Oct 13 15:30:50 2009
@@ -16,6 +16,19 @@
  */
 package org.apache.activemq.network;
 
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.AnnotatedMBean;
@@ -30,17 +43,6 @@
 import org.apache.activemq.util.ServiceSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
 
 /**
  * @version $Revision$
@@ -50,6 +52,8 @@
     private static final Log LOG = LogFactory.getLog(NetworkConnector.class);
     protected URI localURI;
     protected ConnectionFilter connectionFilter;
+    protected ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
+    
     protected ServiceSupport serviceSupport = new ServiceSupport() {
 
         protected void doStart() throws Exception {
@@ -67,8 +71,7 @@
     private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
     private BrokerService brokerService;
     private ObjectName objectName;
-    private ConcurrentLinkedQueue<DemandForwardingBridgeSupport> configuredBridges = new ConcurrentLinkedQueue<DemandForwardingBridgeSupport>();
- 
+    
     public NetworkConnector() {
     }
 
@@ -187,7 +190,6 @@
             dest = (ActiveMQDestination[])topics.toArray(dest);
             result.setDurableDestinations(dest);
         }
-        configuredBridges.add(result);
         return result;
     }
 
@@ -268,10 +270,13 @@
     // ask all the bridges as we can't know to which this consumer is tied
     public boolean removeDemandSubscription(ConsumerId consumerId) {
         boolean removeSucceeded = false;
-        for (DemandForwardingBridgeSupport bridge: configuredBridges) {
-            if (bridge.removeDemandSubscriptionByLocalId(consumerId)) {
-                removeSucceeded = true;
-                break;
+        for (NetworkBridge bridge : bridges.values()) {
+            if (bridge instanceof DemandForwardingBridgeSupport) {
+                DemandForwardingBridgeSupport demandBridge = (DemandForwardingBridgeSupport) bridge;
+                if (demandBridge.removeDemandSubscriptionByLocalId(consumerId)) {
+                    removeSucceeded = true;
+                    break;
+                }
             }
         }
         return removeSucceeded;