You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/04/10 19:31:40 UTC

svn commit: r763983 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/network/ test/java/org/apache/activemq/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Fri Apr 10 17:31:40 2009
New Revision: 763983

URL: http://svn.apache.org/viewvc?rev=763983&view=rev
Log:
some additional tests for AMQ-2198|https://issues.apache.org/activemq/browse/AMQ-2198, also additional logging

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=763983&r1=763982&r2=763983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Fri Apr 10 17:31:40 2009
@@ -113,7 +113,7 @@
     }
 
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
-        LOG.debug("Adding destination: " + destination);
+        LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
         synchronized (destinationsMutex) {
             Destination dest = destinations.get(destination);
             if (dest == null) {
@@ -216,7 +216,7 @@
     }
 
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
-        LOG.debug("Adding consumer: " + info.getConsumerId());
+        LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: " + info.getDestination());
         ActiveMQDestination destination = info.getDestination();
         if (destination != null && !destination.isPattern() && !destination.isComposite()) {
             // lets auto-create the destination
@@ -308,7 +308,7 @@
     }
 
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
-        LOG.debug("Removing consumer: " + info.getConsumerId());
+        LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: " + info.getDestination());
 
         Subscription sub = subscriptions.remove(info.getConsumerId());
         //The sub could be removed elsewhere - see ConnectionSplitBroker

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java?rev=763983&r1=763982&r2=763983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java Fri Apr 10 17:31:40 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.command;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import javax.jms.JMSException;
 
@@ -75,7 +76,9 @@
 
         if (contains(message.getBrokerPath(), networkBrokerId)) {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Message all ready routed once through this broker - ignoring: " + message);
+                LOG.trace("Message all ready routed once through this broker ("
+                        + networkBrokerId + "), path: "
+                        + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
             }
             return false;
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=763983&r1=763982&r2=763983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Fri Apr 10 17:31:40 2009
@@ -64,7 +64,7 @@
             DemandSubscription ds = (DemandSubscription)i.next();
             if (filter.matches(ds.getLocalInfo().getDestination())) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(configuration.getBrokerName() + " matched exsting sub (add interest) for : " + ds.getRemoteInfo()
+                    LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " + ds.getRemoteInfo()
                             + " with sub: " + info);
                 }
                 // add the interest in the subscription
@@ -84,18 +84,17 @@
 
         for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
             DemandSubscription ds = (DemandSubscription)i.next();
-            ds.remove(id);
-            if (ds.isEmpty()) {
-                tmpList.add(ds);
-            } else {
+            if (ds.remove(id)) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " :  " + ds.getRemoteInfo());
+                    LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : sub: " + id  + " existing matched sub: " + ds.getRemoteInfo());
                 }
             }
+            if (ds.isEmpty()) {
+                tmpList.add(ds);
+            }
         }
         for (Iterator<DemandSubscription> i = tmpList.iterator(); i.hasNext();) {
             DemandSubscription ds = i.next();
-            subscriptionMapByLocalId.remove(ds.getRemoteInfo().getConsumerId());
             removeSubscription(ds);
             if (LOG.isDebugEnabled()) {
                 LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker + " from " + remoteBrokerName + " :  " + ds.getRemoteInfo());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=763983&r1=763982&r2=763983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Apr 10 17:31:40 2009
@@ -617,6 +617,9 @@
 
     protected void removeSubscription(DemandSubscription sub) throws IOException {
         if (sub != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
+            }
             subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
             localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=763983&r1=763982&r2=763983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Fri Apr 10 17:31:40 2009
@@ -113,12 +113,16 @@
 
     }
 
-    // This will interconnect all brokes using multicast
+    // This will interconnect all brokers using multicast
     protected void bridgeAllBrokers() throws Exception {
-        bridgeAllBrokers("default", 1, false);
+        bridgeAllBrokers("default", 1, false, false);
     }
-
+    
     protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception {
+        bridgeAllBrokers(groupName, ttl, suppressduplicateQueueSubs, false);
+    }
+
+    protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs, boolean decreasePriority) throws Exception {
         Collection<BrokerItem> brokerList = brokers.values();
         for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
             BrokerService broker = i.next().broker;
@@ -134,6 +138,7 @@
             NetworkConnector nc = broker.addNetworkConnector("multicast://default?group=" + groupName);
             nc.setNetworkTTL(ttl);
             nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs);
+            nc.setDecreaseNetworkConsumerPriority(decreasePriority);
         }
 
         // Multicasting may take longer to setup

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=763983&r1=763982&r2=763983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java Fri Apr 10 17:31:40 2009
@@ -20,6 +20,8 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Destination;
 import javax.jms.MessageConsumer;
@@ -30,11 +32,14 @@
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.util.MessageIdList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision: 1.1.1.1 $
  */
 public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
+    private static final Log LOG = LogFactory.getLog(ThreeBrokerQueueNetworkTest.class);
     protected static final int MESSAGE_COUNT = 100;
 
     /**
@@ -243,9 +248,6 @@
         assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
     }
 
-    /**
-     * BrokerA <-> BrokerB <-> BrokerC
-     */
     public void testAllConnectedUsingMulticast() throws Exception {
         // Setup broker networks
         bridgeAllBrokers();
@@ -276,6 +278,156 @@
         assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
     }
 
+    
+    public void testAllConnectedUsingMulticastProducerConsumerOnA() throws Exception {
+        bridgeAllBrokers("default", 3, false);
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        int messageCount = 2000;
+        CountDownLatch messagesReceived = new CountDownLatch(messageCount);
+        MessageConsumer clientA = createConsumer("BrokerA", dest, messagesReceived);
+
+        // Let's try to wait for advisory percolation.
+        Thread.sleep(1000);
+
+        // Send messages
+        sendMessages("BrokerA", dest, messageCount);
+     
+        assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
+        
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+        assertEquals(messageCount, msgsA.getMessageCount());
+    }
+
+    public void testAllConnectedWithSpare() throws Exception {
+        bridgeAllBrokers("default", 3, false);
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        int messageCount = 2000;
+        CountDownLatch messagesReceived = new CountDownLatch(messageCount);
+        MessageConsumer clientA = createConsumer("BrokerA", dest, messagesReceived);
+
+        // ensure advisory percolation.
+        Thread.sleep(1000);
+
+        // Send messages
+        sendMessages("BrokerB", dest, messageCount);
+
+        assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
+        
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+        assertEquals(messageCount, msgsA.getMessageCount());
+    }
+    
+    public void testMigrateConsumerStuckMessages() throws Exception {
+        boolean suppressQueueDuplicateSubscriptions = false;
+        bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions);
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);    
+        
+        // Setup consumers
+        LOG.info("Consumer on A");
+        MessageConsumer clientA = createConsumer("BrokerA", dest);
+        
+        // ensure advisors have percolated
+        Thread.sleep(500);
+        
+        LOG.info("Consumer on B");
+        int messageCount = 2000;
+        
+        // will only get half of the messages
+        CountDownLatch messagesReceived = new CountDownLatch(messageCount/2);
+        MessageConsumer clientB = createConsumer("BrokerB", dest, messagesReceived);
+          
+        // ensure advisors have percolated
+        Thread.sleep(500);
+
+        LOG.info("Close consumer on A");
+        clientA.close();
+
+        // ensure advisors have percolated
+        Thread.sleep(500);
+       
+        LOG.info("Send to B"); 
+        sendMessages("BrokerB", dest, messageCount);
+
+        // Let's try to wait for any messages.
+        assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
+
+        // Get message count
+        MessageIdList msgs = getConsumerMessages("BrokerB", clientB);
+        
+        // see will any more arrive
+        Thread.sleep(500);        
+        assertEquals(messageCount/2, msgs.getMessageCount());
+        
+        // pick up the stuck messages
+        messagesReceived = new CountDownLatch(messageCount/2);
+        clientA = createConsumer("BrokerA", dest, messagesReceived);
+        // Let's try to wait for any messages.
+        assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
+        
+        msgs = getConsumerMessages("BrokerA", clientA);
+        assertEquals(messageCount/2, msgs.getMessageCount());
+    }
+
+    // use case: for maintenance, migrate consumers and producers from one
+    // node in the network to another so node can be replaced/updated
+    public void testMigrateConsumer() throws Exception {
+        boolean suppressQueueDuplicateSubscriptions = true;
+        boolean decreaseNetworkConsumerPriority = true;
+        bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions, decreaseNetworkConsumerPriority);
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);    
+        
+        // Setup consumers
+        LOG.info("Consumer on A");
+        MessageConsumer clientA = createConsumer("BrokerA", dest);
+        
+        // ensure advisors have percolated
+        Thread.sleep(500);
+        
+        LOG.info("Consumer on B");
+        int messageCount = 2000;
+        CountDownLatch messagesReceived = new CountDownLatch(messageCount);
+        MessageConsumer clientB = createConsumer("BrokerB", dest, messagesReceived);
+       
+        // make the consumer slow so that any network consumer has a chance, even
+        // if it has a lower priority
+        MessageIdList msgs = getConsumerMessages("BrokerB", clientB);
+        msgs.setProcessingDelay(10);
+        
+        // ensure advisors have percolated
+        Thread.sleep(500);
+
+        LOG.info("Close consumer on A");
+        clientA.close();
+
+        // ensure advisors have percolated
+        Thread.sleep(500);
+       
+        LOG.info("Send to B"); 
+        sendMessages("BrokerB", dest, messageCount);
+
+        // Let's try to wait for any messages.
+        assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
+        assertEquals(messageCount, msgs.getMessageCount());      
+    }
+
     public void testNoDuplicateQueueSubs() throws Exception {
         
         bridgeAllBrokers("default", 3, true);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java?rev=763983&r1=763982&r2=763983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java Fri Apr 10 17:31:40 2009
@@ -98,6 +98,40 @@
         assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
 
     }
+    
+    /**
+     * BrokerA -> BrokerB && BrokerB -> BrokerA
+     */
+    public void testDuplexStaticRemoteBrokerHasNoConsumer() throws Exception {
+        // Setup broker networks
+        boolean dynamicOnly = true;
+        int networkTTL = 2;
+        boolean conduit = true;
+        bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduit);
+        bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduit);
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        MessageConsumer clientA = createConsumer("BrokerA", dest);
+
+        Thread.sleep(2*1000);
+        
+        int messageCount = 2000;
+        // Send messages
+        sendMessages("BrokerA", dest, messageCount);
+
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+
+        msgsA.waitForMessagesToArrive(messageCount);
+
+        assertEquals(messageCount, msgsA.getMessageCount());
+
+    }
 
     public void setUp() throws Exception {
         super.setAutoFail(true);