You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/11/24 17:02:51 UTC

activemq git commit: [AMQ-6517] make pre dispatch redelivery policy check optional jms.redeliveryPolicy.preDispatchCheck=false

Repository: activemq
Updated Branches:
  refs/heads/master 8bc3ee29c -> b6bca3976


[AMQ-6517] make pre dispatch redelivery policy check optional jms.redeliveryPolicy.preDispatchCheck=false


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b6bca397
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b6bca397
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b6bca397

Branch: refs/heads/master
Commit: b6bca3976c9dc18ff5a51c51c218b2f7f8825fbe
Parents: 8bc3ee2
Author: gtully <ga...@gmail.com>
Authored: Thu Nov 24 17:02:37 2016 +0000
Committer: gtully <ga...@gmail.com>
Committed: Thu Nov 24 17:02:37 2016 +0000

----------------------------------------------------------------------
 .../activemq/ActiveMQMessageConsumer.java       |   7 +-
 .../org/apache/activemq/RedeliveryPolicy.java   |   9 ++
 .../activemq/MessageListenerRedeliveryTest.java |   1 +
 .../apache/activemq/RedeliveryPolicyTest.java   | 136 ++++++++++++++++++-
 .../activemq/broker/BrokerRedeliveryTest.java   |   2 +-
 .../bugs/RedeliveryPluginHeaderTest.java        |   2 +-
 6 files changed, 150 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b6bca397/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index a52e2d4..0bf1ade 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -506,7 +506,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                     sendPullCommand(timeout);
                 } else if (redeliveryExceeded(md)) {
                     LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
-                    posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
+                    posionAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
                     if (timeout > 0) {
                         timeout = Math.max(deadline - System.currentTimeMillis(), 0);
                     }
@@ -539,6 +539,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
         try {
             return session.getTransacted()
                     && redeliveryPolicy != null
+                    && redeliveryPolicy.isPreDispatchCheck()
                     && redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
                     && md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()
                     // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
@@ -1255,7 +1256,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
 
                     MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
                     ack.setFirstMessageId(firstMsgId);
-                    ack.setPoisonCause(new Throwable("Exceeded redelivery policy limit:" + redeliveryPolicy
+                    ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter()  + "] exceeds redelivery policy limit:" + redeliveryPolicy
                             + ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
                     session.sendAck(ack,true);
                     // Adjust the window size.
@@ -1392,7 +1393,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                     if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
                         if (listener != null && unconsumedMessages.isRunning()) {
                             if (redeliveryExceeded(md)) {
-                                posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
+                                posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
                                 return;
                             }
                             ActiveMQMessage message = createActiveMQMessage(md);

http://git-wip-us.apache.org/repos/asf/activemq/blob/b6bca397/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
index 91f2b71..1ced507 100644
--- a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
+++ b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
@@ -45,6 +45,7 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable,
     protected boolean useExponentialBackOff;
     protected double backOffMultiplier = 5.0;
     protected long redeliveryDelay = initialRedeliveryDelay;
+    protected boolean preDispatchCheck = true;
 
     public RedeliveryPolicy() {
     }
@@ -156,4 +157,12 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable,
     public String toString() {
         return IntrospectionSupport.toString(this, DestinationMapEntry.class, null);
     }
+
+    public void setPreDispatchCheck(boolean preDispatchCheck) {
+        this.preDispatchCheck = preDispatchCheck;
+    }
+
+    public boolean isPreDispatchCheck() {
+        return preDispatchCheck;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b6bca397/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
index 62f4995..cdecfce 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
@@ -350,6 +350,7 @@ public class MessageListenerRedeliveryTest {
         assertTrue("is correct exception", cause.contains(getTestName()));
         assertTrue("cause exception is remembered", cause.contains("Throwable"));
         assertTrue("cause policy is remembered", cause.contains("RedeliveryPolicy"));
+        assertTrue("cause redelivered count is remembered", cause.contains("[" + (maxDeliveries+1) +"]"));
 
         session.close();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b6bca397/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index ac81a1f..c403781 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -38,6 +38,8 @@ import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.transport.vm.VMTransportFactory;
+import org.apache.activemq.transport.vm.VMTransportServer;
 import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -246,7 +248,9 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
         assertNotNull(m);
         assertEquals("1st", m.getText());
         String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
-        assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
+        assertTrue("cause exception has policy ref: " + cause, cause.contains("RedeliveryPolicy"));
+        assertTrue("cause exception has redelivered count ref: " + cause, cause.contains("[3]"));
+
         session.commit();
 
     }
@@ -451,9 +455,134 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
         assertNotNull("Got message from DLQ", m);
         assertEquals("1st", m.getText());
         String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
-        assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
+        assertTrue("cause exception has policy ref: " + cause, cause.contains("RedeliveryPolicy"));
+        assertTrue("cause exception has pre dispatch and count:" + cause, cause.contains("Delivery[5]"));
+
         dlqSession.commit();
 
+
+    }
+
+
+    public void testRepeatedRedeliveryBrokerCloseReceiveNoCommit() throws Exception {
+
+        connection.start();
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+
+        session.commit();
+
+        final int maxRedeliveries = 4;
+        for (int i=0;i<=maxRedeliveries +1;i++) {
+
+            connection = (ActiveMQConnection)factory.createConnection(userName, password);
+            connections.add(connection);
+            // Receive a message with the JMS API
+            RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+            policy.setInitialRedeliveryDelay(0);
+            policy.setUseExponentialBackOff(false);
+            policy.setMaximumRedeliveries(maxRedeliveries);
+
+            connection.start();
+            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            ActiveMQTextMessage m = ((ActiveMQTextMessage)consumer.receive(4000));
+            if (i<=maxRedeliveries) {
+                assertEquals("1st", m.getText());
+                assertEquals(i, m.getRedeliveryCounter());
+            } else {
+                assertNull("null on exceeding redelivery count", m);
+
+                assertTrue("message in dlq", Wait.waitFor(new Wait.Condition() {
+                    @Override
+                    public boolean isSatisified() throws Exception {
+                        LOG.info("total dequeue count: " + broker.getAdminView().getTotalDequeueCount());
+                        return broker.getAdminView().getTotalDequeueCount() == 1;
+                    }
+                }));
+            }
+
+            // abortive close via broker
+            for (VMTransportServer transportServer : VMTransportFactory.SERVERS.values()) {
+                transportServer.stop();
+            }
+
+            try {
+                connection.close();
+            } catch (Exception expected) {
+            } finally {
+                connections.remove(connection);
+            }
+        }
+
+        connection = (ActiveMQConnection)factory.createConnection(userName, password);
+        connection.start();
+        connections.add(connection);
+        Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+
+        // We should be able to get the message off the DLQ now.
+        TextMessage m = (TextMessage)dlqConsumer.receive(1000);
+        assertNotNull("Got message from DLQ", m);
+        assertEquals("1st", m.getText());
+        String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+        assertTrue("cause exception has policy ref: " + cause, cause.contains("RedeliveryPolicy"));
+        assertTrue("cause exception has pre dispatch and count:" + cause, cause.contains("Dispatch[5]"));
+
+        dlqSession.commit();
+
+    }
+
+    public void testRepeatedRedeliveryReceiveBrokerCloseNoPreDispatchCheck() throws Exception {
+
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        session.commit();
+
+        final int maxRedeliveries = 4;
+        for (int i=0;i<=maxRedeliveries + 1;i++) {
+
+            connection = (ActiveMQConnection)factory.createConnection(userName, password);
+            connections.add(connection);
+            // Receive a message with the JMS API
+            RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+            policy.setInitialRedeliveryDelay(0);
+            policy.setUseExponentialBackOff(false);
+            policy.setMaximumRedeliveries(maxRedeliveries);
+            policy.setPreDispatchCheck(false);
+
+            connection.start();
+            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            ActiveMQTextMessage m = ((ActiveMQTextMessage)consumer.receive(4000));
+            assertNotNull("got message on i=" + i, m);
+            assertEquals("1st", m.getText());
+            assertEquals(i, m.getRedeliveryCounter());
+
+            // abortive close via broker
+            for (VMTransportServer transportServer : VMTransportFactory.SERVERS.values()) {
+                transportServer.stop();
+            }
+
+            try {
+                connection.close();
+            } catch (Exception expected) {
+            } finally {
+                connections.remove(connection);
+            }
+        }
     }
 
 
@@ -518,7 +647,10 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
         assertNotNull("Got message from DLQ", m);
         assertEquals("1st", m.getText());
         String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+        LOG.info("cause: " + cause);
         assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
+        assertTrue("cause exception has redelivered count ref: " + cause, cause.contains("[5]"));
+
         dlqSession.commit();
 
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b6bca397/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
index 916a655..9971e8e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
@@ -74,7 +74,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
 
         for (int i = 0; i < maxBrokerRedeliveriesToValidate; i++) {
             Message shouldBeNull = consumer.receive(500);
-            assertNull("did not get message after redelivery count exceeded: " + shouldBeNull, shouldBeNull);
+            assertNull("did not get message early: " + shouldBeNull, shouldBeNull);
 
             TimeUnit.SECONDS.sleep(4);
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/b6bca397/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
index 414b70d..9fc20fe 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
@@ -70,7 +70,7 @@ public class RedeliveryPluginHeaderTest extends TestCase {
         //pushed message to broker
 
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
-                transportURL + "?trace=true&jms.redeliveryPolicy.maximumRedeliveries=0");
+                transportURL + "?trace=true&jms.redeliveryPolicy.maximumRedeliveries=0&jms.redeliveryPolicy.preDispatchCheck=true");
 
         Connection connection = factory.createConnection();
         connection.start();