You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jl...@apache.org on 2006/11/18 14:42:35 UTC

svn commit: r476525 - in /incubator/activemq/branches/activemq-4.1/activemq-core/src: main/java/org/apache/activemq/ test/java/org/apache/activemq/

Author: jlim
Date: Sat Nov 18 05:42:34 2006
New Revision: 476525

URL: http://svn.apache.org/viewvc?view=rev&rev=476525
Log:
applied patch for http://issues.apache.org/activemq/browse/AMQ-967

Modified:
    incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
    incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java

Modified: incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?view=diff&rev=476525&r1=476524&r2=476525
==============================================================================
--- incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Sat Nov 18 05:42:34 2006
@@ -782,7 +782,8 @@
             	redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
 
             rollbackCounter++;
-            if(rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){
+            if(redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
+            		&& rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){
                 // We need to NACK the messages so that they get sent to the
                 // DLQ.
                 // Acknowledge the last message.

Modified: incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=476525&r1=476524&r2=476525
==============================================================================
--- incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Sat Nov 18 05:42:34 2006
@@ -712,7 +712,8 @@
                             
                             RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
                             int redeliveryCounter = md.getMessage().getRedeliveryCounter();
-                            if (redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
+                            if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
+                            		&& redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
                                 
                                 // We need to NACK the messages so that they get sent to the
                                 // DLQ.

Modified: incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java?view=diff&rev=476525&r1=476524&r2=476525
==============================================================================
--- incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java (original)
+++ incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java Sat Nov 18 05:42:34 2006
@@ -30,6 +30,8 @@
  */
 public class RedeliveryPolicy implements Cloneable, Serializable {
 
+    public static final int NO_MAXIMUM_REDELIVERIES = -1;
+
     // +/-15% for a 30% spread -cgs
     protected double collisionAvoidanceFactor = 0.15d;
     protected int maximumRedeliveries = 6;

Modified: incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java?view=diff&rev=476525&r1=476524&r2=476525
==============================================================================
--- incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java (original)
+++ incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java Sat Nov 18 05:42:34 2006
@@ -199,4 +199,109 @@
         session.commit();
         
     }
+    
+    /**
+     * @throws Exception
+     */
+    public void testInfiniteMaximumNumberOfRedeliveries() throws Exception {
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(100);
+        policy.setUseExponentialBackOff(false);
+       //  let's set the maximum redeliveries to no maximum (ie. infinite)
+        policy.setMaximumRedeliveries(-1);
+        
+        
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+        
+        MessageConsumer consumer = session.createConsumer(destination);
+        
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        session.commit();
+           
+        TextMessage m;
+ 
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());        
+        session.rollback();
+        
+        //we should be able to get the 1st message redelivered until a session.commit is called
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());        
+        session.rollback();           
+        
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());        
+        session.rollback();  
+        
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());        
+        session.rollback();  
+        
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());        
+        session.rollback();  
+        
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());        
+        session.commit();  
+        
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());        
+        session.commit();  
+       
+    }
+    
+    /**
+     * @throws Exception
+     */
+    public void testZeroMaximumNumberOfRedeliveries() throws Exception {
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(100);
+        policy.setUseExponentialBackOff(false);
+        //let's set the maximum redeliveries to 0
+        policy.setMaximumRedeliveries(0);
+      
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+        
+        MessageConsumer consumer = session.createConsumer(destination);
+        
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        session.commit();
+        
+        TextMessage m;
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());        
+        session.rollback();
+        
+        //the 1st  message should not be redelivered since maximumRedeliveries is set to 0
+        m = (TextMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());        
+        session.commit();        
+     
+  
+       
+    }        
 }