You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/04/10 19:58:33 UTC

svn commit: r763993 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ main/java/org/apache/activemq/util/ test/java/org/apache/activemq/usecases/ test/java/org/apache/activemq/util/

Author: dejanb
Date: Fri Apr 10 17:58:32 2009
New Revision: 763993

URL: http://svn.apache.org/viewvc?rev=763993&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2086 - duplex connector and excluded destinations

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ListEditor.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestBrokerConnectionDuplexExcludedDestinations.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ReflectionSupportTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SimplePojo.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=763993&r1=763992&r2=763993&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Apr 10 17:58:32 2009
@@ -433,6 +433,15 @@
                 } else if (command.isBrokerInfo()) {
                     lastConnectSucceeded.set(true);
                     remoteBrokerInfo = (BrokerInfo)command;
+                    Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
+                    try {
+                    	IntrospectionSupport.getProperties(configuration, props, null);
+                    	excludedDestinations = configuration.getExcludedDestinations().toArray(new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
+                    	staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
+                    	dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
+                    } catch (Throwable t) {
+                    	LOG.error("Error mapping remote destinations", t);
+                    }
                     serviceRemoteBrokerInfo(command);
                     // Let the local broker know the remote broker's ID.
                     localBroker.oneway(command);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=763993&r1=763992&r2=763993&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Fri Apr 10 17:58:32 2009
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.network;
 
+import java.util.List;
+
+import org.apache.activemq.command.ActiveMQDestination;
+
 /**
  * Configuration for a NetworkBridge
  * 
@@ -36,8 +40,14 @@
     private String password;
     private String destinationFilter = ">";
     private String name = null;
+    
+    private List<ActiveMQDestination> excludedDestinations;
+    private List<ActiveMQDestination> dynamicallyIncludedDestinations;
+    private List<ActiveMQDestination> staticallyIncludedDestinations;
+
     private boolean suppressDuplicateQueueSubscriptions = false;
 
+
     /**
      * @return the conduitSubscriptions
      */
@@ -224,6 +234,35 @@
         this.name = name;
     }
 
+	public List<ActiveMQDestination> getExcludedDestinations() {
+		return excludedDestinations;
+	}
+
+	public void setExcludedDestinations(
+			List<ActiveMQDestination> excludedDestinations) {
+		this.excludedDestinations = excludedDestinations;
+	}
+
+	public List<ActiveMQDestination> getDynamicallyIncludedDestinations() {
+		return dynamicallyIncludedDestinations;
+	}
+
+	public void setDynamicallyIncludedDestinations(
+			List<ActiveMQDestination> dynamicallyIncludedDestinations) {
+		this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
+	}
+
+	public List<ActiveMQDestination> getStaticallyIncludedDestinations() {
+		return staticallyIncludedDestinations;
+	}
+
+	public void setStaticallyIncludedDestinations(
+			List<ActiveMQDestination> staticallyIncludedDestinations) {
+		this.staticallyIncludedDestinations = staticallyIncludedDestinations;
+	}
+	
+    
+
     public boolean isSuppressDuplicateQueueSubscriptions() {
         return suppressDuplicateQueueSubscriptions;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IntrospectionSupport.java?rev=763993&r1=763992&r2=763993&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IntrospectionSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IntrospectionSupport.java Fri Apr 10 17:58:32 2009
@@ -21,8 +21,6 @@
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -32,8 +30,23 @@
 import java.util.Map.Entry;
 
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.commons.lang.ArrayUtils;
+
+
 
 public final class IntrospectionSupport {
+	
+	static {
+		// find Spring and ActiveMQ specific property editors
+		 String[] searchPath = (String[])ArrayUtils.addAll(
+				 PropertyEditorManager.getEditorSearchPath(), 
+				 new String[] {
+					"org.springframework.beans.propertyeditors"
+				  , "org.apache.activemq.util"
+				 }
+			);
+		PropertyEditorManager.setEditorSearchPath(searchPath);
+	}
     
     private IntrospectionSupport() {
     }
@@ -177,27 +190,21 @@
         }
     }
 
-    private static Object convert(Object value, Class type) throws URISyntaxException {
+    private static Object convert(Object value, Class type) {
         PropertyEditor editor = PropertyEditorManager.findEditor(type);
         if (editor != null) {
             editor.setAsText(value.toString());
             return editor.getValue();
         }
-        if (type == URI.class) {
-            return new URI(value.toString());
-        }
         return null;
     }
 
-    private static String convertToString(Object value, Class type) throws URISyntaxException {
+    public static String convertToString(Object value, Class type) {
         PropertyEditor editor = PropertyEditorManager.findEditor(type);
         if (editor != null) {
             editor.setValue(value);
             return editor.getAsText();
         }
-        if (type == URI.class) {
-            return ((URI)value).toString();
-        }
         return null;
     }
 
@@ -219,12 +226,7 @@
         if (PropertyEditorManager.findEditor(clazz) != null) {
             return true;
         }
-        if (clazz == URI.class) {
-            return true;
-        }
-        if (clazz == Boolean.class) {
-            return true;
-        }
+        	
         return false;
     }
 

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ListEditor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ListEditor.java?rev=763993&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ListEditor.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ListEditor.java Fri Apr 10 17:58:32 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.beans.PropertyEditorSupport;
+import java.util.ArrayList;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.springframework.util.StringUtils;
+
+/**
+ * Used to serialize lists of ActiveMQDestinations. 
+ * @see org.apache.activemq.util.IntrospectionSupport
+ */
+public class ListEditor extends PropertyEditorSupport {
+	
+	public static final String DEFAULT_SEPARATOR = ",";
+
+	public String getAsText() {
+		return getValue().toString();
+	}
+
+
+	public void setAsText(String text) throws IllegalArgumentException {
+		text = text.substring(1, text.length() - 1);
+        String[] array = StringUtils.delimitedListToStringArray(text, ListEditor.DEFAULT_SEPARATOR, null);
+        ArrayList<ActiveMQDestination> list = new ArrayList<ActiveMQDestination>();
+		for (String item : array) {
+			list.add(ActiveMQDestination.createDestination(item.trim(), ActiveMQDestination.QUEUE_TYPE));
+		}
+		setValue(list);
+	}
+	
+}

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestBrokerConnectionDuplexExcludedDestinations.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestBrokerConnectionDuplexExcludedDestinations.java?rev=763993&r1=763992&r2=763993&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestBrokerConnectionDuplexExcludedDestinations.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestBrokerConnectionDuplexExcludedDestinations.java Fri Apr 10 17:58:32 2009
@@ -36,65 +36,92 @@
 import org.apache.activemq.broker.BrokerService;
 
 public class TestBrokerConnectionDuplexExcludedDestinations extends TestCase {
-
-	public void testBrokerConnectionDuplexPropertiesPropagation()
-			throws Exception {
-
+	
+	BrokerService receiverBroker;
+	BrokerService senderBroker;
+	
+	Connection hubConnection;
+	Session hubSession;
+	
+	Connection spokeConnection;
+	Session spokeSession;
+	
+	public void setUp() throws Exception {
 		// Hub broker
 		String configFileName = "org/apache/activemq/usecases/receiver-duplex.xml";
 		URI uri = new URI("xbean:" + configFileName);
-		BrokerService receiverBroker = BrokerFactory.createBroker(uri);
+		receiverBroker = BrokerFactory.createBroker(uri);
 		receiverBroker.setPersistent(false);
 		receiverBroker.setBrokerName("Hub");
 
 		// Spoke broker
 		configFileName = "org/apache/activemq/usecases/sender-duplex.xml";
 		uri = new URI("xbean:" + configFileName);
-		BrokerService senderBroker = BrokerFactory.createBroker(uri);
+		senderBroker = BrokerFactory.createBroker(uri);
 		senderBroker.setPersistent(false);
-		receiverBroker.setBrokerName("Spoke");
+		senderBroker.setBrokerName("Spoke");
 
 		// Start both Hub and Spoke broker
 		receiverBroker.start();
 		senderBroker.start();
+		
+		// create hub session
+		ConnectionFactory cfHub = new ActiveMQConnectionFactory("tcp://localhost:62002");
 
-		final ConnectionFactory cfHub = new ActiveMQConnectionFactory(
-				"tcp://localhost:62002");
-		final Connection hubConnection = cfHub.createConnection();
+		hubConnection = cfHub.createConnection();
 		hubConnection.start();
-		final Session hubSession = hubConnection.createSession(false,
-				Session.AUTO_ACKNOWLEDGE);
-		final MessageProducer hubProducer = hubSession.createProducer(null);
+		hubSession = hubConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		// create spoke session
+		ConnectionFactory cfSpoke = new ActiveMQConnectionFactory("tcp://localhost:62001");
+		spokeConnection = cfSpoke.createConnection();
+		spokeConnection.start();
+		spokeSession = spokeConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+	}
+	
+	public void tearDown() throws Exception {
+		hubSession.close();
+		hubConnection.stop();
+		hubConnection.close();
+		
+		spokeSession.close();
+		spokeConnection.stop();
+		spokeConnection.close();
+
+		senderBroker.stop();
+		receiverBroker.stop();
+	}
+
+	public void testDuplexSendFromHubToSpoke()
+			throws Exception {
+
+		//create hub producer
+		MessageProducer hubProducer = hubSession.createProducer(null);
 		hubProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 		hubProducer.setDisableMessageID(true);
 		hubProducer.setDisableMessageTimestamp(true);
 
-		final Queue excludedQueueHub = hubSession.createQueue("exclude.test.foo");
-		final TextMessage excludedMsgHub = hubSession.createTextMessage();
+		Queue excludedQueueHub = hubSession.createQueue("exclude.test.foo");
+		TextMessage excludedMsgHub = hubSession.createTextMessage();
 		excludedMsgHub.setText(excludedQueueHub.toString());
 		
-		final Queue includedQueueHub = hubSession.createQueue("include.test.foo");
+		Queue includedQueueHub = hubSession.createQueue("include.test.foo");
 
-		final TextMessage includedMsgHub = hubSession.createTextMessage();
-		excludedMsgHub.setText(includedQueueHub.toString());		
+		TextMessage includedMsgHub = hubSession.createTextMessage();
+		includedMsgHub.setText(includedQueueHub.toString());		
 
 		// Sending from Hub queue
 		hubProducer.send(excludedQueueHub, excludedMsgHub);
 		hubProducer.send(includedQueueHub, includedMsgHub);
 
-		final ConnectionFactory cfSpoke = new ActiveMQConnectionFactory(
-				"tcp://localhost:62001");
-		final Connection spokeConnection = cfSpoke.createConnection();
-		spokeConnection.start();
-		final Session spokeSession = spokeConnection.createSession(false,
-				Session.AUTO_ACKNOWLEDGE);
-		final Queue excludedQueueSpoke = spokeSession.createQueue("exclude.test.foo");
-		final MessageConsumer excludedConsumerSpoke = spokeSession
-				.createConsumer(excludedQueueSpoke);
-
-		final Queue includedQueueSpoke = spokeSession.createQueue("include.test.foo");
-		final MessageConsumer includedConsumerSpoke = spokeSession
-				.createConsumer(includedQueueSpoke);		
+
+		Queue excludedQueueSpoke = spokeSession.createQueue("exclude.test.foo");
+		MessageConsumer excludedConsumerSpoke = spokeSession.createConsumer(excludedQueueSpoke);
+		
+		Thread.sleep(100);
+
+	    Queue includedQueueSpoke = spokeSession.createQueue("include.test.foo");
+		MessageConsumer includedConsumerSpoke = spokeSession.createConsumer(includedQueueSpoke);		
 		
 		// Receiving from excluded Spoke queue
 		Message msg = excludedConsumerSpoke.receive(200);
@@ -102,19 +129,16 @@
 		
 		// Receiving from included Spoke queue
 		msg = includedConsumerSpoke.receive(200);
-		assertEquals(msg, includedMsgHub);
+		assertEquals(includedMsgHub, msg);
+		
+		// we should be able to receive excluded queue message on Hub
+		MessageConsumer excludedConsumerHub = hubSession.createConsumer(excludedQueueHub);
+		msg = excludedConsumerHub.receive(200);;
+		assertEquals(excludedMsgHub, msg);
 
-		excludedConsumerSpoke.close();
-		hubSession.close();
-		hubConnection.stop();
-		hubConnection.close();
 		hubProducer.close();
-		spokeSession.close();
-		spokeConnection.stop();
-		spokeConnection.close();
-
-		senderBroker.stop();
-		receiverBroker.stop();
+		excludedConsumerSpoke.close();
 
 	}
+	
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ReflectionSupportTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ReflectionSupportTest.java?rev=763993&r1=763992&r2=763993&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ReflectionSupportTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ReflectionSupportTest.java Fri Apr 10 17:58:32 2009
@@ -18,11 +18,29 @@
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
 
 import junit.framework.TestCase;
 
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+
 public class ReflectionSupportTest extends TestCase {
+	
+    List<ActiveMQDestination> favorites = new ArrayList<ActiveMQDestination>();
+    String favoritesString = "[queue://test, topic://test]";
+    List<ActiveMQDestination> nonFavorites = new ArrayList<ActiveMQDestination>();
+    String nonFavoritesString = "[topic://test1]";
+    
+    public void setUp() {
+        favorites.add(new ActiveMQQueue("test"));
+        favorites.add(new ActiveMQTopic("test"));
+        nonFavorites.add(new ActiveMQTopic("test1"));
+    }
 
     public void testSetProperties() throws URISyntaxException {
         SimplePojo pojo = new SimplePojo();
@@ -30,7 +48,10 @@
         map.put("age", "27");
         map.put("name", "Hiram");
         map.put("enabled", "true");
-        map.put("uri", "test://value");
+        map.put("uri", "test://value");        
+        map.put("favorites", favoritesString);
+        map.put("nonFavorites", nonFavoritesString);
+        map.put("others", null);
         
         IntrospectionSupport.setProperties(pojo, map);
         
@@ -38,5 +59,29 @@
         assertEquals("Hiram", pojo.getName());
         assertEquals(true, pojo.isEnabled());
         assertEquals(new URI("test://value"), pojo.getUri());
+        assertEquals(favorites, pojo.getFavorites());
+        assertEquals(nonFavorites, pojo.getNonFavorites());
+        assertNull(pojo.getOthers());
+    }
+    
+    public void testGetProperties() {
+    	SimplePojo pojo = new SimplePojo();
+    	pojo.setAge(31);
+    	pojo.setName("Dejan");
+    	pojo.setEnabled(true);
+    	pojo.setFavorites(favorites);
+    	pojo.setNonFavorites(nonFavorites);
+    	pojo.setOthers(null);
+    	
+    	Properties props = new Properties();
+    	
+    	IntrospectionSupport.getProperties(pojo, props, null);
+    	
+    	assertEquals("Dejan", props.get("name"));
+    	assertEquals("31", props.get("age"));
+    	assertEquals("True", props.get("enabled"));
+    	assertEquals(favoritesString, props.get("favorites"));
+    	assertEquals(nonFavoritesString, props.get("nonFavorites"));
+    	assertNull(props.get("others"));
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SimplePojo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SimplePojo.java?rev=763993&r1=763992&r2=763993&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SimplePojo.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SimplePojo.java Fri Apr 10 17:58:32 2009
@@ -17,6 +17,10 @@
 package org.apache.activemq.util;
 
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.command.ActiveMQDestination;
 
 public class SimplePojo {
 
@@ -24,6 +28,9 @@
     int age;
     boolean enabled;
     URI uri;
+    List<ActiveMQDestination> favorites = new ArrayList<ActiveMQDestination>();
+    List<ActiveMQDestination> nonFavorites = new ArrayList<ActiveMQDestination>();
+    List<ActiveMQDestination> others = new ArrayList<ActiveMQDestination>();
     
     public int getAge() {
         return age;
@@ -49,5 +56,23 @@
     public void setUri(URI uri) {
         this.uri = uri;
     }
+	public List<ActiveMQDestination> getFavorites() {
+		return favorites;
+	}
+	public void setFavorites(List<ActiveMQDestination> favorites) {
+		this.favorites = favorites;
+	}
+	public List<ActiveMQDestination> getNonFavorites() {
+		return nonFavorites;
+	}
+	public void setNonFavorites(List<ActiveMQDestination> nonFavorites) {
+		this.nonFavorites = nonFavorites;
+	}
+	public List<ActiveMQDestination> getOthers() {
+		return others;
+	}
+	public void setOthers(List<ActiveMQDestination> others) {
+		this.others = others;
+	}
     
 }