You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/02/11 14:07:05 UTC
svn commit: r743320 - in /activemq/trunk/activemq-core: ./
src/main/java/org/apache/activemq/network/ src/test/java/org/apache/activemq/
src/test/java/org/apache/activemq/usecases/
Author: dejanb
Date: Wed Feb 11 13:07:04 2009
New Revision: 743320
URL: http://svn.apache.org/viewvc?rev=743320&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2104
Modified:
activemq/trunk/activemq-core/pom.xml
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=743320&r1=743319&r2=743320&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Wed Feb 11 13:07:04 2009
@@ -425,11 +425,7 @@
<!-- m2 tests failing since move from assembly -->
<exclude>**/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.*</exclude>
<exclude>**/TwoBrokerQueueClientsReconnectTest.*</exclude>
- <exclude>**/ThreeBrokerQueueNetworkUsingTcpTest.*</exclude>
<exclude>**/QueueConsumerCloseAndReconnectTest.*</exclude>
- <exclude>**/ThreeBrokerQueueNetworkTest.*</exclude>
- <exclude>**/ThreeBrokerTopicNetworkTest.*</exclude>
- <exclude>**/ThreeBrokerTopicNetworkUsingTcpTest.*</exclude>
<exclude>**/TwoBrokerMulticastQueueTest.*</exclude>
<!-- TODO move to optional module... -->
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=743320&r1=743319&r2=743320&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java Wed Feb 11 13:07:04 2009
@@ -37,7 +37,7 @@
DemandSubscription(ConsumerInfo info) {
remoteInfo = info;
localInfo = info.copy();
- localInfo.setSelector(null);
+ localInfo.setSelector(info.getSelector());
localInfo.setBrokerPath(info.getBrokerPath());
localInfo.setNetworkSubscription(true);
remoteSubsIds.add(info.getConsumerId());
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=743320&r1=743319&r2=743320&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Wed Feb 11 13:07:04 2009
@@ -189,13 +189,21 @@
}
protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {
- return createConsumer(brokerName, dest, null);
+ return createConsumer(brokerName, dest, null, null);
}
+ protected MessageConsumer createConsumer(String brokerName, Destination dest, String messageSelector) throws Exception {
+ return createConsumer(brokerName, dest, null, messageSelector);
+ }
+
protected MessageConsumer createConsumer(String brokerName, Destination dest, CountDownLatch latch) throws Exception {
+ return createConsumer(brokerName, dest, latch, null);
+ }
+
+ protected MessageConsumer createConsumer(String brokerName, Destination dest, CountDownLatch latch, String messageSelector) throws Exception {
BrokerItem brokerItem = brokers.get(brokerName);
if (brokerItem != null) {
- return brokerItem.createConsumer(dest, latch);
+ return brokerItem.createConsumer(dest, latch, messageSelector);
}
return null;
}
@@ -257,6 +265,10 @@
protected void sendMessages(String brokerName, Destination destination, int count) throws Exception {
+ sendMessages(brokerName, destination, count, null);
+ }
+
+ protected void sendMessages(String brokerName, Destination destination, int count, HashMap<String, Object>properties) throws Exception {
BrokerItem brokerItem = brokers.get(brokerName);
Connection conn = brokerItem.createConnection();
@@ -268,6 +280,11 @@
for (int i = 0; i < count; i++) {
TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i);
+ if (properties != null) {
+ for (String propertyName : properties.keySet()) {
+ msg.setObjectProperty(propertyName, properties.get(propertyName));
+ }
+ }
producer.send(msg);
onSend(i, msg);
}
@@ -368,22 +385,26 @@
}
public MessageConsumer createConsumer(Destination dest) throws Exception {
- return createConsumer(dest, null);
+ return createConsumer(dest, null, null);
+ }
+
+ public MessageConsumer createConsumer(Destination dest, String messageSelector) throws Exception {
+ return createConsumer(dest, null, messageSelector);
}
- public MessageConsumer createConsumer(Destination dest, CountDownLatch latch) throws Exception {
+ public MessageConsumer createConsumer(Destination dest, CountDownLatch latch, String messageSelector) throws Exception {
Connection c = createConnection();
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
- return createConsumerWithSession(dest, s, latch);
+ return createConsumerWithSession(dest, s, latch, messageSelector);
}
public MessageConsumer createConsumerWithSession(Destination dest, Session sess) throws Exception {
- return createConsumerWithSession(dest, sess, null);
+ return createConsumerWithSession(dest, sess, null, null);
}
- public MessageConsumer createConsumerWithSession(Destination dest, Session sess, CountDownLatch latch) throws Exception {
- MessageConsumer client = sess.createConsumer(dest);
+ public MessageConsumer createConsumerWithSession(Destination dest, Session sess, CountDownLatch latch, String messageSelector) throws Exception {
+ MessageConsumer client = sess.createConsumer(dest, messageSelector);
MessageIdList messageIdList = new MessageIdList();
messageIdList.setCountDownLatch(latch);
messageIdList.setParent(allMessages);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=743320&r1=743319&r2=743320&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java Wed Feb 11 13:07:04 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.usecases;
import java.net.URI;
+import java.util.HashMap;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
@@ -87,6 +88,88 @@
// Total received should be 100
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount() + msgsC.getMessageCount());
}
+
+ /**
+ * BrokerA <- BrokerB -> BrokerC
+ */
+ public void testBAandBCbrokerNetworkWithSelectorsSendFirst() throws Exception {
+ // Setup broker networks
+ bridgeBrokers("BrokerB", "BrokerA");
+ bridgeBrokers("BrokerB", "BrokerC");
+
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+
+ // Send messages for broker A
+ HashMap<String, Object> props = new HashMap<String, Object>();
+ props.put("broker", "BROKER_A");
+ sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
+
+ //Send messages for broker C
+ props.clear();
+ props.put("broker", "BROKER_C");
+ sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
+
+ // Setup consumers
+ MessageConsumer clientA = createConsumer("BrokerA", dest, "broker = 'BROKER_A'");
+ MessageConsumer clientC = createConsumer("BrokerC", dest, "broker = 'BROKER_C'");
+ Thread.sleep(2000); // let subscriptions get propagated
+
+ // Let's try to wait for any messages.
+ //Thread.sleep(1000);
+
+ // Get message count
+ MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+ MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+ // Each consumer should receive exactly MESSAGE_COUNT messages matching its selector
+ assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
+ assertEquals(MESSAGE_COUNT, msgsC.getMessageCount());
+ }
+
+ /**
+ * BrokerA <- BrokerB -> BrokerC
+ */
+ public void testBAandBCbrokerNetworkWithSelectorsSubscribeFirst() throws Exception {
+ // Setup broker networks
+ bridgeBrokers("BrokerB", "BrokerA");
+ bridgeBrokers("BrokerB", "BrokerC");
+
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ MessageConsumer clientA = createConsumer("BrokerA", dest, "broker = 'BROKER_A'");
+ MessageConsumer clientC = createConsumer("BrokerC", dest, "broker = 'BROKER_C'");
+ Thread.sleep(2000); // let subscriptions get propagated
+
+
+ // Send messages for broker A
+ HashMap<String, Object> props = new HashMap<String, Object>();
+ props.put("broker", "BROKER_A");
+ sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
+
+ //Send messages for broker C
+ props.clear();
+ props.put("broker", "BROKER_C");
+ sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
+
+ // Let's try to wait for any messages.
+ Thread.sleep(1000);
+
+ // Get message count
+ MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+ MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+ // Each consumer should receive exactly MESSAGE_COUNT messages matching its selector
+ assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
+ assertEquals(MESSAGE_COUNT, msgsC.getMessageCount());
+ }
/**
* BrokerA -> BrokerB <- BrokerC