You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/06/22 16:31:41 UTC

svn commit: r1352902 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/policy/ main/java/org/apache/activemq/broker/util/ main/java/org/apache/activemq/filter/ test/java/org/apache/active...

Author: gtully
Date: Fri Jun 22 14:31:40 2012
New Revision: 1352902

URL: http://svn.apache.org/viewvc?rev=1352902&view=rev
Log:
implement https://issues.apache.org/jira/browse/AMQ-3894 - broker based redelivery via scheduler resend and a per destination redelivery policy, plugin and tests

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/DestinationMapEntry.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java?rev=1352902&r1=1352901&r2=1352902&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java Fri Jun 22 14:31:40 2012
@@ -18,28 +18,31 @@ package org.apache.activemq;
 
 import java.io.Serializable;
 import java.util.Random;
+import org.apache.activemq.filter.DestinationMapEntry;
+import org.apache.activemq.util.IntrospectionSupport;
 
 /**
- * Configuration options used to control how messages are re-delivered when they
+ * Configuration options for a messageConsumer used to control how messages are re-delivered when they
  * are rolled back.
+ * May be used server side on a per destination basis via the Broker RedeliveryPlugin
  *
  * @org.apache.xbean.XBean element="redeliveryPolicy"
  *
  */
-public class RedeliveryPolicy implements Cloneable, Serializable {
+public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, Serializable {
 
     public static final int NO_MAXIMUM_REDELIVERIES = -1;
     private static Random randomNumberGenerator;
 
     // +/-15% for a 30% spread -cgs
-    private double collisionAvoidanceFactor = 0.15d;
-    private int maximumRedeliveries = 6;
-    private long maximumRedeliveryDelay = -1;
-    private long initialRedeliveryDelay = 1000L;
-    private boolean useCollisionAvoidance;
-    private boolean useExponentialBackOff;
-    private double backOffMultiplier = 5.0;
-    private long redeliveryDelay = initialRedeliveryDelay;
+    protected double collisionAvoidanceFactor = 0.15d;
+    protected int maximumRedeliveries = 6;
+    protected long maximumRedeliveryDelay = -1;
+    protected long initialRedeliveryDelay = 1000L;
+    protected boolean useCollisionAvoidance;
+    protected boolean useExponentialBackOff;
+    protected double backOffMultiplier = 5.0;
+    protected long redeliveryDelay = initialRedeliveryDelay;
 
     public RedeliveryPolicy() {
     }
@@ -150,4 +153,9 @@ public class RedeliveryPolicy implements
     public long getRedeliveryDelay() {
         return redeliveryDelay;
     }
+
+    /**
+     * Builds the text form of this policy by delegating to
+     * IntrospectionSupport.toString with this instance,
+     * DestinationMapEntry.class and a null third argument.
+     * NOTE(review): presumably the class argument bounds the introspection at
+     * DestinationMapEntry - confirm against IntrospectionSupport.
+     *
+     * @return the string produced by IntrospectionSupport for this policy
+     */
+    @Override
+    public String toString() {
+        final String description = IntrospectionSupport.toString(this, DestinationMapEntry.class, null);
+        return description;
+    }
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java?rev=1352902&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java Fri Jun 22 14:31:40 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import java.util.List;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.filter.DestinationMap;
+import org.apache.activemq.filter.DestinationMapEntry;
+
+/**
+ * A destination keyed collection of {@link RedeliveryPolicy} entries, so that
+ * individual destinations or wildcard hierarchies of destinations can each be
+ * configured with a different redelivery policy.
+ *
+ * @org.apache.xbean.XBean
+ */
+public class RedeliveryPolicyMap extends DestinationMap {
+
+    private RedeliveryPolicy defaultEntry;
+
+    /**
+     * Resolves the policy to apply to a destination.
+     *
+     * @param destination the destination to look up
+     * @return the entry chosen by the map for the destination, otherwise the
+     *         default entry, which is null unless one has been set
+     */
+    public RedeliveryPolicy getEntryFor(ActiveMQDestination destination) {
+        final RedeliveryPolicy matched = (RedeliveryPolicy) chooseValue(destination);
+        return matched != null ? matched : getDefaultEntry();
+    }
+
+    /**
+     * Sets the individual entries on the redeliveryPolicyMap
+     *
+     * @org.apache.xbean.ElementType class="org.apache.activemq.RedeliveryPolicy"
+     */
+    public void setRedeliveryPolicyEntries(List entries) {
+        super.setEntries(entries);
+    }
+
+    /**
+     * @return the policy used when no entry is chosen for a destination, may be null
+     */
+    public RedeliveryPolicy getDefaultEntry() {
+        return this.defaultEntry;
+    }
+
+    /**
+     * @param defaultEntry the policy to use when no entry is chosen for a destination
+     */
+    public void setDefaultEntry(RedeliveryPolicy defaultEntry) {
+        this.defaultEntry = defaultEntry;
+    }
+
+    /**
+     * @return {@link RedeliveryPolicy} as the entry type of this map
+     */
+    protected Class<? extends DestinationMapEntry> getEntryClass() {
+        return RedeliveryPolicy.class;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java?rev=1352902&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java Fri Jun 22 14:31:40 2012
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import java.io.IOException;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.filter.AnyDestination;
+import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.util.BrokerSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Replace regular DLQ handling with redelivery via a resend to the original destination
+ * after a delay.
+ * A destination matching RedeliveryPolicy controls the quantity and delay for re-sends.
+ * If there is no matching policy or an existing policy limit is exceeded, by default
+ * regular DLQ processing resumes. This is controlled via sendToDlqIfMaxRetriesExceeded
+ * and fallbackToDeadLetter.
+ * A policy whose maximumRedeliveries is RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES is
+ * treated as having no limit.
+ *
+ * @org.apache.xbean.XBean element="redeliveryPlugin"
+ */
+public class RedeliveryPlugin extends BrokerPluginSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPlugin.class);
+
+    /** Name of the message property that records the delay applied to the latest resend. */
+    public static final String REDELIVERY_DELAY = "redeliveryDelay";
+
+    RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
+    boolean sendToDlqIfMaxRetriesExceeded = true;
+    private boolean fallbackToDeadLetter = true;
+
+    /**
+     * Validates the configuration before delegating to the default plugin installation.
+     *
+     * @param broker the broker this plugin is being installed on
+     * @return the result of the superclass installPlugin
+     * @throws IllegalStateException if the broker has no scheduler support or a configured
+     *         policy has an initialRedeliveryDelay or redeliveryDelay below 1000
+     * @throws Exception if the superclass installation fails
+     */
+    @Override
+    public Broker installPlugin(Broker broker) throws Exception {
+        if (!broker.getBrokerService().isSchedulerSupport()) {
+            throw new IllegalStateException("RedeliveryPlugin requires schedulerSupport=true on the broker");
+        }
+        validatePolicyDelay(1000);
+        return super.installPlugin(broker);
+    }
+
+    /*
+     * sending to dlq is called as part of a poison ack processing, before the message is acknowledged  and removed
+     * by the destination so a delay is vital to avoid resending before it has been consumed
+     *
+     * Checks every policy entry matched by the queue and topic ">" wildcards, plus the default entry if set.
+     */
+    private void validatePolicyDelay(long limit) {
+        final ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
+        for (Object entry : redeliveryPolicyMap.get(matchAll)) {
+            RedeliveryPolicy redeliveryPolicy = (RedeliveryPolicy) entry;
+            validateLimit(limit, redeliveryPolicy);
+        }
+        RedeliveryPolicy defaultEntry = redeliveryPolicyMap.getDefaultEntry();
+        if (defaultEntry != null) {
+            validateLimit(limit, defaultEntry);
+        }
+    }
+
+    /*
+     * Rejects a policy whose initialRedeliveryDelay or redeliveryDelay is below the limit;
+     * a delay equal to the limit is accepted.
+     */
+    private void validateLimit(long limit, RedeliveryPolicy redeliveryPolicy) {
+        if (redeliveryPolicy.getInitialRedeliveryDelay() < limit) {
+            throw new IllegalStateException("RedeliveryPolicy initialRedeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy);
+        }
+        if (redeliveryPolicy.getRedeliveryDelay() < limit) {
+            throw new IllegalStateException("RedeliveryPolicy redeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy);
+        }
+    }
+
+    /**
+     * @return the map used to find the redelivery policy for a destination
+     */
+    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
+        return redeliveryPolicyMap;
+    }
+
+    /**
+     * @param redeliveryPolicyMap the map used to find the redelivery policy for a destination
+     */
+    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
+        this.redeliveryPolicyMap = redeliveryPolicyMap;
+    }
+
+    /**
+     * @return true if a message that has used up its redeliveries is passed on to regular DLQ processing
+     */
+    public boolean isSendToDlqIfMaxRetriesExceeded() {
+        return sendToDlqIfMaxRetriesExceeded;
+    }
+
+    /**
+     * What to do if the maxretries on a matching redelivery policy is exceeded.
+     * when true, the region broker DLQ processing will be used via sendToDeadLetterQueue
+     * when false, there is no action
+     * @param sendToDlqIfMaxRetriesExceeded
+     */
+    public void setSendToDlqIfMaxRetriesExceeded(boolean sendToDlqIfMaxRetriesExceeded) {
+        this.sendToDlqIfMaxRetriesExceeded = sendToDlqIfMaxRetriesExceeded;
+    }
+
+    /**
+     * @return true if a message with no matching redelivery policy is passed on to regular DLQ processing
+     */
+    public boolean isFallbackToDeadLetter() {
+        return fallbackToDeadLetter;
+    }
+
+    /**
+     * What to do if there is no matching redelivery policy for a destination.
+     * when true, the region broker DLQ processing will be used via sendToDeadLetterQueue
+     * when false, there is no action
+     * @param fallbackToDeadLetter
+     */
+    public void setFallbackToDeadLetter(boolean fallbackToDeadLetter) {
+        this.fallbackToDeadLetter = fallbackToDeadLetter;
+    }
+
+    /**
+     * Intercepts a dead letter request and, where a redelivery policy applies and still
+     * allows it, schedules a delayed resend to the original destination instead.
+     * Expired messages are always passed straight to the regular DLQ processing.
+     *
+     * @param context the connection context of the request
+     * @param messageReference the message that was to be dead lettered
+     * @param subscription the subscription passed through to the regular DLQ processing
+     * @throws RuntimeException wrapping any failure raised while handling a non expired message
+     */
+    @Override
+    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
+        if (next.get().isExpired(messageReference)) {
+            // there are two uses of  sendToDeadLetterQueue, we are only interested in valid messages
+            super.sendToDeadLetterQueue(context, messageReference, subscription);
+        } else {
+            try {
+                final RedeliveryPolicy redeliveryPolicy = redeliveryPolicyMap.getEntryFor(messageReference.getRegionDestination().getActiveMQDestination());
+                if (redeliveryPolicy != null) {
+                    final int maximumRedeliveries = redeliveryPolicy.getMaximumRedeliveries();
+                    int redeliveryCount = messageReference.getRedeliveryCounter();
+                    // NO_MAXIMUM_REDELIVERIES (-1) means unlimited, a plain "<" test would never redeliver
+                    if (maximumRedeliveries == RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES || redeliveryCount < maximumRedeliveries) {
+
+                        long delay = ( redeliveryCount == 0 ?
+                                redeliveryPolicy.getInitialRedeliveryDelay() :
+                                redeliveryPolicy.getNextRedeliveryDelay(getExistingDelay(messageReference)));
+
+                        scheduleRedelivery(context, messageReference, delay, ++redeliveryCount);
+                    } else if (isSendToDlqIfMaxRetriesExceeded()) {
+                        super.sendToDeadLetterQueue(context, messageReference, subscription);
+                    } else if (LOG.isDebugEnabled()) {
+                        LOG.debug("Discarding message that exceeds max redelivery count, " + messageReference.getMessageId());
+                    }
+                } else if (isFallbackToDeadLetter()) {
+                    super.sendToDeadLetterQueue(context, messageReference, subscription);
+                } else if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring dlq request for:" + messageReference.getMessageId()  + ", RedeliveryPolicy not found (and no fallback) for: " + messageReference.getRegionDestination().getActiveMQDestination());
+                }
+            } catch (Exception exception) {
+                // abort the ack, will be effective if client use transactions or individual ack with sync send
+                RuntimeException toThrow =  new RuntimeException("Failed to schedule redelivery for: " + messageReference.getMessageId(), exception);
+                LOG.error(toThrow.toString(), exception);
+                throw toThrow;
+            }
+        }
+    }
+
+    /*
+     * Resends a copy of the message through the broker with the scheduler delay property set,
+     * recording the delay and the new redelivery count on the copy.
+     */
+    private void scheduleRedelivery(ConnectionContext context, MessageReference messageReference, long delay, int redeliveryCount) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("redelivery #" + redeliveryCount + " of: " + messageReference.getMessageId() + " with delay: "
+                    + delay + ", dest: " + messageReference.getRegionDestination().getActiveMQDestination());
+        }
+        final Message old = messageReference.getMessage();
+        Message message = old.copy();
+
+        // reset state carried over from the original delivery before resending the copy
+        message.setTransactionId(null);
+        message.setMemoryUsage(null);
+        message.setMarshalledProperties(null);
+        message.removeProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
+
+        message.setProperty(REDELIVERY_DELAY, delay);
+        message.setProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
+        message.setRedeliveryCounter(redeliveryCount);
+
+        // flow control is switched off for the resend and restored whatever the outcome
+        boolean originalFlowControl = context.isProducerFlowControl();
+        try {
+            context.setProducerFlowControl(false);
+            ProducerInfo info = new ProducerInfo();
+            ProducerState state = new ProducerState(info);
+            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+            producerExchange.setProducerState(state);
+            producerExchange.setMutable(true);
+            producerExchange.setConnectionContext(context);
+            context.getBroker().send(producerExchange, message);
+        } finally {
+            context.setProducerFlowControl(originalFlowControl);
+        }
+    }
+
+    /*
+     * Reads the delay recorded by the previous resend, or 0 if the property is absent or not a Long.
+     * Returned as a long so that large back off delays are not truncated.
+     */
+    private long getExistingDelay(MessageReference messageReference) throws IOException {
+        Object val = messageReference.getMessage().getProperty(REDELIVERY_DELAY);
+        if (val instanceof Long) {
+            return (Long) val;
+        }
+        return 0;
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/DestinationMapEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/DestinationMapEntry.java?rev=1352902&r1=1352901&r2=1352902&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/DestinationMapEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/DestinationMapEntry.java Fri Jun 22 14:31:40 2012
@@ -29,7 +29,7 @@ import org.apache.activemq.command.*;
  */
 public abstract class DestinationMapEntry<T> implements Comparable<T> {
 
-    private ActiveMQDestination destination;
+    protected ActiveMQDestination destination;
 
     public int compareTo(Object that) {
         if (that instanceof DestinationMapEntry) {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java?rev=1352902&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java Fri Jun 22 14:31:40 2012
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.concurrent.TimeUnit;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.broker.util.RedeliveryPlugin;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exercises the broker side RedeliveryPlugin end to end: a rolled back message is
+ * expected to be resent by the broker after the configured delay, up to the configured
+ * number of broker redeliveries, and then to arrive on the shared dead letter queue.
+ */
+public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
+
+    static final Logger LOG = LoggerFactory.getLogger(BrokerRedeliveryTest.class);
+    BrokerService broker = null;
+
+    final ActiveMQQueue destination = new ActiveMQQueue("Redelivery");
+    final String data = "hi";
+    final long redeliveryDelayMillis = 2000;
+    final int maxBrokerRedeliveries = 2;
+
+    public void testScheduledRedelivery() throws Exception {
+
+        sendMessage();
+
+        ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
+        try {
+            // no client side redelivery, every rollback is handed to the broker
+            RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+            redeliveryPolicy.setInitialRedeliveryDelay(0);
+            redeliveryPolicy.setMaximumRedeliveries(0);
+            consumerConnection.setRedeliveryPolicy(redeliveryPolicy);
+            consumerConnection.start();
+            Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageConsumer consumer = consumerSession.createConsumer(destination);
+            Message message = consumer.receive(1000);
+            assertNotNull("got message", message);
+            LOG.info("got: " + message);
+            consumerSession.rollback();
+
+            for (int i=0;i<maxBrokerRedeliveries;i++) {
+                Message shouldBeNull = consumer.receive(500);
+                assertNull("did not get message after redelivery count exceeded: " + shouldBeNull, shouldBeNull);
+
+                // wait out the broker redelivery delay plus a margin
+                TimeUnit.MILLISECONDS.sleep(redeliveryDelayMillis + 1000);
+
+                Message brokerRedeliveryMessage = consumer.receive(500);
+                LOG.info("got: " + brokerRedeliveryMessage);
+                assertNotNull("got message via broker redelivery after delay", brokerRedeliveryMessage);
+                assertEquals("message matches", message.getStringProperty("data"), brokerRedeliveryMessage.getStringProperty("data"));
+                assertEquals("has redeliveryDelay specified", redeliveryDelayMillis, brokerRedeliveryMessage.getLongProperty(RedeliveryPlugin.REDELIVERY_DELAY));
+
+                consumerSession.rollback();
+            }
+
+            // validate DLQ
+            MessageConsumer dlqConsumer = consumerSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
+            Message dlqMessage = dlqConsumer.receive(2000);
+            assertNotNull("Got message from dlq", dlqMessage);
+            assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
+            consumerSession.commit();
+        } finally {
+            consumerConnection.close();
+        }
+    }
+
+    /** Sends a single message carrying the "data" property to the test destination. */
+    private void sendMessage() throws Exception {
+        ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection();
+        try {
+            producerConnection.start();
+            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = producerSession.createProducer(destination);
+            Message message = producerSession.createMessage();
+            message.setStringProperty("data", data);
+            producer.send(message);
+        } finally {
+            producerConnection.close();
+        }
+    }
+
+    /**
+     * Starts a broker with scheduler support and a RedeliveryPlugin whose default policy
+     * uses the delay and redelivery limit of this test.
+     */
+    private void startBroker(boolean deleteMessages) throws Exception {
+        broker = new BrokerService();
+        broker.setSchedulerSupport(true);
+
+
+        RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin();
+
+        RedeliveryPolicy brokerRedeliveryPolicy = new RedeliveryPolicy();
+        brokerRedeliveryPolicy.setRedeliveryDelay(redeliveryDelayMillis);
+        brokerRedeliveryPolicy.setInitialRedeliveryDelay(redeliveryDelayMillis);
+        brokerRedeliveryPolicy.setMaximumRedeliveries(maxBrokerRedeliveries);
+
+        RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
+        redeliveryPolicyMap.setDefaultEntry(brokerRedeliveryPolicy);
+        redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap);
+
+        broker.setPlugins(new BrokerPlugin[]{redeliveryPlugin});
+
+        if (deleteMessages) {
+            broker.setDeleteAllMessagesOnStartup(true);
+        }
+        broker.start();
+    }
+
+
+    private void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+        broker = null;
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://localhost");
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        startBroker(true);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        stopBroker();
+        super.tearDown();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java?rev=1352902&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java Fri Jun 22 14:31:40 2012
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import junit.framework.TestCase;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ErrorBroker;
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks the configuration validation performed by RedeliveryPlugin.installPlugin.
+ */
+public class RedeliveryPluginTest extends TestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPluginTest.class);
+    RedeliveryPlugin underTest = new RedeliveryPlugin();
+
+    /**
+     * installPlugin must reject, with an IllegalStateException, a broker without scheduler
+     * support, a policy with a small initialRedeliveryDelay and a policy with a small
+     * redeliveryDelay. Only IllegalStateException is caught so that an unrelated failure
+     * is not mistaken for the expected rejection.
+     */
+    public void testInstallPluginValidation() throws Exception {
+        RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
+        RedeliveryPolicy defaultEntry = new RedeliveryPolicy();
+        defaultEntry.setInitialRedeliveryDelay(500);
+        redeliveryPolicyMap.setDefaultEntry(defaultEntry);
+        underTest.setRedeliveryPolicyMap(redeliveryPolicyMap);
+
+        final BrokerService brokerService = new BrokerService();
+        brokerService.setSchedulerSupport(false);
+        // only getBrokerService is overridden, it is the one broker call installPlugin makes directly
+        Broker broker = new ErrorBroker("hi") {
+            @Override
+            public BrokerService getBrokerService() {
+                return brokerService;
+            }
+        };
+
+        try {
+            underTest.installPlugin(broker);
+            fail("expect exception on no scheduler support");
+        } catch (IllegalStateException expected) {
+            LOG.info("expected: " + expected);
+        }
+
+        brokerService.setSchedulerSupport(true);
+        try {
+            underTest.installPlugin(broker);
+            fail("expect exception on small initial delay");
+        } catch (IllegalStateException expected) {
+            LOG.info("expected: " + expected);
+        }
+
+        // scheduler support is still enabled from the previous step
+        defaultEntry.setInitialRedeliveryDelay(5000);
+        defaultEntry.setRedeliveryDelay(500);
+        try {
+            underTest.installPlugin(broker);
+            fail("expect exception on small redelivery delay");
+        } catch (IllegalStateException expected) {
+            LOG.info("expected: " + expected);
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date