You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/07/08 15:00:44 UTC

svn commit: r792127 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/transport/discovery/ main/java/org/apache/activemq/transport/discovery/multicast/ test/java/org/apache/activemq/transport/di...

Author: gtully
Date: Wed Jul  8 13:00:44 2009
New Revision: 792127

URL: http://svn.apache.org/viewvc?rev=792127&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2283. In suppressing asyncColos I found the need to suppress brodcast of query parameters by the multicast discovery agent as server side and client side options are not compatible. Added support to the DiscoveryTransport to apply query parameters to each uri that it discovers.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=792127&r1=792126&r2=792127&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Wed Jul  8 13:00:44 2009
@@ -240,7 +240,7 @@
         
         DiscoveryAgent da = getDiscoveryAgent();
         if (da != null) {
-            da.registerService(getConnectUri().toString());
+            da.registerService(getPublishableConnectString());
             da.start();
         }
         if (enableStatusMonitor) {
@@ -251,6 +251,20 @@
         LOG.info("Connector " + getName() + " Started");
     }
 
+    private String getPublishableConnectString() throws Exception {
+        URI connectUri = getConnectUri();
+        String publishableConnectString = connectUri.toString();
+        // strip off server side query parameters which may not be compatible to clients
+        if (connectUri.getRawQuery() != null) {
+            publishableConnectString = 
+                publishableConnectString.substring(0, publishableConnectString.indexOf(connectUri.getRawQuery()) -1); 
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + connectUri);
+        }
+        return publishableConnectString;
+    }
+
     public void stop() throws Exception {
         ServiceStopper ss = new ServiceStopper();
         if (discoveryAgent != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java?rev=792127&r1=792126&r2=792127&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java Wed Jul  8 13:00:44 2009
@@ -18,6 +18,7 @@
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.command.DiscoveryEvent;
@@ -41,6 +42,8 @@
     private DiscoveryAgent discoveryAgent;
     private final ConcurrentHashMap<String, URI> serviceURIs = new ConcurrentHashMap<String, URI>();
 
+    private Map<String, String> parameters;
+
     public DiscoveryTransport(CompositeTransport next) {
         super(next);
         this.next = next;
@@ -71,13 +74,27 @@
                 URI uri = new URI(url);
                 serviceURIs.put(event.getServiceName(), uri);
                 LOG.info("Adding new broker connection URL: " + uri);
-                next.add(new URI[] {uri});
+                next.add(new URI[] {applyParameters(uri)});
             } catch (URISyntaxException e) {
                 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
             }
         }
     }
 
+    private URI applyParameters(URI uri) throws URISyntaxException {
+        if (parameters != null && !parameters.isEmpty()) {
+            StringBuffer newQuery = uri.getRawQuery() != null ? new StringBuffer(uri.getRawQuery()) : new StringBuffer() ;
+            for ( Map.Entry<String, String> param: parameters.entrySet()) {
+                if (newQuery.length()!=0) {
+                    newQuery.append(';');
+                }
+                newQuery.append(param.getKey()).append('=').append(param.getValue());
+            }
+            uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), newQuery.toString(), uri.getFragment());
+        }
+        return uri;
+}
+
     public void onServiceRemove(DiscoveryEvent event) {
         URI uri = serviceURIs.get(event.getServiceName());
         if (uri != null) {
@@ -93,4 +110,8 @@
         this.discoveryAgent = discoveryAgent;
     }
 
+    public void setParameters(Map<String, String> parameters) {
+       this.parameters = parameters;      
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java?rev=792127&r1=792126&r2=792127&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java Wed Jul  8 13:00:44 2009
@@ -39,7 +39,7 @@
         DiscoveryAgent discoveryAgent = DiscoveryAgentFactory.createDiscoveryAgent(compositData.getComponents()[0]);
         transport.setDiscoveryAgent(discoveryAgent);
         IntrospectionSupport.setProperties(transport, parameters);
-
+        transport.setParameters(parameters);
         return transport;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?rev=792127&r1=792126&r2=792127&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java Wed Jul  8 13:00:44 2009
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -176,7 +177,7 @@
     private long lastAdvertizeTime;
     private AtomicBoolean started = new AtomicBoolean(false);
     private boolean reportAdvertizeFailed = true;
-    private Executor executor = null;
+    private ExecutorService executor = null;
 
     /**
      * Set the discovery listener
@@ -315,6 +316,8 @@
             if (mcast != null) {
                 mcast.close();
             }
+            runner.interrupt();
+            getExecutor().shutdownNow();
         }
     }
 
@@ -488,7 +491,7 @@
         }
     }
 
-    private Executor getExecutor() {
+    private ExecutorService getExecutor() {
         if (executor == null) {
             final String threadName = "Notifier-" + this.toString();
             executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java?rev=792127&r1=792126&r2=792127&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java Wed Jul  8 13:00:44 2009
@@ -16,11 +16,16 @@
  */
 package org.apache.activemq.transport.discovery;
 
+import java.net.URI;
+import java.util.Vector;
+
 import javax.jms.Connection;
 import javax.jms.JMSException;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -28,6 +33,51 @@
 
     private static final Log LOG = LogFactory.getLog(DiscoveryTransportNoBrokerTest.class);
     
+
+    public void testNoExtraThreads() throws Exception {
+        BrokerService broker = new BrokerService();
+        TransportConnector tcp = broker.addConnector("tcp://localhost:0?transport.closeAsync=false");
+        String group = "GR-" +  System.currentTimeMillis();
+        URI discoveryUri = new URI("multicast://default?group=" + group);
+        tcp.setDiscoveryUri(discoveryUri);
+        broker.start();
+        broker.waitUntilStarted();
+        
+        Vector<String> existingNames = new Vector<String>();
+        Thread[] threads = getThreads();
+        for (Thread t : threads) {
+            existingNames.add(t.getName());
+        }
+        final int idleThreadCount = threads.length;
+        LOG.info("Broker started - thread Count:" + idleThreadCount);
+        
+       final int noConnectionToCreate = 10;
+        for (int i=0; i<10;i++) {
+            ActiveMQConnectionFactory factory = 
+                new ActiveMQConnectionFactory("discovery:(multicast://239.255.2.3:6155?group=" + group +")?closeAsync=false");
+            LOG.info("Connecting.");
+            Connection connection = factory.createConnection();
+            connection.setClientID("test");  
+            connection.close();
+        }
+        Thread.sleep(2000);
+        threads = getThreads();
+        for (Thread t : threads) {
+            if (!existingNames.contains(t.getName())) {
+                LOG.info("Remaining thread:" + t);
+            }
+        }
+        assertTrue("no extra threads per connection", Thread.activeCount() - idleThreadCount < noConnectionToCreate);
+    }
+   
+    
+    private Thread[] getThreads() {
+        Thread[] threads = new Thread[Thread.activeCount()];
+        Thread.enumerate(threads);
+        return threads;
+    }
+
+
     public void testMaxReconnectAttempts() throws JMSException {
         try {
             ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("discovery:(multicast://doesNOTexist)");