You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/07/03 22:45:14 UTC

svn commit: r1356927 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ test/java/org/apache/activemq/

Author: tabish
Date: Tue Jul  3 20:45:12 2012
New Revision: 1356927

URL: http://svn.apache.org/viewvc?rev=1356927&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3224

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1356927&r1=1356926&r2=1356927&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Tue Jul  3 20:45:12 2012
@@ -55,6 +55,7 @@ import javax.jms.XAConnection;
 
 import org.apache.activemq.advisory.DestinationSource;
 import org.apache.activemq.blob.BlobTransferPolicy;
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTempDestination;
@@ -131,7 +132,7 @@ public class ActiveMQConnection implemen
     // Configuration options variables
     private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
     private BlobTransferPolicy blobTransferPolicy;
-    private RedeliveryPolicy redeliveryPolicy;
+    private RedeliveryPolicyMap redeliveryPolicyMap;
     private MessageTransformer transformer;
 
     private boolean disableTimeStampsByDefault;
@@ -1644,14 +1645,14 @@ public class ActiveMQConnection implemen
      * @throws JMSException
      */
     public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
-        return redeliveryPolicy;
+        return redeliveryPolicyMap.getDefaultEntry();
     }
 
     /**
      * Sets the redelivery policy to be used when messages are rolled back
      */
     public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
-        this.redeliveryPolicy = redeliveryPolicy;
+        this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
     }
 
     public BlobTransferPolicy getBlobTransferPolicy() {
@@ -2549,4 +2550,22 @@ public class ActiveMQConnection implemen
             }
         }
     }
+
+    /**
+     * Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back.
+     * @param redeliveryPolicyMap the redeliveryPolicyMap to set
+     */
+    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
+        this.redeliveryPolicyMap = redeliveryPolicyMap;
+    }
+
+    /**
+     * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the
+     * Consumers when dealing with transaction messages that have been rolled back.
+     *
+     * @return the redeliveryPolicyMap
+     */
+    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
+        return redeliveryPolicyMap;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=1356927&r1=1356926&r2=1356927&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Tue Jul  3 20:45:12 2012
@@ -36,6 +36,7 @@ import javax.jms.TopicConnectionFactory;
 import javax.naming.Context;
 
 import org.apache.activemq.blob.BlobTransferPolicy;
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
 import org.apache.activemq.jndi.JNDIBaseStorable;
 import org.apache.activemq.management.JMSStatsImpl;
 import org.apache.activemq.management.StatsCapable;
@@ -90,7 +91,10 @@ public class ActiveMQConnectionFactory e
 
     // client policies
     private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
-    private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+    private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
+    {
+        redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy());
+    }
     private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
     private MessageTransformer transformer;
 
@@ -317,7 +321,7 @@ public class ActiveMQConnectionFactory e
         connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
         connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
         connection.setExclusiveConsumer(isExclusiveConsumer());
-        connection.setRedeliveryPolicy(getRedeliveryPolicy());
+        connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
         connection.setTransformer(getTransformer());
         connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
         connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
@@ -577,15 +581,27 @@ public class ActiveMQConnectionFactory e
     }
 
     public RedeliveryPolicy getRedeliveryPolicy() {
-        return redeliveryPolicy;
+        return redeliveryPolicyMap.getDefaultEntry();
     }
 
     /**
-     * Sets the global redelivery policy to be used when a message is delivered
+     * Sets the global default redelivery policy to be used when a message is delivered
      * but the session is rolled back
      */
     public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
-        this.redeliveryPolicy = redeliveryPolicy;
+        this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
+    }
+
+    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
+        return this.redeliveryPolicyMap;
+    }
+
+    /**
+     * Sets the global redelivery policy mapping to be used when a message is delivered
+     * but the session is rolled back
+     */
+    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
+        this.redeliveryPolicyMap = redeliveryPolicyMap;
     }
 
     public MessageTransformer getTransformer() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1356927&r1=1356926&r2=1356927&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Tue Jul  3 20:45:12 2012
@@ -208,7 +208,7 @@ public class ActiveMQMessageConsumer imp
         }
 
         this.session = session;
-        this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
+        this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
         setTransformer(session.getTransformer());
 
         this.info = new ConsumerInfo(consumerId);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java?rev=1356927&r1=1356926&r2=1356927&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java Tue Jul  3 20:45:12 2012
@@ -23,7 +23,9 @@ import javax.jms.TextMessage;
 
 import junit.framework.Test;
 
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 
 /**
  * Test cases used to test the JMS message exclusive consumers.
@@ -479,4 +481,112 @@ public class RedeliveryPolicyTest extend
         assertEquals("2nd", m.getText());
         session.commit();
     }
+
+    public void testRedeliveryPolicyPerDestination() throws Exception {
+
+        RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
+        queuePolicy.setInitialRedeliveryDelay(0);
+        queuePolicy.setRedeliveryDelay(1000);
+        queuePolicy.setUseExponentialBackOff(false);
+        queuePolicy.setMaximumRedeliveries(2);
+
+        RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
+        topicPolicy.setInitialRedeliveryDelay(0);
+        topicPolicy.setRedeliveryDelay(1000);
+        topicPolicy.setUseExponentialBackOff(false);
+        topicPolicy.setMaximumRedeliveries(3);
+
+        // Map the wildcard topic and queue destinations to their own redelivery policies
+        RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
+        map.put(new ActiveMQTopic(">"), topicPolicy);
+        map.put(new ActiveMQQueue(">"), queuePolicy);
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue queue = new ActiveMQQueue("TEST");
+        ActiveMQTopic topic = new ActiveMQTopic("TEST");
+
+        MessageProducer producer = session.createProducer(null);
+
+        MessageConsumer queueConsumer = session.createConsumer(queue);
+        MessageConsumer topicConsumer = session.createConsumer(topic);
+
+        // Send the messages
+        producer.send(queue, session.createTextMessage("1st"));
+        producer.send(queue, session.createTextMessage("2nd"));
+        producer.send(topic, session.createTextMessage("1st"));
+        producer.send(topic, session.createTextMessage("2nd"));
+
+        session.commit();
+
+        TextMessage m;
+        m = (TextMessage)queueConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        m = (TextMessage)topicConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        m = (TextMessage)queueConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        m = (TextMessage)topicConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.rollback();
+
+        m = (TextMessage)queueConsumer.receive(100);
+        assertNotNull("first immediate redelivery", m);
+        m = (TextMessage)topicConsumer.receive(100);
+        assertNotNull("first immediate redelivery", m);
+        session.rollback();
+
+        m = (TextMessage)queueConsumer.receive(100);
+        assertNull("second delivery delayed: " + m, m);
+        m = (TextMessage)topicConsumer.receive(100);
+        assertNull("second delivery delayed: " + m, m);
+
+        m = (TextMessage)queueConsumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        m = (TextMessage)topicConsumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+
+        m = (TextMessage)queueConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        m = (TextMessage)topicConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.rollback();
+
+        m = (TextMessage)queueConsumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        m = (TextMessage)topicConsumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+
+        m = (TextMessage)queueConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        m = (TextMessage)topicConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.rollback();
+
+        // No third attempt for the Queue consumer
+        m = (TextMessage)queueConsumer.receive(2000);
+        assertNull(m);
+        m = (TextMessage)topicConsumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+
+        m = (TextMessage)queueConsumer.receive(100);
+        assertNull(m);
+        m = (TextMessage)topicConsumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.commit();
+    }
 }