You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/03/17 03:24:30 UTC

svn commit: r386507 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/BrokerService.java network/ConnectionFilter.java network/DiscoveryNetworkConnector.java network/NetworkConnector.java

Author: chirino
Date: Thu Mar 16 18:24:29 2006
New Revision: 386507

URL: http://svn.apache.org/viewcvs?rev=386507&view=rev
Log:
Fix for http://jira.activemq.org/jira/browse/AMQ-639

The NetworkConnector was not removing the old bridge for the map of bridges that he was mantinaing,
so when the remote broker came back up, then the bridge was not restarted.

Also add a new ConnectionFilter interface which allows the broker to avoid establishing loopback connections.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=386507&r1=386506&r2=386507&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Mar 16 18:24:29 2006
@@ -47,6 +47,7 @@
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.network.ConnectionFilter;
 import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.network.jms.JmsConnector;
@@ -224,6 +225,24 @@
         map.put("network", "true");
         uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
         connector.setLocalUri(uri);
+        
+        // Set a connection filter so that the connector does not establish loop back connections.
+        connector.setConnectionFilter(new ConnectionFilter() {
+            public boolean connectTo(URI location) {
+                List transportConnectors = getTransportConnectors();
+                for (Iterator iter = transportConnectors.iterator(); iter.hasNext();) {
+                    try {
+                        TransportConnector tc = (TransportConnector) iter.next();
+                        if( location.equals(tc.getConnectUri()) ) {
+                            return false;
+                        }
+                    } catch (Throwable e) {
+                    }
+                }
+                return true;
+            }
+        });
+        
         networkConnectors.add(connector);
         if (isUseJmx()) {
             registerNetworkConnectorMBean(connector);

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java?rev=386507&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java Thu Mar 16 18:24:29 2006
@@ -0,0 +1,16 @@
+package org.apache.activemq.network;
+
+import java.net.URI;
+
+/**
+ * Abstraction that allows you to control which brokers a NetworkConnector connects bridges to.
+ * 
+ * @version $Revision$
+ */
+public interface ConnectionFilter {
+    /**
+     * @param location
+     * @return true if the network connector should establish a connection to the specified location.
+     */
+    boolean connectTo(URI location);
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=386507&r1=386506&r2=386507&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Thu Mar 16 18:24:29 2006
@@ -18,6 +18,7 @@
 
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.DiscoveryEvent;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
@@ -44,7 +45,7 @@
 
     private DiscoveryAgent discoveryAgent;
     private ConcurrentHashMap bridges = new ConcurrentHashMap();
-
+    
     public DiscoveryNetworkConnector() {
     }
 
@@ -69,8 +70,11 @@
                 return;
             }
 
-            // Has it allready been added?
-            if (bridges.containsKey(uri) || localURI.equals(uri))
+            // Should we try to connect to that URI?
+            if (    bridges.containsKey(uri) 
+                    || localURI.equals(uri) 
+                    || (connectionFilter!=null && !connectionFilter.connectTo(uri))
+                    )
                 return;
 
             URI connectUri = uri;
@@ -131,7 +135,7 @@
                 return;
             }
 
-            Bridge bridge = (Bridge) bridges.get(uri);
+            Bridge bridge = (Bridge) bridges.remove(uri);
             if (bridge == null)
                 return;
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=386507&r1=386506&r2=386507&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Thu Mar 16 18:24:29 2006
@@ -18,6 +18,7 @@
 
 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
@@ -50,6 +51,9 @@
     private boolean decreaseNetworkConsumerPriority;
     private int networkTTL = 1;
     private String name = "bridge";
+    private int prefetchSize = 1000;
+    private boolean dispatchAsync = true;
+    protected ConnectionFilter connectionFilter;
 
     public NetworkConnector() {
     }
@@ -234,6 +238,8 @@
         result.setLocalBrokerName(getBrokerName());
         result.setName(getBrokerName());
         result.setNetworkTTL(getNetworkTTL());
+        result.setPrefetchSize(prefetchSize);
+        result.setDispatchAsync(dispatchAsync);
         result.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
 
         List destsList = getDynamicallyIncludedDestinations();
@@ -271,5 +277,29 @@
 
     protected Transport createLocalTransport() throws Exception {
         return TransportFactory.connect(localURI);
+    }
+
+    public boolean isDispatchAsync() {
+        return dispatchAsync;
+    }
+
+    public void setDispatchAsync(boolean dispatchAsync) {
+        this.dispatchAsync = dispatchAsync;
+    }
+
+    public int getPrefetchSize() {
+        return prefetchSize;
+    }
+
+    public void setPrefetchSize(int prefetchSize) {
+        this.prefetchSize = prefetchSize;
+    }
+
+    public ConnectionFilter getConnectionFilter() {
+        return connectionFilter;
+    }
+
+    public void setConnectionFilter(ConnectionFilter connectionFilter) {
+        this.connectionFilter = connectionFilter;
     }
 }