You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/06/22 16:31:41 UTC
svn commit: r1352902 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/
main/java/org/apache/activemq/broker/region/policy/
main/java/org/apache/activemq/broker/util/
main/java/org/apache/activemq/filter/ test/java/org/apache/active...
Author: gtully
Date: Fri Jun 22 14:31:40 2012
New Revision: 1352902
URL: http://svn.apache.org/viewvc?rev=1352902&view=rev
Log:
implement https://issues.apache.org/jira/browse/AMQ-3894 - broker based redelivery via scheduler resend and a per destination redelivery policy, plugin and tests
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/DestinationMapEntry.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java?rev=1352902&r1=1352901&r2=1352902&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java Fri Jun 22 14:31:40 2012
@@ -18,28 +18,31 @@ package org.apache.activemq;
import java.io.Serializable;
import java.util.Random;
+import org.apache.activemq.filter.DestinationMapEntry;
+import org.apache.activemq.util.IntrospectionSupport;
/**
- * Configuration options used to control how messages are re-delivered when they
+ * Configuration options for a messageConsumer used to control how messages are re-delivered when they
* are rolled back.
+ * May be used server side on a per destination basis via the Broker RedeliveryPlugin
*
* @org.apache.xbean.XBean element="redeliveryPolicy"
*
*/
-public class RedeliveryPolicy implements Cloneable, Serializable {
+public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, Serializable {
public static final int NO_MAXIMUM_REDELIVERIES = -1;
private static Random randomNumberGenerator;
// +/-15% for a 30% spread -cgs
- private double collisionAvoidanceFactor = 0.15d;
- private int maximumRedeliveries = 6;
- private long maximumRedeliveryDelay = -1;
- private long initialRedeliveryDelay = 1000L;
- private boolean useCollisionAvoidance;
- private boolean useExponentialBackOff;
- private double backOffMultiplier = 5.0;
- private long redeliveryDelay = initialRedeliveryDelay;
+ protected double collisionAvoidanceFactor = 0.15d;
+ protected int maximumRedeliveries = 6;
+ protected long maximumRedeliveryDelay = -1;
+ protected long initialRedeliveryDelay = 1000L;
+ protected boolean useCollisionAvoidance;
+ protected boolean useExponentialBackOff;
+ protected double backOffMultiplier = 5.0;
+ protected long redeliveryDelay = initialRedeliveryDelay;
public RedeliveryPolicy() {
}
@@ -150,4 +153,9 @@ public class RedeliveryPolicy implements
public long getRedeliveryDelay() {
return redeliveryDelay;
}
+
+ // Builds the string form by delegating to IntrospectionSupport.toString, passing this
+ // instance, DestinationMapEntry.class and a null third argument.
+ // NOTE(review): presumably DestinationMapEntry.class bounds how far up the class
+ // hierarchy properties are introspected - confirm against IntrospectionSupport.
+ @Override
+ public String toString() {
+ return IntrospectionSupport.toString(this, DestinationMapEntry.class, null);
+ }
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java?rev=1352902&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java Fri Jun 22 14:31:40 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import java.util.List;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.filter.DestinationMap;
+import org.apache.activemq.filter.DestinationMapEntry;
+
+/**
+ * A destination keyed collection of {@link RedeliveryPolicy} entries, allowing
+ * individual destinations (or wildcard hierarchies of destinations) to be given
+ * their own redelivery settings, with an optional default entry used when no
+ * specific entry matches.
+ *
+ * @org.apache.xbean.XBean
+ *
+ *
+ */
+public class RedeliveryPolicyMap extends DestinationMap {
+
+    /** Policy handed out when no entry matches a destination; may be null. */
+    private RedeliveryPolicy defaultEntry;
+
+    /**
+     * Resolves the redelivery policy for a destination.
+     *
+     * @param destination the destination to look up
+     * @return the value chosen by {@code chooseValue(destination)}, or the
+     *         default entry (possibly null) when nothing was chosen
+     */
+    public RedeliveryPolicy getEntryFor(ActiveMQDestination destination) {
+        final RedeliveryPolicy matched = (RedeliveryPolicy) chooseValue(destination);
+        return matched != null ? matched : getDefaultEntry();
+    }
+
+    /**
+     * Sets the individual entries on the redeliveryPolicyMap
+     *
+     * @org.apache.xbean.ElementType class="org.apache.activemq.RedeliveryPolicy"
+     */
+    public void setRedeliveryPolicyEntries(List entries) {
+        super.setEntries(entries);
+    }
+
+    /**
+     * @return the policy used when no entry matches, or null if none was set
+     */
+    public RedeliveryPolicy getDefaultEntry() {
+        return this.defaultEntry;
+    }
+
+    /**
+     * @param defaultEntry the policy to use when no entry matches a destination
+     */
+    public void setDefaultEntry(RedeliveryPolicy defaultEntry) {
+        this.defaultEntry = defaultEntry;
+    }
+
+    /**
+     * @return the entry type held by this map, always {@link RedeliveryPolicy}
+     */
+    protected Class<? extends DestinationMapEntry> getEntryClass() {
+        return RedeliveryPolicy.class;
+    }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java?rev=1352902&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java Fri Jun 22 14:31:40 2012
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import java.io.IOException;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.filter.AnyDestination;
+import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.util.BrokerSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Replace regular DLQ handling with redelivery via a resend to the original destination
+ * after a delay
+ * A destination matching RedeliveryPolicy controls the quantity and delay for re-sends
+ * If there is no matching policy or an existing policy limit is exceeded by default
+ * regular DLQ processing resumes. This is controlled via sendToDlqIfMaxRetriesExceeded
+ * and fallbackToDeadLetter
+ *
+ * @org.apache.xbean.XBean element="redeliveryPlugin"
+ */
+public class RedeliveryPlugin extends BrokerPluginSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPlugin.class);
+ public static final String REDELIVERY_DELAY = "redeliveryDelay";
+
+ RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
+ boolean sendToDlqIfMaxRetriesExceeded = true;
+ private boolean fallbackToDeadLetter = true;
+
+ @Override
+ public Broker installPlugin(Broker broker) throws Exception {
+ if (!broker.getBrokerService().isSchedulerSupport()) {
+ throw new IllegalStateException("RedeliveryPlugin requires schedulerSupport=true on the broker");
+ }
+ validatePolicyDelay(1000);
+ return super.installPlugin(broker);
+ }
+
+ /*
+ * sending to dlq is called as part of a poison ack processing, before the message is acknowledged and removed
+ * by the destination so a delay is vital to avoid resending before it has been consumed
+ */
+ private void validatePolicyDelay(long limit) {
+ final ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
+ for (Object entry : redeliveryPolicyMap.get(matchAll)) {
+ RedeliveryPolicy redeliveryPolicy = (RedeliveryPolicy) entry;
+ validateLimit(limit, redeliveryPolicy);
+ }
+ RedeliveryPolicy defaultEntry = redeliveryPolicyMap.getDefaultEntry();
+ if (defaultEntry != null) {
+ validateLimit(limit, defaultEntry);
+ }
+ }
+
+ private void validateLimit(long limit, RedeliveryPolicy redeliveryPolicy) {
+ if (redeliveryPolicy.getInitialRedeliveryDelay() < limit) {
+ throw new IllegalStateException("RedeliveryPolicy initialRedeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy);
+ }
+ if (redeliveryPolicy.getRedeliveryDelay() < limit) {
+ throw new IllegalStateException("RedeliveryPolicy redeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy);
+ }
+ }
+
+ public RedeliveryPolicyMap getRedeliveryPolicyMap() {
+ return redeliveryPolicyMap;
+ }
+
+ public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
+ this.redeliveryPolicyMap = redeliveryPolicyMap;
+ }
+
+ public boolean isSendToDlqIfMaxRetriesExceeded() {
+ return sendToDlqIfMaxRetriesExceeded;
+ }
+
+ /**
+ * What to do if the maxretries on a matching redelivery policy is exceeded.
+ * when true, the region broker DLQ processing will be used via sendToDeadLetterQueue
+ * when false, there is no action
+ * @param sendToDlqIfMaxRetriesExceeded
+ */
+ public void setSendToDlqIfMaxRetriesExceeded(boolean sendToDlqIfMaxRetriesExceeded) {
+ this.sendToDlqIfMaxRetriesExceeded = sendToDlqIfMaxRetriesExceeded;
+ }
+
+ public boolean isFallbackToDeadLetter() {
+ return fallbackToDeadLetter;
+ }
+
+ /**
+ * What to do if there is no matching redelivery policy for a destination.
+ * when true, the region broker DLQ processing will be used via sendToDeadLetterQueue
+ * when false, there is no action
+ * @param fallbackToDeadLetter
+ */
+ public void setFallbackToDeadLetter(boolean fallbackToDeadLetter) {
+ this.fallbackToDeadLetter = fallbackToDeadLetter;
+ }
+
+ @Override
+ public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
+ if (next.get().isExpired(messageReference)) {
+ // there are two uses of sendToDeadLetterQueue, we are only interested in valid messages
+ super.sendToDeadLetterQueue(context, messageReference, subscription);
+ } else {
+ try {
+ final RedeliveryPolicy redeliveryPolicy = redeliveryPolicyMap.getEntryFor(messageReference.getRegionDestination().getActiveMQDestination());
+ if (redeliveryPolicy != null) {
+ int redeliveryCount = messageReference.getRedeliveryCounter();
+ if (redeliveryCount < redeliveryPolicy.getMaximumRedeliveries()) {
+
+ long delay = ( redeliveryCount == 0 ?
+ redeliveryPolicy.getInitialRedeliveryDelay() :
+ redeliveryPolicy.getNextRedeliveryDelay(getExistingDelay(messageReference)));
+
+ scheduleRedelivery(context, messageReference, delay, ++redeliveryCount);
+ } else if (isSendToDlqIfMaxRetriesExceeded()) {
+ super.sendToDeadLetterQueue(context, messageReference, subscription);
+ } else {
+ LOG.debug("Discarding message that exceeds max redelivery count, " + messageReference.getMessageId());
+ }
+ } else if (isFallbackToDeadLetter()) {
+ super.sendToDeadLetterQueue(context, messageReference, subscription);
+ } else {
+ LOG.debug("Ignoring dlq request for:" + messageReference.getMessageId() + ", RedeliveryPolicy not found (and no fallback) for: " + messageReference.getRegionDestination().getActiveMQDestination());
+ }
+ } catch (Exception exception) {
+ // abort the ack, will be effective if client use transactions or individual ack with sync send
+ RuntimeException toThrow = new RuntimeException("Failed to schedule redelivery for: " + messageReference.getMessageId(), exception);
+ LOG.error(toThrow.toString(), exception);
+ throw toThrow;
+ }
+ }
+ }
+
+ private void scheduleRedelivery(ConnectionContext context, MessageReference messageReference, long delay, int redeliveryCount) throws Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("redelivery #" + redeliveryCount + " of: " + messageReference.getMessageId() + " with delay: "
+ + delay + ", dest: " + messageReference.getRegionDestination().getActiveMQDestination());
+ }
+ final Message old = messageReference.getMessage();
+ Message message = old.copy();
+
+ message.setTransactionId(null);
+ message.setMemoryUsage(null);
+ message.setMarshalledProperties(null);
+ message.removeProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
+
+ message.setProperty(REDELIVERY_DELAY, delay);
+ message.setProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
+ message.setRedeliveryCounter(redeliveryCount);
+
+ boolean originalFlowControl = context.isProducerFlowControl();
+ try {
+ context.setProducerFlowControl(false);
+ ProducerInfo info = new ProducerInfo();
+ ProducerState state = new ProducerState(info);
+ ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+ producerExchange.setProducerState(state);
+ producerExchange.setMutable(true);
+ producerExchange.setConnectionContext(context);
+ context.getBroker().send(producerExchange, message);
+ } finally {
+ context.setProducerFlowControl(originalFlowControl);
+ }
+ }
+
+ private int getExistingDelay(MessageReference messageReference) throws IOException {
+ Object val = messageReference.getMessage().getProperty(REDELIVERY_DELAY);
+ if (val instanceof Long) {
+ return ((Long)val).intValue();
+ }
+ return 0;
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/DestinationMapEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/DestinationMapEntry.java?rev=1352902&r1=1352901&r2=1352902&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/DestinationMapEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/DestinationMapEntry.java Fri Jun 22 14:31:40 2012
@@ -29,7 +29,7 @@ import org.apache.activemq.command.*;
*/
public abstract class DestinationMapEntry<T> implements Comparable<T> {
- private ActiveMQDestination destination;
+ protected ActiveMQDestination destination;
public int compareTo(Object that) {
if (that instanceof DestinationMapEntry) {
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java?rev=1352902&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java Fri Jun 22 14:31:40 2012
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.concurrent.TimeUnit;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.broker.util.RedeliveryPlugin;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exercises broker side redelivery: a rolled back message is re-sent by the
+ * RedeliveryPlugin after the configured delay, and lands on the shared dead
+ * letter queue once the broker redelivery limit is used up.
+ */
+public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
+
+    static final Logger LOG = LoggerFactory.getLogger(BrokerRedeliveryTest.class);
+    BrokerService broker = null;
+
+    final ActiveMQQueue destination = new ActiveMQQueue("Redelivery");
+    final String data = "hi";
+    final long redeliveryDelayMillis = 2000;
+    final int maxBrokerRedeliveries = 2;
+
+    public void testScheduledRedelivery() throws Exception {
+
+        sendMessage();
+
+        ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
+        try {
+            // disable client side redelivery so every rollback is handed to the broker
+            RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+            redeliveryPolicy.setInitialRedeliveryDelay(0);
+            redeliveryPolicy.setMaximumRedeliveries(0);
+            consumerConnection.setRedeliveryPolicy(redeliveryPolicy);
+            consumerConnection.start();
+            Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageConsumer consumer = consumerSession.createConsumer(destination);
+            Message message = consumer.receive(1000);
+            assertNotNull("got message", message);
+            LOG.info("got: " + message);
+            consumerSession.rollback();
+
+            for (int i=0;i<maxBrokerRedeliveries;i++) {
+                Message shouldBeNull = consumer.receive(500);
+                assertNull("did not get message after redelivery count exceeded: " + shouldBeNull, shouldBeNull);
+
+                // wait longer than the broker redelivery delay
+                TimeUnit.SECONDS.sleep(3);
+
+                Message brokerRedeliveryMessage = consumer.receive(500);
+                LOG.info("got: " + brokerRedeliveryMessage);
+                assertNotNull("got message via broker redelivery after delay", brokerRedeliveryMessage);
+                assertEquals("message matches", message.getStringProperty("data"), brokerRedeliveryMessage.getStringProperty("data"));
+                assertEquals("has expiryDelay specified", redeliveryDelayMillis, brokerRedeliveryMessage.getLongProperty(RedeliveryPlugin.REDELIVERY_DELAY));
+
+                consumerSession.rollback();
+            }
+
+            // validate DLQ
+            MessageConsumer dlqConsumer = consumerSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
+            Message dlqMessage = dlqConsumer.receive(2000);
+            assertNotNull("Got message from dql", dlqMessage);
+            assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
+            consumerSession.commit();
+        } finally {
+            // release the connection even when an assertion fails
+            consumerConnection.close();
+        }
+    }
+
+    /** Sends a single message carrying the "data" property to the test destination. */
+    private void sendMessage() throws Exception {
+        ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection();
+        try {
+            producerConnection.start();
+            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = producerSession.createProducer(destination);
+            Message message = producerSession.createMessage();
+            message.setStringProperty("data", data);
+            producer.send(message);
+        } finally {
+            producerConnection.close();
+        }
+    }
+
+    /** Starts a scheduler enabled broker with the redelivery plugin and a default policy. */
+    private void startBroker(boolean deleteMessages) throws Exception {
+        broker = new BrokerService();
+        broker.setSchedulerSupport(true);
+
+
+        RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin();
+
+        RedeliveryPolicy brokerRedeliveryPolicy = new RedeliveryPolicy();
+        brokerRedeliveryPolicy.setRedeliveryDelay(redeliveryDelayMillis);
+        brokerRedeliveryPolicy.setInitialRedeliveryDelay(redeliveryDelayMillis);
+        brokerRedeliveryPolicy.setMaximumRedeliveries(maxBrokerRedeliveries);
+
+        RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
+        redeliveryPolicyMap.setDefaultEntry(brokerRedeliveryPolicy);
+        redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap);
+
+        broker.setPlugins(new BrokerPlugin[]{redeliveryPlugin});
+
+        if (deleteMessages) {
+            broker.setDeleteAllMessagesOnStartup(true);
+        }
+        broker.start();
+    }
+
+
+    private void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+        broker = null;
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://localhost");
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        startBroker(true);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        stopBroker();
+        super.tearDown();
+    }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java?rev=1352902&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java Fri Jun 22 14:31:40 2012
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import junit.framework.TestCase;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ErrorBroker;
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that RedeliveryPlugin.installPlugin rejects invalid configuration:
+ * a broker without scheduler support and policies with too small a delay.
+ */
+public class RedeliveryPluginTest extends TestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPluginTest.class);
+    RedeliveryPlugin underTest = new RedeliveryPlugin();
+
+    public void testInstallPluginValidation() throws Exception {
+        RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
+        RedeliveryPolicy defaultEntry = new RedeliveryPolicy();
+        defaultEntry.setInitialRedeliveryDelay(500);
+        redeliveryPolicyMap.setDefaultEntry(defaultEntry);
+        underTest.setRedeliveryPolicyMap(redeliveryPolicyMap);
+
+        final BrokerService brokerService = new BrokerService();
+        brokerService.setSchedulerSupport(false);
+        Broker broker = new ErrorBroker("hi") {
+            @Override
+            public BrokerService getBrokerService() {
+                return brokerService;
+            }
+        };
+
+        // Only IllegalStateException is the validation failure the plugin raises;
+        // any other exception type is a genuine error and must fail the test.
+        try {
+            underTest.installPlugin(broker);
+            fail("expect exception on no scheduler support");
+        } catch (IllegalStateException expected) {
+            LOG.info("expected: " + expected);
+        }
+
+        brokerService.setSchedulerSupport(true);
+        try {
+            underTest.installPlugin(broker);
+            fail("expect exception on small initial delay");
+        } catch (IllegalStateException expected) {
+            LOG.info("expected: " + expected);
+        }
+
+        defaultEntry.setInitialRedeliveryDelay(5000);
+        defaultEntry.setRedeliveryDelay(500);
+        brokerService.setSchedulerSupport(true);
+        try {
+            underTest.installPlugin(broker);
+            fail("expect exception on small redelivery delay");
+        } catch (IllegalStateException expected) {
+            LOG.info("expected: " + expected);
+        }
+    }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date