You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/07/12 13:58:06 UTC
svn commit: r1360642 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/policy/ main/java/org/ap...
Author: gtully
Date: Thu Jul 12 11:58:05 2012
New Revision: 1360642
URL: http://svn.apache.org/viewvc?rev=1360642&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3903 - Failed to fire fast producer advisory, reason: java.lang.NullPointerException. A generic producer does not contain a destination, so it must be obtained from the exchange. Modified the broker interface to reflect that. Fixed up a typo in the policy entry: advisoryForFastProducers is now correctly spelled, replacing advisdoryForFastProducers.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java
- copied, changed from r1360614, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicCleanUpTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Thu Jul 12 11:58:05 2012
@@ -362,11 +362,11 @@ public class AdvisoryBroker extends Brok
}
@Override
- public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
- super.fastProducer(context, producerInfo);
+ public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
+ super.fastProducer(context, producerInfo, destination);
try {
- if (!AdvisorySupport.isAdvisoryTopic(producerInfo.getDestination())) {
- ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
+ if (!AdvisorySupport.isAdvisoryTopic(destination)) {
+ ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(destination);
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Thu Jul 12 11:58:05 2012
@@ -361,8 +361,9 @@ public interface Broker extends Region,
* Called to notify a producer is too fast
* @param context
* @param producerInfo
+ * @param destination
*/
- void fastProducer(ConnectionContext context,ProducerInfo producerInfo);
+ void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination);
/**
* Called when a Usage reaches a limit
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Thu Jul 12 11:58:05 2012
@@ -271,8 +271,8 @@ public class BrokerFilter implements Bro
}
- public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
- next.fastProducer(context, producerInfo);
+ public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
+ next.fastProducer(context, producerInfo, destination);
}
public void isFull(ConnectionContext context,Destination destination, Usage usage) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Thu Jul 12 11:58:05 2012
@@ -263,7 +263,7 @@ public class EmptyBroker implements Brok
return -1l;
}
- public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
+ public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
}
public void isFull(ConnectionContext context, Destination destination,Usage usage) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Thu Jul 12 11:58:05 2012
@@ -273,7 +273,7 @@ public class ErrorBroker implements Brok
throw new BrokerStoppedException(this.message);
}
- public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
+ public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
throw new BrokerStoppedException(this.message);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Thu Jul 12 11:58:05 2012
@@ -282,8 +282,8 @@ public class MutableBrokerFilter impleme
return getNext().getBrokerSequenceId();
}
- public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
- getNext().fastProducer(context, producerInfo);
+ public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
+ getNext().fastProducer(context, producerInfo, destination);
}
public void isFull(ConnectionContext context,Destination destination, Usage usage) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu Jul 12 11:58:05 2012
@@ -76,7 +76,7 @@ public abstract class BaseDestination im
private int minimumMessageSize = 1024;
private boolean lazyDispatch = false;
private boolean advisoryForSlowConsumers;
- private boolean advisdoryForFastProducers;
+ private boolean advisoryForFastProducers;
private boolean advisoryForDiscardingMessages;
private boolean advisoryWhenFull;
private boolean advisoryForDelivery;
@@ -407,15 +407,15 @@ public abstract class BaseDestination im
/**
* @return the advisdoryForFastProducers
*/
- public boolean isAdvisdoryForFastProducers() {
- return advisdoryForFastProducers;
+ public boolean isAdvisoryForFastProducers() {
+ return advisoryForFastProducers;
}
/**
- * @param advisdoryForFastProducers the advisdoryForFastProducers to set
+ * @param advisoryForFastProducers the advisdoryForFastProducers to set
*/
- public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
- this.advisdoryForFastProducers = advisdoryForFastProducers;
+ public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
+ this.advisoryForFastProducers = advisoryForFastProducers;
}
public boolean isSendAdvisoryIfNoConsumers() {
@@ -509,8 +509,8 @@ public abstract class BaseDestination im
* @param producerInfo
*/
public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
- if (advisdoryForFastProducers) {
- broker.fastProducer(context, producerInfo);
+ if (advisoryForFastProducers) {
+ broker.fastProducer(context, producerInfo, getActiveMQDestination());
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Thu Jul 12 11:58:05 2012
@@ -74,7 +74,7 @@ public class PolicyEntry extends Destina
private int timeBeforeDispatchStarts = 0;
private int consumersBeforeDispatchStarts = 0;
private boolean advisoryForSlowConsumers;
- private boolean advisdoryForFastProducers;
+ private boolean advisoryForFastProducers;
private boolean advisoryForDiscardingMessages;
private boolean advisoryWhenFull;
private boolean advisoryForDelivery;
@@ -159,7 +159,7 @@ public class PolicyEntry extends Destina
destination.setAdvisoryForDelivery(isAdvisoryForDelivery());
destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages());
destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
- destination.setAdvisdoryForFastProducers(isAdvisdoryForFastProducers());
+ destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
@@ -661,15 +661,15 @@ public class PolicyEntry extends Destina
/**
* @return the advisdoryForFastProducers
*/
- public boolean isAdvisdoryForFastProducers() {
- return advisdoryForFastProducers;
+ public boolean isAdvisoryForFastProducers() {
+ return advisoryForFastProducers;
}
/**
- * @param advisdoryForFastProducers the advisdoryForFastProducers to set
+ * @param advisoryForFastProducers the advisdoryForFastProducers to set
*/
- public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
- this.advisdoryForFastProducers = advisdoryForFastProducers;
+ public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
+ this.advisoryForFastProducers = advisoryForFastProducers;
}
public void setMaxExpirePageSize(int maxExpirePageSize) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java Thu Jul 12 11:58:05 2012
@@ -510,11 +510,11 @@ public class LoggingBrokerPlugin extends
}
@Override
- public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
+ public void fastProducer(ConnectionContext context, ProducerInfo producerInfo,ActiveMQDestination destination) {
if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
LOG.info("Fast Producer : " + producerInfo);
}
- super.fastProducer(context, producerInfo);
+ super.fastProducer(context, producerInfo, destination);
}
@Override
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java Thu Jul 12 11:58:05 2012
@@ -200,7 +200,7 @@ public class AdvisoryTempDestinationTest
private PolicyEntry createPolicyEntry(ConstantPendingMessageLimitStrategy strategy) {
PolicyEntry policy = new PolicyEntry();
- policy.setAdvisdoryForFastProducers(true);
+ policy.setAdvisoryForFastProducers(true);
policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDiscardingMessages(true);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java Thu Jul 12 11:58:05 2012
@@ -210,7 +210,7 @@ public class AdvisoryTests extends TestC
protected void configureBroker(BrokerService answer) throws Exception {
answer.setPersistent(false);
PolicyEntry policy = new PolicyEntry();
- policy.setAdvisdoryForFastProducers(true);
+ policy.setAdvisoryForFastProducers(true);
policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDiscardingMessages(true);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java Thu Jul 12 11:58:05 2012
@@ -130,7 +130,7 @@ public class AMQ3324Test {
entry.setInactiveTimoutBeforeGC(2000);
entry.setProducerFlowControl(true);
entry.setAdvisoryForConsumed(true);
- entry.setAdvisdoryForFastProducers(true);
+ entry.setAdvisoryForFastProducers(true);
entry.setAdvisoryForDelivery(true);
PolicyMap map = new PolicyMap();
map.setDefaultEntry(entry);
Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java (from r1360614, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java&r1=1360614&r2=1360642&rev=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java Thu Jul 12 11:58:05 2012
@@ -16,18 +16,16 @@
*/
package org.apache.activemq.bugs;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
import javax.jms.BytesMessage;
import javax.jms.Connection;
+import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.Topic;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
@@ -43,9 +41,13 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AMQ3324Test {
- private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3324Test.class);
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class AMQ3903Test {
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3903Test.class);
private static final String bindAddress = "tcp://0.0.0.0:0";
private BrokerService broker;
@@ -72,77 +74,66 @@ public class AMQ3324Test {
}
@Test
- public void testTempMessageConsumedAdvisoryConnectionClose() throws Exception {
+ public void testAdvisoryForFastGenericProducer() throws Exception {
+ doTestAdvisoryForFastProducer(true);
+ }
+
+ @Test
+ public void testAdvisoryForFastDedicatedProducer() throws Exception {
+ doTestAdvisoryForFastProducer(false);
+ }
+
+ public void doTestAdvisoryForFastProducer(boolean genericProducer) throws Exception {
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final TemporaryQueue queue = session.createTemporaryQueue();
- MessageConsumer consumer = session.createConsumer(queue);
- final Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue);
+ final Topic advisoryTopic = AdvisorySupport.getFastProducerAdvisoryTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic);
- MessageProducer producer = session.createProducer(queue);
-
- // send lots of messages to the tempQueue
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- BytesMessage m = session.createBytesMessage();
- m.writeBytes(new byte[1024]);
- producer.send(m);
- }
+ MessageProducer producer = session.createProducer(genericProducer ? null : queue);
- // consume one message from tempQueue
- Message msg = consumer.receive(5000);
- assertNotNull(msg);
+ try {
+ // send lots of messages to the tempQueue
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ BytesMessage m = session.createBytesMessage();
+ m.writeBytes(new byte[1024]);
+ if (genericProducer) {
+ producer.send(queue, m, DeliveryMode.PERSISTENT, 4, 0);
+ } else {
+ producer.send(m);
+ }
+ }
+ } catch (ResourceAllocationException expectedOnLimitReachedAfterFastAdvisory) {}
// check one advisory message has produced on the advisoryTopic
- Message advCmsg = advisoryConsumer.receive(5000);
+ Message advCmsg = advisoryConsumer.receive(4000);
assertNotNull(advCmsg);
+
connection.close();
LOG.debug("Connection closed, destinations should now become inactive.");
-
- assertTrue("The destination " + advisoryTopic + "was not removed. ", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return broker.getAdminView().getTopics().length == 0;
- }
- }));
-
- assertTrue("The destination " + queue + " was not removed. ", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return broker.getAdminView().getTemporaryQueues().length == 0;
- }
- }));
}
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
- answer.setUseMirroredQueues(true);
answer.setPersistent(false);
- answer.setSchedulePeriodForDestinationPurge(1000);
+ answer.setUseJmx(false);
PolicyEntry entry = new PolicyEntry();
- entry.setGcInactiveDestinations(true);
- entry.setInactiveTimoutBeforeGC(2000);
- entry.setProducerFlowControl(true);
- entry.setAdvisoryForConsumed(true);
- entry.setAdvisdoryForFastProducers(true);
- entry.setAdvisoryForDelivery(true);
+ entry.setAdvisoryForFastProducers(true);
+ entry.setMemoryLimit(10000);
PolicyMap map = new PolicyMap();
map.setDefaultEntry(entry);
- MirroredQueue mirrorQ = new MirroredQueue();
- mirrorQ.setCopyMessage(true);
- DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{mirrorQ};
- answer.setDestinationInterceptors(destinationInterceptors);
-
answer.setDestinationPolicy(map);
answer.addConnector(bindAddress);
+ answer.getSystemUsage().setSendFailIfNoSpace(true);
+
return answer;
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java Thu Jul 12 11:58:05 2012
@@ -20,27 +20,21 @@ package org.apache.activemq.transport.st
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.io.File;
-import java.io.IOException;
import java.net.Socket;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
/**
@@ -59,7 +53,7 @@ public class StompAdvisoryTest extends T
private PolicyEntry createPolicyEntry() {
PolicyEntry policy = new PolicyEntry();
- policy.setAdvisdoryForFastProducers(true);
+ policy.setAdvisoryForFastProducers(true);
policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDiscardingMessages(true);
@@ -78,7 +72,7 @@ public class StompAdvisoryTest extends T
broker.setPersistent(false);
PolicyEntry policy = new PolicyEntry();
- policy.setAdvisdoryForFastProducers(true);
+ policy.setAdvisoryForFastProducers(true);
policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDiscardingMessages(true);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicCleanUpTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicCleanUpTest.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicCleanUpTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicCleanUpTest.java Thu Jul 12 11:58:05 2012
@@ -62,7 +62,7 @@ public class AdvisoryTopicCleanUpTest {
connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
PolicyEntry policy = new PolicyEntry();
- policy.setAdvisdoryForFastProducers(true);
+ policy.setAdvisoryForFastProducers(true);
policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDiscardingMessages(true);