You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/26 15:34:25 UTC
svn commit: r979277 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/discovery/
main/java/org/apache/activemq/transport/fanout/
test/java/org/apache/activemq/transport/
test/java/org/apache/activemq/transport/discovery/
Author: gtully
Date: Mon Jul 26 13:34:24 2010
New Revision: 979277
URL: http://svn.apache.org/viewvc?rev=979277&view=rev
Log:
resolve: https://issues.apache.org/activemq/browse/AMQ-2849 - patch applied with thanks
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/StubCompositeTransport.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java?rev=979277&r1=979276&r2=979277&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java Mon Jul 26 13:34:24 2010
@@ -74,9 +74,10 @@ public class DiscoveryTransport extends
if (url != null) {
try {
URI uri = new URI(url);
- serviceURIs.put(event.getServiceName(), uri);
LOG.info("Adding new broker connection URL: " + uri);
- next.add(false,new URI[] {URISupport.applyParameters(uri, parameters)});
+ uri = URISupport.applyParameters(uri, parameters);
+ serviceURIs.put(event.getServiceName(), uri);
+ next.add(false,new URI[] {uri});
} catch (URISyntaxException e) {
LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java?rev=979277&r1=979276&r2=979277&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java Mon Jul 26 13:34:24 2010
@@ -21,8 +21,10 @@ import java.net.URI;
import java.util.HashMap;
import java.util.Map;
+import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.transport.failover.FailoverTransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport.CompositeData;
@@ -32,14 +34,30 @@ import org.apache.activemq.util.URISuppo
*/
public class DiscoveryTransportFactory extends FailoverTransportFactory {
- public Transport createTransport(CompositeData compositData) throws IOException {
- Map<String, String> parameters = new HashMap<String, String>(compositData.getParameters());
- DiscoveryTransport transport = new DiscoveryTransport(createTransport(parameters));
-
- DiscoveryAgent discoveryAgent = DiscoveryAgentFactory.createDiscoveryAgent(compositData.getComponents()[0]);
- transport.setDiscoveryAgent(discoveryAgent);
+ public Transport createTransport(CompositeData compositeData) throws IOException {
+ Map<String, String> parameters = new HashMap<String, String>(compositeData.getParameters());
+ FailoverTransport failoverTransport = createTransport(parameters);
+ return createTransport(failoverTransport, compositeData);
+ }
+
+ /**
+ * Creates a transport that reports discovered brokers to a specific composite transport.
+ *
+ * @param compositeTransport transport to report discovered brokers to
+ * @param compositeData used to apply parameters to this transport
+ * @return a transport that reports discovered brokers to a specific composite transport.
+ * @throws IOException
+ */
+ public static DiscoveryTransport createTransport(CompositeTransport compositeTransport, CompositeData compositeData) throws IOException {
+ DiscoveryTransport transport = new DiscoveryTransport(compositeTransport);
+
+ Map<String, String> parameters = new HashMap<String, String>(compositeData.getParameters());
IntrospectionSupport.setProperties(transport, parameters);
transport.setParameters(parameters);
+
+ URI discoveryAgentURI = compositeData.getComponents()[0];
+ DiscoveryAgent discoveryAgent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryAgentURI);
+ transport.setDiscoveryAgent(discoveryAgent);
return transport;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java?rev=979277&r1=979276&r2=979277&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java Mon Jul 26 13:34:24 2010
@@ -27,9 +27,8 @@ import org.apache.activemq.transport.Res
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.discovery.DiscoveryAgent;
-import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
import org.apache.activemq.transport.discovery.DiscoveryTransport;
+import org.apache.activemq.transport.discovery.DiscoveryTransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.URISupport.CompositeData;
@@ -62,19 +61,14 @@ public class FanoutTransportFactory exte
* @throws URISyntaxException
*/
public Transport createTransport(URI location) throws IOException, URISyntaxException {
-
CompositeData compositeData = URISupport.parseComposite(location);
Map<String, String> parameters = new HashMap<String, String>(compositeData.getParameters());
- DiscoveryTransport transport = new DiscoveryTransport(createTransport(parameters));
-
- DiscoveryAgent discoveryAgent = DiscoveryAgentFactory.createDiscoveryAgent(compositeData.getComponents()[0]);
- transport.setDiscoveryAgent(discoveryAgent);
-
- return transport;
-
+ FanoutTransport fanoutTransport = createTransport(parameters);
+ DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(fanoutTransport, compositeData);
+ return discoveryTransport;
}
- public FanoutTransport createTransport(Map parameters) throws IOException {
+ public FanoutTransport createTransport(Map<String,String> parameters) throws IOException {
FanoutTransport transport = new FanoutTransport();
IntrospectionSupport.setProperties(transport, parameters);
return transport;
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/StubCompositeTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/StubCompositeTransport.java?rev=979277&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/StubCompositeTransport.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/StubCompositeTransport.java Mon Jul 26 13:34:24 2010
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test stub of {@link CompositeTransport} that records the URIs added to and removed from it.
+ */
+public class StubCompositeTransport extends StubTransport implements CompositeTransport
+{
+ private List<URI> transportURIs = new ArrayList<URI>();
+
+ /**
+ * @see org.apache.activemq.transport.CompositeTransport#add(boolean, java.net.URI[])
+ */
+ public void add(boolean rebalance, URI[] uris)
+ {
+ transportURIs.addAll(Arrays.asList(uris));
+ }
+
+ /**
+ * @see org.apache.activemq.transport.CompositeTransport#remove(boolean, java.net.URI[])
+ */
+ public void remove(boolean rebalance, URI[] uris)
+ {
+ transportURIs.removeAll(Arrays.asList(uris));
+ }
+
+ public URI[] getTransportURIs()
+ {
+ return transportURIs.toArray(new URI[0]);
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/StubCompositeTransport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/StubCompositeTransport.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java?rev=979277&r1=979276&r2=979277&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java Mon Jul 26 13:34:24 2010
@@ -17,6 +17,7 @@
package org.apache.activemq.transport.discovery;
import java.net.URI;
+import java.util.Map;
import java.util.Vector;
import javax.jms.Connection;
@@ -26,6 +27,10 @@ import org.apache.activemq.ActiveMQConne
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.DiscoveryEvent;
+import org.apache.activemq.transport.StubCompositeTransport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.URISupport.CompositeData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -112,4 +117,37 @@ public class DiscoveryTransportNoBrokerT
assertTrue("took at least initialReconnectDelay time: " + duration + " e:" + expected, duration >= initialReconnectDelay);
}
}
+
+ public void testSetDiscoveredBrokerProperties() throws Exception {
+ final String extraParameterName = "connectionTimeout";
+ final String extraParameterValue = "3000";
+ final URI uri = new URI("discovery:(multicast://default)?initialReconnectDelay=100&" + extraParameterName + "=" + extraParameterValue);
+ CompositeData compositeData = URISupport.parseComposite(uri);
+
+ StubCompositeTransport compositeTransport = new StubCompositeTransport();
+ DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(compositeTransport, compositeData);
+
+ discoveryTransport.onServiceAdd(new DiscoveryEvent("tcp://localhost:61616"));
+ assertEquals("expected added URI after discovery event", compositeTransport.getTransportURIs().length, 1);
+
+ URI discoveredServiceURI = compositeTransport.getTransportURIs()[0];
+ Map<String, String> parameters = URISupport.parseParamters(discoveredServiceURI);
+ assertTrue("unable to add parameter to discovered service", parameters.containsKey(extraParameterName));
+ assertEquals("incorrect value for parameter added to discovered service", parameters.get(extraParameterName), extraParameterValue);
+ }
+
+ public void testAddRemoveDiscoveredBroker() throws Exception {
+ final URI uri = new URI("discovery:(multicast://default)?initialReconnectDelay=100&connectionTimeout=3000");
+ CompositeData compositeData = URISupport.parseComposite(uri);
+
+ StubCompositeTransport compositeTransport = new StubCompositeTransport();
+ DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(compositeTransport, compositeData);
+
+ final String serviceName = "tcp://localhost:61616";
+ discoveryTransport.onServiceAdd(new DiscoveryEvent(serviceName));
+ assertEquals("expected added URI after discovery event", 1, compositeTransport.getTransportURIs().length);
+
+ discoveryTransport.onServiceRemove(new DiscoveryEvent(serviceName));
+ assertEquals("expected URI removed after discovery event", 0, compositeTransport.getTransportURIs().length);
+ }
}