You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/10/20 19:35:04 UTC
svn commit: r1186952 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Thu Oct 20 17:35:04 2011
New Revision: 1186952
URL: http://svn.apache.org/viewvc?rev=1186952&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3551 - exclude networkConnectors from sendFailIfNoSpace on producer flow control, with test for topic and queue networks
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1186952&r1=1186951&r2=1186952&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu Oct 20 17:35:04 2011
@@ -601,11 +601,11 @@ public abstract class BaseDestination im
}
protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
- if (systemUsage.isSendFailIfNoSpace()) {
+ if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: " + usage + ": " + warning);
throw new ResourceAllocationException(warning);
}
- if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
+ if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: " + usage + ": " + warning);
throw new ResourceAllocationException(warning);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1186952&r1=1186951&r2=1186952&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Oct 20 17:35:04 2011
@@ -555,7 +555,7 @@ public class Queue extends BaseDestinati
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
- if (systemUsage.isSendFailIfNoSpace()) {
+ if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
+ message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "."
@@ -613,7 +613,7 @@ public class Queue extends BaseDestinati
}
});
- if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
+ if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
.getSendFailIfNoSpaceAfterTimeout()));
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1186952&r1=1186951&r2=1186952&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Oct 20 17:35:04 2011
@@ -298,7 +298,7 @@ public class Topic extends BaseDestinati
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
- if (systemUsage.isSendFailIfNoSpace()) {
+ if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
+ memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
@@ -427,7 +427,7 @@ public class Topic extends BaseDestinati
+ systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
- if (systemUsage.isSendFailIfNoSpace()) {
+ if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException(logMessage);
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java?rev=1186952&r1=1186951&r2=1186952&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java Thu Oct 20 17:35:04 2011
@@ -20,6 +20,7 @@ package org.apache.activemq.usecases;
import java.net.URI;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.MessageConsumer;
import junit.framework.Test;
@@ -27,7 +28,9 @@ import org.apache.activemq.JmsMultipleBr
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
@@ -99,15 +102,6 @@ public class NetworkBridgeProducerFlowCo
private static final Log LOG = LogFactory
.getLog(NetworkBridgeProducerFlowControlTest.class);
- // Consumer prefetch is disabled for broker1's consumers.
- private static final ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue(
- NetworkBridgeProducerFlowControlTest.class.getSimpleName()
- + ".slow.shared?consumer.prefetchSize=1");
-
- private static final ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue(
- NetworkBridgeProducerFlowControlTest.class.getSimpleName()
- + ".fast.shared?consumer.prefetchSize=1");
-
// Combo flag set to true/false by the test framework.
public boolean persistentTestMessages;
public boolean networkIsAlwaysSendSync;
@@ -146,6 +140,15 @@ public class NetworkBridgeProducerFlowCo
final long TEST_MESSAGE_SIZE = 1024;
final long SLOW_CONSUMER_DELAY_MILLIS = 100;
+ // Consumer prefetch is limited to 1 for broker1's consumers (consumer.prefetchSize=1).
+ final ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue(
+ NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ + ".slow.shared?consumer.prefetchSize=1");
+
+ final ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue(
+ NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ + ".fast.shared?consumer.prefetchSize=1");
+
// Start a local and a remote broker.
createBroker(new URI("broker:(tcp://localhost:0"
+ ")?brokerName=broker0&persistent=false&useJmx=true"));
@@ -246,4 +249,139 @@ public class NetworkBridgeProducerFlowCo
fastConsumerTime.get() < slowConsumerTime.get() / 10);
}
}
+
+ public void testSendFailIfNoSpaceDoesNotBlockQueueNetwork() throws Exception {
+ // Consumer prefetch is limited to 1 for broker1's consumers (consumer.prefetchSize=1).
+ final ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue(
+ NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ + ".slow.shared?consumer.prefetchSize=1");
+
+ final ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue(
+ NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ + ".fast.shared?consumer.prefetchSize=1");
+
+ doTestSendFailIfNoSpaceDoesNotBlockNetwork(
+ SLOW_SHARED_QUEUE,
+ FAST_SHARED_QUEUE);
+ }
+
+ public void testSendFailIfNoSpaceDoesNotBlockTopicNetwork() throws Exception {
+ // Consumer prefetch is limited to 1 for broker1's consumers (consumer.prefetchSize=1).
+ final ActiveMQTopic SLOW_SHARED_TOPIC = new ActiveMQTopic(
+ NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ + ".slow.shared?consumer.prefetchSize=1");
+
+ final ActiveMQTopic FAST_SHARED_TOPIC = new ActiveMQTopic(
+ NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ + ".fast.shared?consumer.prefetchSize=1");
+
+ doTestSendFailIfNoSpaceDoesNotBlockNetwork(
+ SLOW_SHARED_TOPIC,
+ FAST_SHARED_TOPIC);
+ }
+
+ public void doTestSendFailIfNoSpaceDoesNotBlockNetwork(
+ ActiveMQDestination slowDestination, ActiveMQDestination fastDestination) throws Exception {
+
+ final int NUM_MESSAGES = 100;
+ final long TEST_MESSAGE_SIZE = 1024;
+ final long SLOW_CONSUMER_DELAY_MILLIS = 100;
+
+ // Start a local and a remote broker.
+ createBroker(new URI("broker:(tcp://localhost:0"
+ + ")?brokerName=broker0&persistent=false&useJmx=true"));
+ BrokerService remoteBroker = createBroker(new URI(
+ "broker:(tcp://localhost:0"
+ + ")?brokerName=broker1&persistent=false&useJmx=true"));
+ remoteBroker.getSystemUsage().setSendFailIfNoSpace(true);
+
+ // Set a policy on the remote broker that limits the memory available to
+ // the slow shared destination (queue or topic).
+ PolicyEntry policyEntry = new PolicyEntry();
+ policyEntry.setMemoryLimit(5 * TEST_MESSAGE_SIZE);
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.put(slowDestination, policyEntry);
+ remoteBroker.setDestinationPolicy(policyMap);
+
+ // Create an outbound bridge from the local broker to the remote broker.
+ // The bridge is configured to always send synchronously, with a prefetch size of 1.
+ NetworkConnector nc = bridgeBrokers("broker0", "broker1");
+ nc.setAlwaysSyncSend(true);
+ nc.setPrefetchSize(1);
+
+ startAllBrokers();
+ waitForBridgeFormation();
+
+ // Start two asynchronous consumers on the remote broker, one for each
+ // of the two shared destinations, and keep track of how long it takes
+ // for each of the consumers to receive all the messages.
+ final CountDownLatch fastConsumerLatch = new CountDownLatch(
+ NUM_MESSAGES);
+ final CountDownLatch slowConsumerLatch = new CountDownLatch(
+ NUM_MESSAGES);
+
+ final long startTimeMillis = System.currentTimeMillis();
+ final AtomicLong fastConsumerTime = new AtomicLong();
+ final AtomicLong slowConsumerTime = new AtomicLong();
+
+ Thread fastWaitThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ fastConsumerLatch.await();
+ fastConsumerTime.set(System.currentTimeMillis()
+ - startTimeMillis);
+ } catch (InterruptedException ex) {
+ exceptions.add(ex);
+ Assert.fail(ex.getMessage());
+ }
+ }
+ };
+
+ Thread slowWaitThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ slowConsumerLatch.await();
+ slowConsumerTime.set(System.currentTimeMillis()
+ - startTimeMillis);
+ } catch (InterruptedException ex) {
+ exceptions.add(ex);
+ Assert.fail(ex.getMessage());
+ }
+ }
+ };
+
+ fastWaitThread.start();
+ slowWaitThread.start();
+
+ createConsumer("broker1", fastDestination, fastConsumerLatch);
+ MessageConsumer slowConsumer = createConsumer("broker1",
+ slowDestination, slowConsumerLatch);
+ MessageIdList messageIdList = brokers.get("broker1").consumers
+ .get(slowConsumer);
+ messageIdList.setProcessingDelay(SLOW_CONSUMER_DELAY_MILLIS);
+
+ // Send the test messages to the local broker's shared destinations. The
+ // messages are sent non-persistent (persistentDelivery = false); the
+ // network connector above is configured with alwaysSyncSend.
+ persistentDelivery = false;
+ sendMessages("broker0", fastDestination, NUM_MESSAGES);
+ sendMessages("broker0", slowDestination, NUM_MESSAGES);
+
+ fastWaitThread.join(TimeUnit.SECONDS.toMillis(60));
+ slowWaitThread.join(TimeUnit.SECONDS.toMillis(60));
+
+ assertTrue("no exceptions on the wait threads:" + exceptions,
+ exceptions.isEmpty());
+
+ LOG.info("Fast consumer duration (ms): " + fastConsumerTime.get());
+ LOG.info("Slow consumer duration (ms): " + slowConsumerTime.get());
+
+ assertTrue("fast time set", fastConsumerTime.get() > 0);
+ assertTrue("slow time set", slowConsumerTime.get() > 0);
+
+ // Verify the fast consumer finished in under a tenth of the slow consumer's time.
+ Assert.assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 10);
+ }
}
\ No newline at end of file