You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2015/03/18 17:08:07 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5674 - initialRedeliveryDelay not respected

Repository: activemq
Updated Branches:
  refs/heads/master ca456c460 -> 20832f1f1


https://issues.apache.org/jira/browse/AMQ-5674 - initialRedeliveryDelay not respected


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/20832f1f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/20832f1f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/20832f1f

Branch: refs/heads/master
Commit: 20832f1f1b5bf028d43256a8b2172f2795b0c02d
Parents: ca456c4
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Wed Mar 18 17:07:45 2015 +0100
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Wed Mar 18 17:07:59 2015 +0100

----------------------------------------------------------------------
 .../java/org/apache/activemq/ActiveMQSession.java     |  4 ++--
 .../java/org/apache/activemq/RedeliveryPolicy.java    |  5 ++++-
 .../src/test/java/org/apache/activemq/ra/MDBTest.java | 14 ++++++++++++--
 .../org/apache/activemq/RedeliveryPolicyTest.java     | 14 ++++++++++++++
 4 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/20832f1f/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index e327ef1..14c2869 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -937,7 +937,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
                         @Override
                         public void afterRollback() throws Exception {
                             LOG.trace("rollback {}", ack, new Throwable("here"));
-                            md.getMessage().onMessageRolledBack();
                             // ensure we don't filter this as a duplicate
                             connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
 
@@ -956,7 +955,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
                             RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
                             int redeliveryCounter = md.getMessage().getRedeliveryCounter();
                             if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
-                                && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
+                                && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
                                 // We need to NACK the messages so that they get
                                 // sent to the
                                 // DLQ.
@@ -986,6 +985,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
                                     }
                                 }, redeliveryDelay);
                             }
+                            md.getMessage().onMessageRolledBack();
                         }
                     });
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/20832f1f/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
index 91f2b71..e0a8f33 100644
--- a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
+++ b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
@@ -98,7 +98,10 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable,
     }
 
     public long getNextRedeliveryDelay(long previousDelay) {
-        long nextDelay = redeliveryDelay;
+        long nextDelay = initialRedeliveryDelay;
+        if (nextDelay == 0) {
+            nextDelay = redeliveryDelay;
+        }
 
         if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
             nextDelay = (long) (previousDelay * backOffMultiplier);

http://git-wip-us.apache.org/repos/asf/activemq/blob/20832f1f/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
index 5927d3e..904dd18 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
@@ -23,6 +23,8 @@ import java.lang.reflect.Method;
 import java.util.Timer;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
@@ -424,12 +426,18 @@ public class MDBTest extends TestCase {
         adapter.setServerUrl("vm://localhost?broker.persistent=false");
         adapter.start(new StubBootstrapContext());
 
-        final CountDownLatch messageDelivered = new CountDownLatch(2);
+        final CountDownLatch messageDelivered = new CountDownLatch(5);
+        final AtomicLong timeReceived = new AtomicLong();
+        final AtomicBoolean failed = new AtomicBoolean(false);
 
         final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
             public void onMessage(Message message) {
                 super.onMessage(message);
                 try {
+                    long now = System.currentTimeMillis();
+                    if ((now - timeReceived.getAndSet(now)) > 1000) {
+                        failed.set(true);
+                    }
                     messageDelivered.countDown();
                     if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) {
                         throw new RuntimeException(getName() + " ex on first delivery");
@@ -463,6 +471,7 @@ public class MDBTest extends TestCase {
         ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
         activationSpec.setDestinationType(Queue.class.getName());
         activationSpec.setDestination("TEST");
+        activationSpec.setInitialRedeliveryDelay(100);
         activationSpec.setResourceAdapter(adapter);
         activationSpec.validate();
 
@@ -486,7 +495,7 @@ public class MDBTest extends TestCase {
         } catch (Exception e) {
 
         }
-
+        timeReceived.set(System.currentTimeMillis());
         // Send the broker a message to that endpoint
         MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
         producer.send(session.createTextMessage("Hello!"));
@@ -494,6 +503,7 @@ public class MDBTest extends TestCase {
 
         // Wait for the message to be delivered five times (the latch now counts down from 5).
         assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS));
+        assertFalse("Delivery policy delay not working", failed.get());
 
         // Shut the Endpoint down.
         adapter.endpointDeactivation(messageEndpointFactory, activationSpec);

http://git-wip-us.apache.org/repos/asf/activemq/blob/20832f1f/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index e2b5867..1f8f687 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -69,6 +69,20 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
         assertEquals(500, delay);
     }
 
+    public void testGetNextWithInitialDelay() throws Exception {
+
+        RedeliveryPolicy policy = new RedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(500);
+
+        long delay = policy.getNextRedeliveryDelay(500);
+        assertEquals(500, delay);
+        delay = policy.getNextRedeliveryDelay(delay);
+        assertEquals(500, delay);
+        delay = policy.getNextRedeliveryDelay(delay);
+        assertEquals(500, delay);
+
+    }
+
     /**
      * @throws Exception
      */