You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/02/11 14:07:05 UTC

svn commit: r743320 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/network/ src/test/java/org/apache/activemq/ src/test/java/org/apache/activemq/usecases/

Author: dejanb
Date: Wed Feb 11 13:07:04 2009
New Revision: 743320

URL: http://svn.apache.org/viewvc?rev=743320&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2104

Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=743320&r1=743319&r2=743320&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Wed Feb 11 13:07:04 2009
@@ -425,11 +425,7 @@
             <!-- m2 tests failing since move from assembly  -->
             <exclude>**/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.*</exclude>
             <exclude>**/TwoBrokerQueueClientsReconnectTest.*</exclude>
-            <exclude>**/ThreeBrokerQueueNetworkUsingTcpTest.*</exclude>
             <exclude>**/QueueConsumerCloseAndReconnectTest.*</exclude>
-            <exclude>**/ThreeBrokerQueueNetworkTest.*</exclude>
-            <exclude>**/ThreeBrokerTopicNetworkTest.*</exclude>
-            <exclude>**/ThreeBrokerTopicNetworkUsingTcpTest.*</exclude>
             <exclude>**/TwoBrokerMulticastQueueTest.*</exclude>
 
             <!-- TODO move to optional module...  -->

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=743320&r1=743319&r2=743320&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java Wed Feb 11 13:07:04 2009
@@ -37,7 +37,7 @@
     DemandSubscription(ConsumerInfo info) {
         remoteInfo = info;
         localInfo = info.copy();
-        localInfo.setSelector(null);
+        localInfo.setSelector(info.getSelector());
         localInfo.setBrokerPath(info.getBrokerPath());
         localInfo.setNetworkSubscription(true);
         remoteSubsIds.add(info.getConsumerId());    

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=743320&r1=743319&r2=743320&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Wed Feb 11 13:07:04 2009
@@ -189,13 +189,21 @@
     }
 
     protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {
-        return createConsumer(brokerName, dest, null);
+        return createConsumer(brokerName, dest, null, null);
     }
 
+    protected MessageConsumer createConsumer(String brokerName, Destination dest, String messageSelector) throws Exception {
+        return createConsumer(brokerName, dest, null, messageSelector);
+    }
+    
     protected MessageConsumer createConsumer(String brokerName, Destination dest, CountDownLatch latch) throws Exception {
+    	return createConsumer(brokerName, dest, latch, null);
+    }
+    
+    protected MessageConsumer createConsumer(String brokerName, Destination dest, CountDownLatch latch, String messageSelector) throws Exception {
         BrokerItem brokerItem = brokers.get(brokerName);
         if (brokerItem != null) {
-            return brokerItem.createConsumer(dest, latch);
+            return brokerItem.createConsumer(dest, latch, messageSelector);
         }
         return null;
     }
@@ -257,6 +265,10 @@
 
 
     protected void sendMessages(String brokerName, Destination destination, int count) throws Exception {
+    	sendMessages(brokerName, destination, count, null);
+    }
+    
+    protected void sendMessages(String brokerName, Destination destination, int count, HashMap<String, Object>properties) throws Exception {
         BrokerItem brokerItem = brokers.get(brokerName);
 
         Connection conn = brokerItem.createConnection();
@@ -268,6 +280,11 @@
 
         for (int i = 0; i < count; i++) {
             TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i);
+            if (properties != null) {
+            	for (String propertyName : properties.keySet()) {
+            		msg.setObjectProperty(propertyName, properties.get(propertyName));
+            	}
+            }
             producer.send(msg);
             onSend(i, msg);
         }
@@ -368,22 +385,26 @@
         }
 
         public MessageConsumer createConsumer(Destination dest) throws Exception {
-            return createConsumer(dest, null);
+            return createConsumer(dest, null, null);
+        }
+        
+        public MessageConsumer createConsumer(Destination dest, String messageSelector) throws Exception {
+        	return createConsumer(dest, null, messageSelector);
         }
 
-        public MessageConsumer createConsumer(Destination dest, CountDownLatch latch) throws Exception {
+        public MessageConsumer createConsumer(Destination dest, CountDownLatch latch, String messageSelector) throws Exception {
             Connection c = createConnection();
             c.start();
             Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            return createConsumerWithSession(dest, s, latch);
+            return createConsumerWithSession(dest, s, latch, messageSelector);
         }
 
         public MessageConsumer createConsumerWithSession(Destination dest, Session sess) throws Exception {
-            return createConsumerWithSession(dest, sess, null);
+            return createConsumerWithSession(dest, sess, null, null);
         }
 
-        public MessageConsumer createConsumerWithSession(Destination dest, Session sess, CountDownLatch latch) throws Exception {
-            MessageConsumer client = sess.createConsumer(dest);
+        public MessageConsumer createConsumerWithSession(Destination dest, Session sess, CountDownLatch latch, String messageSelector) throws Exception {
+            MessageConsumer client = sess.createConsumer(dest, messageSelector);
             MessageIdList messageIdList = new MessageIdList();
             messageIdList.setCountDownLatch(latch);
             messageIdList.setParent(allMessages);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=743320&r1=743319&r2=743320&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java Wed Feb 11 13:07:04 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.usecases;
 
 import java.net.URI;
+import java.util.HashMap;
 
 import javax.jms.Destination;
 import javax.jms.MessageConsumer;
@@ -87,6 +88,88 @@
         // Total received should be 100
         assertEquals(MESSAGE_COUNT, msgsA.getMessageCount() + msgsC.getMessageCount());
     }
+    
+    /**
+     * BrokerA <- BrokerB -> BrokerC
+     */
+    public void testBAandBCbrokerNetworkWithSelectorsSendFirst() throws Exception {
+    	// Setup broker networks
+        bridgeBrokers("BrokerB", "BrokerA");
+        bridgeBrokers("BrokerB", "BrokerC");
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);
+
+        
+        // Send messages for broker A
+        HashMap<String, Object> props = new HashMap<String, Object>();
+        props.put("broker", "BROKER_A");
+        sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
+
+        //Send messages for broker C
+        props.clear();
+        props.put("broker", "BROKER_C");
+        sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
+        
+        // Setup consumers
+        MessageConsumer clientA = createConsumer("BrokerA", dest, "broker = 'BROKER_A'");
+        MessageConsumer clientC = createConsumer("BrokerC", dest, "broker = 'BROKER_C'");
+        Thread.sleep(2000); // let subscriptions get propagated
+        
+        // Let's try to wait for any messages.
+        //Thread.sleep(1000);
+
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+        
+        // Each consumer should receive exactly the MESSAGE_COUNT messages matching its selector
+        assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
+        assertEquals(MESSAGE_COUNT, msgsC.getMessageCount());
+    }
+    
+    /**
+     * BrokerA <- BrokerB -> BrokerC
+     */
+    public void testBAandBCbrokerNetworkWithSelectorsSubscribeFirst() throws Exception {
+    	// Setup broker networks
+        bridgeBrokers("BrokerB", "BrokerA");
+        bridgeBrokers("BrokerB", "BrokerC");
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        MessageConsumer clientA = createConsumer("BrokerA", dest, "broker = 'BROKER_A'");
+        MessageConsumer clientC = createConsumer("BrokerC", dest, "broker = 'BROKER_C'");
+        Thread.sleep(2000); // let subscriptions get propagated
+        
+        
+        // Send messages for broker A
+        HashMap<String, Object> props = new HashMap<String, Object>();
+        props.put("broker", "BROKER_A");
+        sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
+
+        //Send messages for broker C
+        props.clear();
+        props.put("broker", "BROKER_C");
+        sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
+        
+        // Let's try to wait for any messages.
+        Thread.sleep(1000);
+
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+        
+        // Each consumer should receive exactly the MESSAGE_COUNT messages matching its selector
+        assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
+        assertEquals(MESSAGE_COUNT, msgsC.getMessageCount());
+    }    
 
     /**
      * BrokerA -> BrokerB <- BrokerC