You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/10/20 19:35:04 UTC

svn commit: r1186952 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Thu Oct 20 17:35:04 2011
New Revision: 1186952

URL: http://svn.apache.org/viewvc?rev=1186952&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3551 - exclude networkConnectors from sendFailIfNoSpace on producer flow control, with test for topic and queue networks

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1186952&r1=1186951&r2=1186952&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu Oct 20 17:35:04 2011
@@ -601,11 +601,11 @@ public abstract class BaseDestination im
     }
 
     protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
-        if (systemUsage.isSendFailIfNoSpace()) {
+        if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
             getLog().debug("sendFailIfNoSpace, forcing exception on send, usage:  " + usage + ": " + warning);
             throw new ResourceAllocationException(warning);
         }
-        if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
+        if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
             if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
                 getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: " + usage + ": " + warning);
                 throw new ResourceAllocationException(warning);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1186952&r1=1186951&r2=1186952&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Oct 20 17:35:04 2011
@@ -555,7 +555,7 @@ public class Queue extends BaseDestinati
                                     + " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
 
-                if (systemUsage.isSendFailIfNoSpace()) {
+                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
                     throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
                             + message.getProducerId() + ") to prevent flooding "
                             + getActiveMQDestination().getQualifiedName() + "."
@@ -613,7 +613,7 @@ public class Queue extends BaseDestinati
                             }
                         });
 
-                        if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
+                        if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
                             flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
                                     .getSendFailIfNoSpaceAfterTimeout()));
                         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1186952&r1=1186951&r2=1186952&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Oct 20 17:35:04 2011
@@ -298,7 +298,7 @@ public class Topic extends BaseDestinati
                                     + " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
 
-                if (systemUsage.isSendFailIfNoSpace()) {
+                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
                     throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
                             + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
                             + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
@@ -427,7 +427,7 @@ public class Topic extends BaseDestinati
                         + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
                         + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                         + " See http://activemq.apache.org/producer-flow-control.html for more info";
-                if (systemUsage.isSendFailIfNoSpace()) {
+                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
                     throw new javax.jms.ResourceAllocationException(logMessage);
                 }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java?rev=1186952&r1=1186951&r2=1186952&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java Thu Oct 20 17:35:04 2011
@@ -20,6 +20,7 @@ package org.apache.activemq.usecases;
 import java.net.URI;
 import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.jms.MessageConsumer;
 import junit.framework.Test;
@@ -27,7 +28,9 @@ import org.apache.activemq.JmsMultipleBr
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.util.MessageIdList;
 import org.apache.commons.logging.Log;
@@ -99,15 +102,6 @@ public class NetworkBridgeProducerFlowCo
     private static final Log LOG = LogFactory
             .getLog(NetworkBridgeProducerFlowControlTest.class);
 
-    // Consumer prefetch is disabled for broker1's consumers.
-    private static final ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue(
-            NetworkBridgeProducerFlowControlTest.class.getSimpleName()
-                    + ".slow.shared?consumer.prefetchSize=1");
-
-    private static final ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue(
-            NetworkBridgeProducerFlowControlTest.class.getSimpleName()
-                    + ".fast.shared?consumer.prefetchSize=1");
-
     // Combo flag set to true/false by the test framework.
     public boolean persistentTestMessages;
     public boolean networkIsAlwaysSendSync;
@@ -146,6 +140,15 @@ public class NetworkBridgeProducerFlowCo
         final long TEST_MESSAGE_SIZE = 1024;
         final long SLOW_CONSUMER_DELAY_MILLIS = 100;
 
+        // Consumer prefetch is limited to a single message for broker1's consumers.
+        final ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue(
+            NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+                    + ".slow.shared?consumer.prefetchSize=1");
+
+        final ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue(
+            NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+                    + ".fast.shared?consumer.prefetchSize=1");
+
         // Start a local and a remote broker.
         createBroker(new URI("broker:(tcp://localhost:0"
                 + ")?brokerName=broker0&persistent=false&useJmx=true"));
@@ -246,4 +249,139 @@ public class NetworkBridgeProducerFlowCo
                     fastConsumerTime.get() < slowConsumerTime.get() / 10);
         }
     }
+
+    public void testSendFailIfNoSpaceDoesNotBlockQueueNetwork() throws Exception {
+        // Consumer prefetch is limited to a single message for broker1's consumers.
+        final ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue(
+            NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+                    + ".slow.shared?consumer.prefetchSize=1");
+
+        final ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue(
+            NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+                    + ".fast.shared?consumer.prefetchSize=1");
+
+        doTestSendFailIfNoSpaceDoesNotBlockNetwork(
+                SLOW_SHARED_QUEUE,
+                FAST_SHARED_QUEUE);
+    }
+
+    public void testSendFailIfNoSpaceDoesNotBlockTopicNetwork() throws Exception {
+        // Consumer prefetch is limited to a single message for broker1's consumers.
+        final ActiveMQTopic SLOW_SHARED_TOPIC = new ActiveMQTopic(
+            NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+                    + ".slow.shared?consumer.prefetchSize=1");
+
+        final ActiveMQTopic FAST_SHARED_TOPIC = new ActiveMQTopic(
+            NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+                    + ".fast.shared?consumer.prefetchSize=1");
+
+        doTestSendFailIfNoSpaceDoesNotBlockNetwork(
+                SLOW_SHARED_TOPIC,
+                FAST_SHARED_TOPIC);
+    }
+
+    public void doTestSendFailIfNoSpaceDoesNotBlockNetwork(
+            ActiveMQDestination slowDestination, ActiveMQDestination fastDestination) throws Exception {
+
+        final int NUM_MESSAGES = 100;
+        final long TEST_MESSAGE_SIZE = 1024;
+        final long SLOW_CONSUMER_DELAY_MILLIS = 100;
+
+        // Start a local and a remote broker.
+        createBroker(new URI("broker:(tcp://localhost:0"
+                + ")?brokerName=broker0&persistent=false&useJmx=true"));
+        BrokerService remoteBroker = createBroker(new URI(
+                "broker:(tcp://localhost:0"
+                        + ")?brokerName=broker1&persistent=false&useJmx=true"));
+        remoteBroker.getSystemUsage().setSendFailIfNoSpace(true);
+
+        // Set a policy on the remote broker that limits the memory available
+        // to the slow shared destination (queue or topic).
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setMemoryLimit(5 * TEST_MESSAGE_SIZE);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.put(slowDestination, policyEntry);
+        remoteBroker.setDestinationPolicy(policyMap);
+
+        // Create an outbound bridge from the local broker to the remote broker.
+        // The bridge always sends synchronously and uses a prefetch size of 1.
+        NetworkConnector nc = bridgeBrokers("broker0", "broker1");
+        nc.setAlwaysSyncSend(true);
+        nc.setPrefetchSize(1);
+
+        startAllBrokers();
+        waitForBridgeFormation();
+
+        // Start two asynchronous consumers on the remote broker, one for each
+        // of the two shared destinations, and keep track of how long it takes
+        // for each of the consumers to receive all the messages.
+        final CountDownLatch fastConsumerLatch = new CountDownLatch(
+                NUM_MESSAGES);
+        final CountDownLatch slowConsumerLatch = new CountDownLatch(
+                NUM_MESSAGES);
+
+        final long startTimeMillis = System.currentTimeMillis();
+        final AtomicLong fastConsumerTime = new AtomicLong();
+        final AtomicLong slowConsumerTime = new AtomicLong();
+
+        Thread fastWaitThread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    fastConsumerLatch.await();
+                    fastConsumerTime.set(System.currentTimeMillis()
+                            - startTimeMillis);
+                } catch (InterruptedException ex) {
+                    exceptions.add(ex);
+                    Assert.fail(ex.getMessage());
+                }
+            }
+        };
+
+        Thread slowWaitThread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    slowConsumerLatch.await();
+                    slowConsumerTime.set(System.currentTimeMillis()
+                            - startTimeMillis);
+                } catch (InterruptedException ex) {
+                    exceptions.add(ex);
+                    Assert.fail(ex.getMessage());
+                }
+            }
+        };
+
+        fastWaitThread.start();
+        slowWaitThread.start();
+
+        createConsumer("broker1", fastDestination, fastConsumerLatch);
+        MessageConsumer slowConsumer = createConsumer("broker1",
+                slowDestination, slowConsumerLatch);
+        MessageIdList messageIdList = brokers.get("broker1").consumers
+                .get(slowConsumer);
+        messageIdList.setProcessingDelay(SLOW_CONSUMER_DELAY_MILLIS);
+
+        // Send the test messages to the local broker's shared destinations.
+        // The messages are sent non-persistent (persistentDelivery is set to
+        // false on the next line).
+        persistentDelivery = false;
+        sendMessages("broker0", fastDestination, NUM_MESSAGES);
+        sendMessages("broker0", slowDestination, NUM_MESSAGES);
+
+        fastWaitThread.join(TimeUnit.SECONDS.toMillis(60));
+        slowWaitThread.join(TimeUnit.SECONDS.toMillis(60));
+
+        assertTrue("no exceptions on the wait threads:" + exceptions,
+                exceptions.isEmpty());
+
+        LOG.info("Fast consumer duration (ms): " + fastConsumerTime.get());
+        LOG.info("Slow consumer duration (ms): " + slowConsumerTime.get());
+
+        assertTrue("fast time set", fastConsumerTime.get() > 0);
+        assertTrue("slow time set", slowConsumerTime.get() > 0);
+
+        // Verify the behaviour as described in the description of this class.
+        Assert.assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 10);
+    }
 }
\ No newline at end of file