You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/11/24 17:02:51 UTC
activemq git commit: [AMQ-6517] make the pre-dispatch redelivery policy
check optional; disable it with jms.redeliveryPolicy.preDispatchCheck=false
Repository: activemq
Updated Branches:
refs/heads/master 8bc3ee29c -> b6bca3976
[AMQ-6517] make the pre-dispatch redelivery policy check optional; disable it with jms.redeliveryPolicy.preDispatchCheck=false
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b6bca397
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b6bca397
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b6bca397
Branch: refs/heads/master
Commit: b6bca3976c9dc18ff5a51c51c218b2f7f8825fbe
Parents: 8bc3ee2
Author: gtully <ga...@gmail.com>
Authored: Thu Nov 24 17:02:37 2016 +0000
Committer: gtully <ga...@gmail.com>
Committed: Thu Nov 24 17:02:37 2016 +0000
----------------------------------------------------------------------
.../activemq/ActiveMQMessageConsumer.java | 7 +-
.../org/apache/activemq/RedeliveryPolicy.java | 9 ++
.../activemq/MessageListenerRedeliveryTest.java | 1 +
.../apache/activemq/RedeliveryPolicyTest.java | 136 ++++++++++++++++++-
.../activemq/broker/BrokerRedeliveryTest.java | 2 +-
.../bugs/RedeliveryPluginHeaderTest.java | 2 +-
6 files changed, 150 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/b6bca397/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index a52e2d4..0bf1ade 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -506,7 +506,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
sendPullCommand(timeout);
} else if (redeliveryExceeded(md)) {
LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
- posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
+ posionAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
if (timeout > 0) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
}
@@ -539,6 +539,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
try {
return session.getTransacted()
&& redeliveryPolicy != null
+ && redeliveryPolicy.isPreDispatchCheck()
&& redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()
// redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
@@ -1255,7 +1256,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
ack.setFirstMessageId(firstMsgId);
- ack.setPoisonCause(new Throwable("Exceeded redelivery policy limit:" + redeliveryPolicy
+ ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter() + "] exceeds redelivery policy limit:" + redeliveryPolicy
+ ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
session.sendAck(ack,true);
// Adjust the window size.
@@ -1392,7 +1393,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
if (redeliveryExceeded(md)) {
- posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
+ posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
return;
}
ActiveMQMessage message = createActiveMQMessage(md);
http://git-wip-us.apache.org/repos/asf/activemq/blob/b6bca397/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
index 91f2b71..1ced507 100644
--- a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
+++ b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
@@ -45,6 +45,7 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable,
protected boolean useExponentialBackOff;
protected double backOffMultiplier = 5.0;
protected long redeliveryDelay = initialRedeliveryDelay;
+ protected boolean preDispatchCheck = true;
public RedeliveryPolicy() {
}
@@ -156,4 +157,12 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable,
public String toString() {
return IntrospectionSupport.toString(this, DestinationMapEntry.class, null);
}
+
+ public void setPreDispatchCheck(boolean preDispatchCheck) {
+ this.preDispatchCheck = preDispatchCheck;
+ }
+
+ public boolean isPreDispatchCheck() {
+ return preDispatchCheck;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/b6bca397/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
index 62f4995..cdecfce 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
@@ -350,6 +350,7 @@ public class MessageListenerRedeliveryTest {
assertTrue("is correct exception", cause.contains(getTestName()));
assertTrue("cause exception is remembered", cause.contains("Throwable"));
assertTrue("cause policy is remembered", cause.contains("RedeliveryPolicy"));
+ assertTrue("cause redelivered count is remembered", cause.contains("[" + (maxDeliveries+1) +"]"));
session.close();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/b6bca397/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index ac81a1f..c403781 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -38,6 +38,8 @@ import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.transport.vm.VMTransportFactory;
+import org.apache.activemq.transport.vm.VMTransportServer;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -246,7 +248,9 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertNotNull(m);
assertEquals("1st", m.getText());
String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
- assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
+ assertTrue("cause exception has policy ref: " + cause, cause.contains("RedeliveryPolicy"));
+ assertTrue("cause exception has redelivered count ref: " + cause, cause.contains("[3]"));
+
session.commit();
}
@@ -451,9 +455,134 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertNotNull("Got message from DLQ", m);
assertEquals("1st", m.getText());
String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
- assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
+ assertTrue("cause exception has policy ref: " + cause, cause.contains("RedeliveryPolicy"));
+ assertTrue("cause exception has pre dispatch and count:" + cause, cause.contains("Delivery[5]"));
+
dlqSession.commit();
+
+ }
+
+
+ public void testRepeatedRedeliveryBrokerCloseReceiveNoCommit() throws Exception {
+
+ connection.start();
+ ActiveMQQueue destination = new ActiveMQQueue("TEST");
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer producer = session.createProducer(destination);
+
+ // Send the messages
+ producer.send(session.createTextMessage("1st"));
+
+ session.commit();
+
+ final int maxRedeliveries = 4;
+ for (int i=0;i<=maxRedeliveries +1;i++) {
+
+ connection = (ActiveMQConnection)factory.createConnection(userName, password);
+ connections.add(connection);
+ // Receive a message with the JMS API
+ RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+ policy.setInitialRedeliveryDelay(0);
+ policy.setUseExponentialBackOff(false);
+ policy.setMaximumRedeliveries(maxRedeliveries);
+
+ connection.start();
+ session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ ActiveMQTextMessage m = ((ActiveMQTextMessage)consumer.receive(4000));
+ if (i<=maxRedeliveries) {
+ assertEquals("1st", m.getText());
+ assertEquals(i, m.getRedeliveryCounter());
+ } else {
+ assertNull("null on exceeding redelivery count", m);
+
+ assertTrue("message in dlq", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("total dequeue count: " + broker.getAdminView().getTotalDequeueCount());
+ return broker.getAdminView().getTotalDequeueCount() == 1;
+ }
+ }));
+ }
+
+ // abortive close via broker
+ for (VMTransportServer transportServer : VMTransportFactory.SERVERS.values()) {
+ transportServer.stop();
+ }
+
+ try {
+ connection.close();
+ } catch (Exception expected) {
+ } finally {
+ connections.remove(connection);
+ }
+ }
+
+ connection = (ActiveMQConnection)factory.createConnection(userName, password);
+ connection.start();
+ connections.add(connection);
+ Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+
+ // We should be able to get the message off the DLQ now.
+ TextMessage m = (TextMessage)dlqConsumer.receive(1000);
+ assertNotNull("Got message from DLQ", m);
+ assertEquals("1st", m.getText());
+ String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+ assertTrue("cause exception has policy ref: " + cause, cause.contains("RedeliveryPolicy"));
+ assertTrue("cause exception has pre dispatch and count:" + cause, cause.contains("Dispatch[5]"));
+
+ dlqSession.commit();
+
+ }
+
+ public void testRepeatedRedeliveryReceiveBrokerCloseNoPreDispatchCheck() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ ActiveMQQueue destination = new ActiveMQQueue("TEST");
+ MessageProducer producer = session.createProducer(destination);
+
+ // Send the messages
+ producer.send(session.createTextMessage("1st"));
+ session.commit();
+
+ final int maxRedeliveries = 4;
+ for (int i=0;i<=maxRedeliveries + 1;i++) {
+
+ connection = (ActiveMQConnection)factory.createConnection(userName, password);
+ connections.add(connection);
+ // Receive a message with the JMS API
+ RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+ policy.setInitialRedeliveryDelay(0);
+ policy.setUseExponentialBackOff(false);
+ policy.setMaximumRedeliveries(maxRedeliveries);
+ policy.setPreDispatchCheck(false);
+
+ connection.start();
+ session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ ActiveMQTextMessage m = ((ActiveMQTextMessage)consumer.receive(4000));
+ assertNotNull("got message on i=" + i, m);
+ assertEquals("1st", m.getText());
+ assertEquals(i, m.getRedeliveryCounter());
+
+ // abortive close via broker
+ for (VMTransportServer transportServer : VMTransportFactory.SERVERS.values()) {
+ transportServer.stop();
+ }
+
+ try {
+ connection.close();
+ } catch (Exception expected) {
+ } finally {
+ connections.remove(connection);
+ }
+ }
}
@@ -518,7 +647,10 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertNotNull("Got message from DLQ", m);
assertEquals("1st", m.getText());
String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+ LOG.info("cause: " + cause);
assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
+ assertTrue("cause exception has redelivered count ref: " + cause, cause.contains("[5]"));
+
dlqSession.commit();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/b6bca397/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
index 916a655..9971e8e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
@@ -74,7 +74,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
for (int i = 0; i < maxBrokerRedeliveriesToValidate; i++) {
Message shouldBeNull = consumer.receive(500);
- assertNull("did not get message after redelivery count exceeded: " + shouldBeNull, shouldBeNull);
+ assertNull("did not get message early: " + shouldBeNull, shouldBeNull);
TimeUnit.SECONDS.sleep(4);
http://git-wip-us.apache.org/repos/asf/activemq/blob/b6bca397/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
index 414b70d..9fc20fe 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
@@ -70,7 +70,7 @@ public class RedeliveryPluginHeaderTest extends TestCase {
//pushed message to broker
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
- transportURL + "?trace=true&jms.redeliveryPolicy.maximumRedeliveries=0");
+ transportURL + "?trace=true&jms.redeliveryPolicy.maximumRedeliveries=0&jms.redeliveryPolicy.preDispatchCheck=true");
Connection connection = factory.createConnection();
connection.start();