You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/26 15:34:25 UTC

svn commit: r979277 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/discovery/ main/java/org/apache/activemq/transport/fanout/ test/java/org/apache/activemq/transport/ test/java/org/apache/activemq/transport/discovery/

Author: gtully
Date: Mon Jul 26 13:34:24 2010
New Revision: 979277

URL: http://svn.apache.org/viewvc?rev=979277&view=rev
Log:
resolve: https://issues.apache.org/activemq/browse/AMQ-2849 - patch applied with thanks

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/StubCompositeTransport.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java?rev=979277&r1=979276&r2=979277&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java Mon Jul 26 13:34:24 2010
@@ -74,9 +74,10 @@ public class DiscoveryTransport extends 
         if (url != null) {
             try {
                 URI uri = new URI(url);
-                serviceURIs.put(event.getServiceName(), uri);
                 LOG.info("Adding new broker connection URL: " + uri);
-                next.add(false,new URI[] {URISupport.applyParameters(uri, parameters)});
+                uri = URISupport.applyParameters(uri, parameters);
+                serviceURIs.put(event.getServiceName(), uri);
+                next.add(false,new URI[] {uri});
             } catch (URISyntaxException e) {
                 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java?rev=979277&r1=979276&r2=979277&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java Mon Jul 26 13:34:24 2010
@@ -21,8 +21,10 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.activemq.transport.CompositeTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.failover.FailoverTransport;
 import org.apache.activemq.transport.failover.FailoverTransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.URISupport.CompositeData;
@@ -32,14 +34,30 @@ import org.apache.activemq.util.URISuppo
  */
 public class DiscoveryTransportFactory extends FailoverTransportFactory {
         
-    public Transport createTransport(CompositeData compositData) throws IOException {
-        Map<String, String> parameters = new HashMap<String, String>(compositData.getParameters());
-        DiscoveryTransport transport = new DiscoveryTransport(createTransport(parameters));
-
-        DiscoveryAgent discoveryAgent = DiscoveryAgentFactory.createDiscoveryAgent(compositData.getComponents()[0]);
-        transport.setDiscoveryAgent(discoveryAgent);
+    public Transport createTransport(CompositeData compositeData) throws IOException {
+        Map<String, String> parameters = new HashMap<String, String>(compositeData.getParameters());
+        FailoverTransport failoverTransport = createTransport(parameters);
+        return createTransport(failoverTransport, compositeData);
+    }
+    
+    /**
+     * Creates a transport that reports discovered brokers to a specific composite transport.
+     * 
+     * @param compositeTransport transport to report discovered brokers to
+     * @param compositeData used to apply parameters to this transport 
+     * @return a transport that reports discovered brokers to a specific composite transport.
+     * @throws IOException
+     */
+    public static DiscoveryTransport createTransport(CompositeTransport compositeTransport, CompositeData compositeData) throws IOException {                
+        DiscoveryTransport transport = new DiscoveryTransport(compositeTransport);
+        
+        Map<String, String> parameters = new HashMap<String, String>(compositeData.getParameters());
         IntrospectionSupport.setProperties(transport, parameters);
         transport.setParameters(parameters);
+        
+        URI discoveryAgentURI = compositeData.getComponents()[0];
+        DiscoveryAgent discoveryAgent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryAgentURI);
+        transport.setDiscoveryAgent(discoveryAgent);
         return transport;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java?rev=979277&r1=979276&r2=979277&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java Mon Jul 26 13:34:24 2010
@@ -27,9 +27,8 @@ import org.apache.activemq.transport.Res
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.discovery.DiscoveryAgent;
-import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
 import org.apache.activemq.transport.discovery.DiscoveryTransport;
+import org.apache.activemq.transport.discovery.DiscoveryTransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.URISupport;
 import org.apache.activemq.util.URISupport.CompositeData;
@@ -62,19 +61,14 @@ public class FanoutTransportFactory exte
      * @throws URISyntaxException
      */
     public Transport createTransport(URI location) throws IOException, URISyntaxException {
-
         CompositeData compositeData = URISupport.parseComposite(location);
         Map<String, String> parameters = new HashMap<String, String>(compositeData.getParameters());
-        DiscoveryTransport transport = new DiscoveryTransport(createTransport(parameters));
-
-        DiscoveryAgent discoveryAgent = DiscoveryAgentFactory.createDiscoveryAgent(compositeData.getComponents()[0]);
-        transport.setDiscoveryAgent(discoveryAgent);
-
-        return transport;
-
+        FanoutTransport fanoutTransport = createTransport(parameters);        
+        DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(fanoutTransport, compositeData);        
+        return discoveryTransport;
     }
 
-    public FanoutTransport createTransport(Map parameters) throws IOException {
+    public FanoutTransport createTransport(Map<String,String> parameters) throws IOException {
         FanoutTransport transport = new FanoutTransport();
         IntrospectionSupport.setProperties(transport, parameters);
         return transport;

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/StubCompositeTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/StubCompositeTransport.java?rev=979277&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/StubCompositeTransport.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/StubCompositeTransport.java Mon Jul 26 13:34:24 2010
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ *
+ */
+public class StubCompositeTransport extends StubTransport implements CompositeTransport
+{
+    private List<URI> transportURIs = new ArrayList<URI>();    
+    
+    /**
+     * @see org.apache.activemq.transport.CompositeTransport#add(java.net.URI[])
+     */
+    public void add(boolean rebalance, URI[] uris)
+    {
+        transportURIs.addAll(Arrays.asList(uris));
+    }
+
+    /**
+     * @see org.apache.activemq.transport.CompositeTransport#remove(java.net.URI[])
+     */
+    public void remove(boolean rebalance, URI[] uris)
+    {
+        transportURIs.removeAll(Arrays.asList(uris));
+    }
+
+    public URI[] getTransportURIs()
+    {
+        return transportURIs.toArray(new URI[0]);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/StubCompositeTransport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/StubCompositeTransport.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java?rev=979277&r1=979276&r2=979277&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java Mon Jul 26 13:34:24 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.discovery;
 
 import java.net.URI;
+import java.util.Map;
 import java.util.Vector;
 
 import javax.jms.Connection;
@@ -26,6 +27,10 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.DiscoveryEvent;
+import org.apache.activemq.transport.StubCompositeTransport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.URISupport.CompositeData;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -112,4 +117,37 @@ public class DiscoveryTransportNoBrokerT
             assertTrue("took at least initialReconnectDelay time: " + duration + " e:" + expected, duration >= initialReconnectDelay);
         }
     }
+    
+    public void testSetDiscoveredBrokerProperties() throws Exception {
+        final String extraParameterName = "connectionTimeout";
+        final String extraParameterValue = "3000";
+        final URI uri = new URI("discovery:(multicast://default)?initialReconnectDelay=100&" + extraParameterName + "=" + extraParameterValue);        
+        CompositeData compositeData = URISupport.parseComposite(uri);
+        
+        StubCompositeTransport compositeTransport = new StubCompositeTransport();      
+        DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(compositeTransport, compositeData);
+        
+        discoveryTransport.onServiceAdd(new DiscoveryEvent("tcp://localhost:61616"));        
+        assertEquals("expected added URI after discovery event", compositeTransport.getTransportURIs().length, 1);
+        
+        URI discoveredServiceURI = compositeTransport.getTransportURIs()[0];
+        Map<String, String> parameters = URISupport.parseParamters(discoveredServiceURI);
+        assertTrue("unable to add parameter to discovered service", parameters.containsKey(extraParameterName));
+        assertEquals("incorrect value for parameter added to discovered service", parameters.get(extraParameterName), extraParameterValue);
+    }
+    
+    public void testAddRemoveDiscoveredBroker() throws Exception {
+        final URI uri = new URI("discovery:(multicast://default)?initialReconnectDelay=100&connectionTimeout=3000");        
+        CompositeData compositeData = URISupport.parseComposite(uri);
+        
+        StubCompositeTransport compositeTransport = new StubCompositeTransport();      
+        DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(compositeTransport, compositeData);
+        
+        final String serviceName = "tcp://localhost:61616";
+        discoveryTransport.onServiceAdd(new DiscoveryEvent(serviceName));        
+        assertEquals("expected added URI after discovery event", 1, compositeTransport.getTransportURIs().length);
+        
+        discoveryTransport.onServiceRemove(new DiscoveryEvent(serviceName));        
+        assertEquals("expected URI removed after discovery event", 0, compositeTransport.getTransportURIs().length);
+    }
 }