You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2015/03/18 17:08:07 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5674 -
initialRedeliveryDelay not respected
Repository: activemq
Updated Branches:
refs/heads/master ca456c460 -> 20832f1f1
https://issues.apache.org/jira/browse/AMQ-5674 - initialRedeliveryDelay not respected
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/20832f1f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/20832f1f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/20832f1f
Branch: refs/heads/master
Commit: 20832f1f1b5bf028d43256a8b2172f2795b0c02d
Parents: ca456c4
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Wed Mar 18 17:07:45 2015 +0100
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Wed Mar 18 17:07:59 2015 +0100
----------------------------------------------------------------------
.../java/org/apache/activemq/ActiveMQSession.java | 4 ++--
.../java/org/apache/activemq/RedeliveryPolicy.java | 5 ++++-
.../src/test/java/org/apache/activemq/ra/MDBTest.java | 14 ++++++++++++--
.../org/apache/activemq/RedeliveryPolicyTest.java | 14 ++++++++++++++
4 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/20832f1f/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index e327ef1..14c2869 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -937,7 +937,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
@Override
public void afterRollback() throws Exception {
LOG.trace("rollback {}", ack, new Throwable("here"));
- md.getMessage().onMessageRolledBack();
// ensure we don't filter this as a duplicate
connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
@@ -956,7 +955,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
- && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
+ && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
// We need to NACK the messages so that they get
// sent to the
// DLQ.
@@ -986,6 +985,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
}, redeliveryDelay);
}
+ md.getMessage().onMessageRolledBack();
}
});
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/20832f1f/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
index 91f2b71..e0a8f33 100644
--- a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
+++ b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
@@ -98,7 +98,10 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable,
}
public long getNextRedeliveryDelay(long previousDelay) {
- long nextDelay = redeliveryDelay;
+ long nextDelay = initialRedeliveryDelay;
+ if (nextDelay == 0) {
+ nextDelay = redeliveryDelay;
+ }
if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
nextDelay = (long) (previousDelay * backOffMultiplier);
http://git-wip-us.apache.org/repos/asf/activemq/blob/20832f1f/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
index 5927d3e..904dd18 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
@@ -23,6 +23,8 @@ import java.lang.reflect.Method;
import java.util.Timer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
@@ -424,12 +426,18 @@ public class MDBTest extends TestCase {
adapter.setServerUrl("vm://localhost?broker.persistent=false");
adapter.start(new StubBootstrapContext());
- final CountDownLatch messageDelivered = new CountDownLatch(2);
+ final CountDownLatch messageDelivered = new CountDownLatch(5);
+ final AtomicLong timeReceived = new AtomicLong();
+ final AtomicBoolean failed = new AtomicBoolean(false);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
public void onMessage(Message message) {
super.onMessage(message);
try {
+ long now = System.currentTimeMillis();
+ if ((now - timeReceived.getAndSet(now)) > 1000) {
+ failed.set(true);
+ }
messageDelivered.countDown();
if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) {
throw new RuntimeException(getName() + " ex on first delivery");
@@ -463,6 +471,7 @@ public class MDBTest extends TestCase {
ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
activationSpec.setDestinationType(Queue.class.getName());
activationSpec.setDestination("TEST");
+ activationSpec.setInitialRedeliveryDelay(100);
activationSpec.setResourceAdapter(adapter);
activationSpec.validate();
@@ -486,7 +495,7 @@ public class MDBTest extends TestCase {
} catch (Exception e) {
}
-
+ timeReceived.set(System.currentTimeMillis());
// Send the broker a message to that endpoint
MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
producer.send(session.createTextMessage("Hello!"));
@@ -494,6 +503,7 @@ public class MDBTest extends TestCase {
// Wait for the message to be delivered twice.
assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS));
+ assertFalse("Delivery policy delay not working", failed.get());
// Shut the Endpoint down.
adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
http://git-wip-us.apache.org/repos/asf/activemq/blob/20832f1f/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index e2b5867..1f8f687 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -69,6 +69,20 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertEquals(500, delay);
}
+ public void testGetNextWithInitialDelay() throws Exception {
+
+ RedeliveryPolicy policy = new RedeliveryPolicy();
+ policy.setInitialRedeliveryDelay(500);
+
+ long delay = policy.getNextRedeliveryDelay(500);
+ assertEquals(500, delay);
+ delay = policy.getNextRedeliveryDelay(delay);
+ assertEquals(500, delay);
+ delay = policy.getNextRedeliveryDelay(delay);
+ assertEquals(500, delay);
+
+ }
+
/**
* @throws Exception
*/