You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/04/10 19:31:40 UTC
svn commit: r763983 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/command/
main/java/org/apache/activemq/network/ test/java/org/apache/activemq/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Fri Apr 10 17:31:40 2009
New Revision: 763983
URL: http://svn.apache.org/viewvc?rev=763983&view=rev
Log:
Some additional tests for AMQ-2198 (https://issues.apache.org/activemq/browse/AMQ-2198); also additional logging.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=763983&r1=763982&r2=763983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Fri Apr 10 17:31:40 2009
@@ -113,7 +113,7 @@
}
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
- LOG.debug("Adding destination: " + destination);
+ LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
synchronized (destinationsMutex) {
Destination dest = destinations.get(destination);
if (dest == null) {
@@ -216,7 +216,7 @@
}
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
- LOG.debug("Adding consumer: " + info.getConsumerId());
+ LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: " + info.getDestination());
ActiveMQDestination destination = info.getDestination();
if (destination != null && !destination.isPattern() && !destination.isComposite()) {
// lets auto-create the destination
@@ -308,7 +308,7 @@
}
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
- LOG.debug("Removing consumer: " + info.getConsumerId());
+ LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: " + info.getDestination());
Subscription sub = subscriptions.remove(info.getConsumerId());
//The sub could be removed elsewhere - see ConnectionSplitBroker
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java?rev=763983&r1=763982&r2=763983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java Fri Apr 10 17:31:40 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.command;
import java.io.IOException;
+import java.util.Arrays;
import javax.jms.JMSException;
@@ -75,7 +76,9 @@
if (contains(message.getBrokerPath(), networkBrokerId)) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Message all ready routed once through this broker - ignoring: " + message);
+ LOG.trace("Message all ready routed once through this broker ("
+ + networkBrokerId + "), path: "
+ + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
}
return false;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=763983&r1=763982&r2=763983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Fri Apr 10 17:31:40 2009
@@ -64,7 +64,7 @@
DemandSubscription ds = (DemandSubscription)i.next();
if (filter.matches(ds.getLocalInfo().getDestination())) {
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " matched exsting sub (add interest) for : " + ds.getRemoteInfo()
+ LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " + ds.getRemoteInfo()
+ " with sub: " + info);
}
// add the interest in the subscription
@@ -84,18 +84,17 @@
for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
DemandSubscription ds = (DemandSubscription)i.next();
- ds.remove(id);
- if (ds.isEmpty()) {
- tmpList.add(ds);
- } else {
+ if (ds.remove(id)) {
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo());
+ LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : sub: " + id + " existing matched sub: " + ds.getRemoteInfo());
}
}
+ if (ds.isEmpty()) {
+ tmpList.add(ds);
+ }
}
for (Iterator<DemandSubscription> i = tmpList.iterator(); i.hasNext();) {
DemandSubscription ds = i.next();
- subscriptionMapByLocalId.remove(ds.getRemoteInfo().getConsumerId());
removeSubscription(ds);
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=763983&r1=763982&r2=763983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Apr 10 17:31:40 2009
@@ -617,6 +617,9 @@
protected void removeSubscription(DemandSubscription sub) throws IOException {
if (sub != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
+ }
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=763983&r1=763982&r2=763983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Fri Apr 10 17:31:40 2009
@@ -113,12 +113,16 @@
}
- // This will interconnect all brokes using multicast
+ // This will interconnect all brokers using multicast
protected void bridgeAllBrokers() throws Exception {
- bridgeAllBrokers("default", 1, false);
+ bridgeAllBrokers("default", 1, false, false);
}
-
+
protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception {
+ bridgeAllBrokers(groupName, ttl, suppressduplicateQueueSubs, false);
+ }
+
+ protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs, boolean decreasePriority) throws Exception {
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
@@ -134,6 +138,7 @@
NetworkConnector nc = broker.addNetworkConnector("multicast://default?group=" + groupName);
nc.setNetworkTTL(ttl);
nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs);
+ nc.setDecreaseNetworkConsumerPriority(decreasePriority);
}
// Multicasting may take longer to setup
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=763983&r1=763982&r2=763983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java Fri Apr 10 17:31:40 2009
@@ -20,6 +20,8 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
@@ -30,11 +32,14 @@
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.MessageIdList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.1.1.1 $
*/
public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
+ private static final Log LOG = LogFactory.getLog(ThreeBrokerQueueNetworkTest.class);
protected static final int MESSAGE_COUNT = 100;
/**
@@ -243,9 +248,6 @@
assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
}
- /**
- * BrokerA <-> BrokerB <-> BrokerC
- */
public void testAllConnectedUsingMulticast() throws Exception {
// Setup broker networks
bridgeAllBrokers();
@@ -276,6 +278,156 @@
assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
}
+
+ public void testAllConnectedUsingMulticastProducerConsumerOnA() throws Exception {
+ bridgeAllBrokers("default", 3, false);
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ int messageCount = 2000;
+ CountDownLatch messagesReceived = new CountDownLatch(messageCount);
+ MessageConsumer clientA = createConsumer("BrokerA", dest, messagesReceived);
+
+ // Let's try to wait for advisory percolation.
+ Thread.sleep(1000);
+
+ // Send messages
+ sendMessages("BrokerA", dest, messageCount);
+
+ assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
+
+ // Get message count
+ MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+ assertEquals(messageCount, msgsA.getMessageCount());
+ }
+
+ public void testAllConnectedWithSpare() throws Exception {
+ bridgeAllBrokers("default", 3, false);
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ int messageCount = 2000;
+ CountDownLatch messagesReceived = new CountDownLatch(messageCount);
+ MessageConsumer clientA = createConsumer("BrokerA", dest, messagesReceived);
+
+ // ensure advisory percolation.
+ Thread.sleep(1000);
+
+ // Send messages
+ sendMessages("BrokerB", dest, messageCount);
+
+ assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
+
+ // Get message count
+ MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+ assertEquals(messageCount, msgsA.getMessageCount());
+ }
+
+ public void testMigrateConsumerStuckMessages() throws Exception {
+ boolean suppressQueueDuplicateSubscriptions = false;
+ bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions);
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ LOG.info("Consumer on A");
+ MessageConsumer clientA = createConsumer("BrokerA", dest);
+
+ // ensure advisors have percolated
+ Thread.sleep(500);
+
+ LOG.info("Consumer on B");
+ int messageCount = 2000;
+
+ // will only get half of the messages
+ CountDownLatch messagesReceived = new CountDownLatch(messageCount/2);
+ MessageConsumer clientB = createConsumer("BrokerB", dest, messagesReceived);
+
+ // ensure advisors have percolated
+ Thread.sleep(500);
+
+ LOG.info("Close consumer on A");
+ clientA.close();
+
+ // ensure advisors have percolated
+ Thread.sleep(500);
+
+ LOG.info("Send to B");
+ sendMessages("BrokerB", dest, messageCount);
+
+ // Let's try to wait for any messages.
+ assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
+
+ // Get message count
+ MessageIdList msgs = getConsumerMessages("BrokerB", clientB);
+
+ // see will any more arrive
+ Thread.sleep(500);
+ assertEquals(messageCount/2, msgs.getMessageCount());
+
+ // pick up the stuck messages
+ messagesReceived = new CountDownLatch(messageCount/2);
+ clientA = createConsumer("BrokerA", dest, messagesReceived);
+ // Let's try to wait for any messages.
+ assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
+
+ msgs = getConsumerMessages("BrokerA", clientA);
+ assertEquals(messageCount/2, msgs.getMessageCount());
+ }
+
+ // use case: for maintenance, migrate consumers and producers from one
+ // node in the network to another so node can be replaced/updated
+ public void testMigrateConsumer() throws Exception {
+ boolean suppressQueueDuplicateSubscriptions = true;
+ boolean decreaseNetworkConsumerPriority = true;
+ bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions, decreaseNetworkConsumerPriority);
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ LOG.info("Consumer on A");
+ MessageConsumer clientA = createConsumer("BrokerA", dest);
+
+ // ensure advisors have percolated
+ Thread.sleep(500);
+
+ LOG.info("Consumer on B");
+ int messageCount = 2000;
+ CountDownLatch messagesReceived = new CountDownLatch(messageCount);
+ MessageConsumer clientB = createConsumer("BrokerB", dest, messagesReceived);
+
+ // make the consumer slow so that any network consumer has a chance, even
+ // if it has a lower priority
+ MessageIdList msgs = getConsumerMessages("BrokerB", clientB);
+ msgs.setProcessingDelay(10);
+
+ // ensure advisors have percolated
+ Thread.sleep(500);
+
+ LOG.info("Close consumer on A");
+ clientA.close();
+
+ // ensure advisors have percolated
+ Thread.sleep(500);
+
+ LOG.info("Send to B");
+ sendMessages("BrokerB", dest, messageCount);
+
+ // Let's try to wait for any messages.
+ assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
+ assertEquals(messageCount, msgs.getMessageCount());
+ }
+
public void testNoDuplicateQueueSubs() throws Exception {
bridgeAllBrokers("default", 3, true);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java?rev=763983&r1=763982&r2=763983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java Fri Apr 10 17:31:40 2009
@@ -98,6 +98,40 @@
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
}
+
+ /**
+ * BrokerA -> BrokerB && BrokerB -> BrokerA
+ */
+ public void testDuplexStaticRemoteBrokerHasNoConsumer() throws Exception {
+ // Setup broker networks
+ boolean dynamicOnly = true;
+ int networkTTL = 2;
+ boolean conduit = true;
+ bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduit);
+ bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduit);
+
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ MessageConsumer clientA = createConsumer("BrokerA", dest);
+
+ Thread.sleep(2*1000);
+
+ int messageCount = 2000;
+ // Send messages
+ sendMessages("BrokerA", dest, messageCount);
+
+ // Get message count
+ MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+
+ msgsA.waitForMessagesToArrive(messageCount);
+
+ assertEquals(messageCount, msgsA.getMessageCount());
+
+ }
public void setUp() throws Exception {
super.setAutoFail(true);