You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/04/10 19:58:33 UTC
svn commit: r763993 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/network/ main/java/org/apache/activemq/util/
test/java/org/apache/activemq/usecases/ test/java/org/apache/activemq/util/
Author: dejanb
Date: Fri Apr 10 17:58:32 2009
New Revision: 763993
URL: http://svn.apache.org/viewvc?rev=763993&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2086 - duplex connector and excluded destinations
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ListEditor.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestBrokerConnectionDuplexExcludedDestinations.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ReflectionSupportTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SimplePojo.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=763993&r1=763992&r2=763993&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Apr 10 17:58:32 2009
@@ -433,6 +433,15 @@
} else if (command.isBrokerInfo()) {
lastConnectSucceeded.set(true);
remoteBrokerInfo = (BrokerInfo)command;
+ Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
+ try {
+ IntrospectionSupport.getProperties(configuration, props, null);
+ excludedDestinations = configuration.getExcludedDestinations().toArray(new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
+ staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
+ dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
+ } catch (Throwable t) {
+ LOG.error("Error mapping remote destinations", t);
+ }
serviceRemoteBrokerInfo(command);
// Let the local broker know the remote broker's ID.
localBroker.oneway(command);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=763993&r1=763992&r2=763993&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Fri Apr 10 17:58:32 2009
@@ -16,6 +16,10 @@
*/
package org.apache.activemq.network;
+import java.util.List;
+
+import org.apache.activemq.command.ActiveMQDestination;
+
/**
* Configuration for a NetworkBridge
*
@@ -36,8 +40,14 @@
private String password;
private String destinationFilter = ">";
private String name = null;
+
+ private List<ActiveMQDestination> excludedDestinations;
+ private List<ActiveMQDestination> dynamicallyIncludedDestinations;
+ private List<ActiveMQDestination> staticallyIncludedDestinations;
+
private boolean suppressDuplicateQueueSubscriptions = false;
+
/**
* @return the conduitSubscriptions
*/
@@ -224,6 +234,35 @@
this.name = name;
}
+ public List<ActiveMQDestination> getExcludedDestinations() {
+ return excludedDestinations;
+ }
+
+ public void setExcludedDestinations(
+ List<ActiveMQDestination> excludedDestinations) {
+ this.excludedDestinations = excludedDestinations;
+ }
+
+ public List<ActiveMQDestination> getDynamicallyIncludedDestinations() {
+ return dynamicallyIncludedDestinations;
+ }
+
+ public void setDynamicallyIncludedDestinations(
+ List<ActiveMQDestination> dynamicallyIncludedDestinations) {
+ this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
+ }
+
+ public List<ActiveMQDestination> getStaticallyIncludedDestinations() {
+ return staticallyIncludedDestinations;
+ }
+
+ public void setStaticallyIncludedDestinations(
+ List<ActiveMQDestination> staticallyIncludedDestinations) {
+ this.staticallyIncludedDestinations = staticallyIncludedDestinations;
+ }
+
+
+
public boolean isSuppressDuplicateQueueSubscriptions() {
return suppressDuplicateQueueSubscriptions;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IntrospectionSupport.java?rev=763993&r1=763992&r2=763993&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IntrospectionSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IntrospectionSupport.java Fri Apr 10 17:58:32 2009
@@ -21,8 +21,6 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
@@ -32,8 +30,23 @@
import java.util.Map.Entry;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.commons.lang.ArrayUtils;
+
+
public final class IntrospectionSupport {
+
+ static {
+ // find Spring and ActiveMQ specific property editors
+ String[] searchPath = (String[])ArrayUtils.addAll(
+ PropertyEditorManager.getEditorSearchPath(),
+ new String[] {
+ "org.springframework.beans.propertyeditors"
+ , "org.apache.activemq.util"
+ }
+ );
+ PropertyEditorManager.setEditorSearchPath(searchPath);
+ }
private IntrospectionSupport() {
}
@@ -177,27 +190,21 @@
}
}
- private static Object convert(Object value, Class type) throws URISyntaxException {
+ private static Object convert(Object value, Class type) {
PropertyEditor editor = PropertyEditorManager.findEditor(type);
if (editor != null) {
editor.setAsText(value.toString());
return editor.getValue();
}
- if (type == URI.class) {
- return new URI(value.toString());
- }
return null;
}
- private static String convertToString(Object value, Class type) throws URISyntaxException {
+ public static String convertToString(Object value, Class type) {
PropertyEditor editor = PropertyEditorManager.findEditor(type);
if (editor != null) {
editor.setValue(value);
return editor.getAsText();
}
- if (type == URI.class) {
- return ((URI)value).toString();
- }
return null;
}
@@ -219,12 +226,7 @@
if (PropertyEditorManager.findEditor(clazz) != null) {
return true;
}
- if (clazz == URI.class) {
- return true;
- }
- if (clazz == Boolean.class) {
- return true;
- }
+
return false;
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ListEditor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ListEditor.java?rev=763993&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ListEditor.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ListEditor.java Fri Apr 10 17:58:32 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.beans.PropertyEditorSupport;
+import java.util.ArrayList;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.springframework.util.StringUtils;
+
+/**
+ * Used to serialize lists of ActiveMQDestinations.
+ * @see org.apache.activemq.util.IntrospectionSupport
+ */
+public class ListEditor extends PropertyEditorSupport {
+
+ public static final String DEFAULT_SEPARATOR = ",";
+
+ public String getAsText() {
+ return getValue().toString();
+ }
+
+
+ public void setAsText(String text) throws IllegalArgumentException {
+ text = text.substring(1, text.length() - 1);
+ String[] array = StringUtils.delimitedListToStringArray(text, ListEditor.DEFAULT_SEPARATOR, null);
+ ArrayList<ActiveMQDestination> list = new ArrayList<ActiveMQDestination>();
+ for (String item : array) {
+ list.add(ActiveMQDestination.createDestination(item.trim(), ActiveMQDestination.QUEUE_TYPE));
+ }
+ setValue(list);
+ }
+
+}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestBrokerConnectionDuplexExcludedDestinations.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestBrokerConnectionDuplexExcludedDestinations.java?rev=763993&r1=763992&r2=763993&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestBrokerConnectionDuplexExcludedDestinations.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestBrokerConnectionDuplexExcludedDestinations.java Fri Apr 10 17:58:32 2009
@@ -36,65 +36,92 @@
import org.apache.activemq.broker.BrokerService;
public class TestBrokerConnectionDuplexExcludedDestinations extends TestCase {
-
- public void testBrokerConnectionDuplexPropertiesPropagation()
- throws Exception {
-
+
+ BrokerService receiverBroker;
+ BrokerService senderBroker;
+
+ Connection hubConnection;
+ Session hubSession;
+
+ Connection spokeConnection;
+ Session spokeSession;
+
+ public void setUp() throws Exception {
// Hub broker
String configFileName = "org/apache/activemq/usecases/receiver-duplex.xml";
URI uri = new URI("xbean:" + configFileName);
- BrokerService receiverBroker = BrokerFactory.createBroker(uri);
+ receiverBroker = BrokerFactory.createBroker(uri);
receiverBroker.setPersistent(false);
receiverBroker.setBrokerName("Hub");
// Spoke broker
configFileName = "org/apache/activemq/usecases/sender-duplex.xml";
uri = new URI("xbean:" + configFileName);
- BrokerService senderBroker = BrokerFactory.createBroker(uri);
+ senderBroker = BrokerFactory.createBroker(uri);
senderBroker.setPersistent(false);
- receiverBroker.setBrokerName("Spoke");
+ senderBroker.setBrokerName("Spoke");
// Start both Hub and Spoke broker
receiverBroker.start();
senderBroker.start();
+
+ // create hub session
+ ConnectionFactory cfHub = new ActiveMQConnectionFactory("tcp://localhost:62002");
- final ConnectionFactory cfHub = new ActiveMQConnectionFactory(
- "tcp://localhost:62002");
- final Connection hubConnection = cfHub.createConnection();
+ hubConnection = cfHub.createConnection();
hubConnection.start();
- final Session hubSession = hubConnection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- final MessageProducer hubProducer = hubSession.createProducer(null);
+ hubSession = hubConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // create spoke session
+ ConnectionFactory cfSpoke = new ActiveMQConnectionFactory("tcp://localhost:62001");
+ spokeConnection = cfSpoke.createConnection();
+ spokeConnection.start();
+ spokeSession = spokeConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void tearDown() throws Exception {
+ hubSession.close();
+ hubConnection.stop();
+ hubConnection.close();
+
+ spokeSession.close();
+ spokeConnection.stop();
+ spokeConnection.close();
+
+ senderBroker.stop();
+ receiverBroker.stop();
+ }
+
+ public void testDuplexSendFromHubToSpoke()
+ throws Exception {
+
+ //create hub producer
+ MessageProducer hubProducer = hubSession.createProducer(null);
hubProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
hubProducer.setDisableMessageID(true);
hubProducer.setDisableMessageTimestamp(true);
- final Queue excludedQueueHub = hubSession.createQueue("exclude.test.foo");
- final TextMessage excludedMsgHub = hubSession.createTextMessage();
+ Queue excludedQueueHub = hubSession.createQueue("exclude.test.foo");
+ TextMessage excludedMsgHub = hubSession.createTextMessage();
excludedMsgHub.setText(excludedQueueHub.toString());
- final Queue includedQueueHub = hubSession.createQueue("include.test.foo");
+ Queue includedQueueHub = hubSession.createQueue("include.test.foo");
- final TextMessage includedMsgHub = hubSession.createTextMessage();
- excludedMsgHub.setText(includedQueueHub.toString());
+ TextMessage includedMsgHub = hubSession.createTextMessage();
+ includedMsgHub.setText(includedQueueHub.toString());
// Sending from Hub queue
hubProducer.send(excludedQueueHub, excludedMsgHub);
hubProducer.send(includedQueueHub, includedMsgHub);
- final ConnectionFactory cfSpoke = new ActiveMQConnectionFactory(
- "tcp://localhost:62001");
- final Connection spokeConnection = cfSpoke.createConnection();
- spokeConnection.start();
- final Session spokeSession = spokeConnection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- final Queue excludedQueueSpoke = spokeSession.createQueue("exclude.test.foo");
- final MessageConsumer excludedConsumerSpoke = spokeSession
- .createConsumer(excludedQueueSpoke);
-
- final Queue includedQueueSpoke = spokeSession.createQueue("include.test.foo");
- final MessageConsumer includedConsumerSpoke = spokeSession
- .createConsumer(includedQueueSpoke);
+
+ Queue excludedQueueSpoke = spokeSession.createQueue("exclude.test.foo");
+ MessageConsumer excludedConsumerSpoke = spokeSession.createConsumer(excludedQueueSpoke);
+
+ Thread.sleep(100);
+
+ Queue includedQueueSpoke = spokeSession.createQueue("include.test.foo");
+ MessageConsumer includedConsumerSpoke = spokeSession.createConsumer(includedQueueSpoke);
// Receiving from excluded Spoke queue
Message msg = excludedConsumerSpoke.receive(200);
@@ -102,19 +129,16 @@
// Receiving from included Spoke queue
msg = includedConsumerSpoke.receive(200);
- assertEquals(msg, includedMsgHub);
+ assertEquals(includedMsgHub, msg);
+
+ // we should be able to receive excluded queue message on Hub
+ MessageConsumer excludedConsumerHub = hubSession.createConsumer(excludedQueueHub);
+ msg = excludedConsumerHub.receive(200);;
+ assertEquals(excludedMsgHub, msg);
- excludedConsumerSpoke.close();
- hubSession.close();
- hubConnection.stop();
- hubConnection.close();
hubProducer.close();
- spokeSession.close();
- spokeConnection.stop();
- spokeConnection.close();
-
- senderBroker.stop();
- receiverBroker.stop();
+ excludedConsumerSpoke.close();
}
+
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ReflectionSupportTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ReflectionSupportTest.java?rev=763993&r1=763992&r2=763993&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ReflectionSupportTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ReflectionSupportTest.java Fri Apr 10 17:58:32 2009
@@ -18,11 +18,29 @@
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
import junit.framework.TestCase;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+
public class ReflectionSupportTest extends TestCase {
+
+ List<ActiveMQDestination> favorites = new ArrayList<ActiveMQDestination>();
+ String favoritesString = "[queue://test, topic://test]";
+ List<ActiveMQDestination> nonFavorites = new ArrayList<ActiveMQDestination>();
+ String nonFavoritesString = "[topic://test1]";
+
+ public void setUp() {
+ favorites.add(new ActiveMQQueue("test"));
+ favorites.add(new ActiveMQTopic("test"));
+ nonFavorites.add(new ActiveMQTopic("test1"));
+ }
public void testSetProperties() throws URISyntaxException {
SimplePojo pojo = new SimplePojo();
@@ -30,7 +48,10 @@
map.put("age", "27");
map.put("name", "Hiram");
map.put("enabled", "true");
- map.put("uri", "test://value");
+ map.put("uri", "test://value");
+ map.put("favorites", favoritesString);
+ map.put("nonFavorites", nonFavoritesString);
+ map.put("others", null);
IntrospectionSupport.setProperties(pojo, map);
@@ -38,5 +59,29 @@
assertEquals("Hiram", pojo.getName());
assertEquals(true, pojo.isEnabled());
assertEquals(new URI("test://value"), pojo.getUri());
+ assertEquals(favorites, pojo.getFavorites());
+ assertEquals(nonFavorites, pojo.getNonFavorites());
+ assertNull(pojo.getOthers());
+ }
+
+ public void testGetProperties() {
+ SimplePojo pojo = new SimplePojo();
+ pojo.setAge(31);
+ pojo.setName("Dejan");
+ pojo.setEnabled(true);
+ pojo.setFavorites(favorites);
+ pojo.setNonFavorites(nonFavorites);
+ pojo.setOthers(null);
+
+ Properties props = new Properties();
+
+ IntrospectionSupport.getProperties(pojo, props, null);
+
+ assertEquals("Dejan", props.get("name"));
+ assertEquals("31", props.get("age"));
+ assertEquals("True", props.get("enabled"));
+ assertEquals(favoritesString, props.get("favorites"));
+ assertEquals(nonFavoritesString, props.get("nonFavorites"));
+ assertNull(props.get("others"));
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SimplePojo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SimplePojo.java?rev=763993&r1=763992&r2=763993&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SimplePojo.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SimplePojo.java Fri Apr 10 17:58:32 2009
@@ -17,6 +17,10 @@
package org.apache.activemq.util;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.command.ActiveMQDestination;
public class SimplePojo {
@@ -24,6 +28,9 @@
int age;
boolean enabled;
URI uri;
+ List<ActiveMQDestination> favorites = new ArrayList<ActiveMQDestination>();
+ List<ActiveMQDestination> nonFavorites = new ArrayList<ActiveMQDestination>();
+ List<ActiveMQDestination> others = new ArrayList<ActiveMQDestination>();
public int getAge() {
return age;
@@ -49,5 +56,23 @@
public void setUri(URI uri) {
this.uri = uri;
}
+ public List<ActiveMQDestination> getFavorites() {
+ return favorites;
+ }
+ public void setFavorites(List<ActiveMQDestination> favorites) {
+ this.favorites = favorites;
+ }
+ public List<ActiveMQDestination> getNonFavorites() {
+ return nonFavorites;
+ }
+ public void setNonFavorites(List<ActiveMQDestination> nonFavorites) {
+ this.nonFavorites = nonFavorites;
+ }
+ public List<ActiveMQDestination> getOthers() {
+ return others;
+ }
+ public void setOthers(List<ActiveMQDestination> others) {
+ this.others = others;
+ }
}