You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/30 23:18:00 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5907

Repository: activemq
Updated Branches:
  refs/heads/master 84ec047d2 -> b84413a31


https://issues.apache.org/jira/browse/AMQ-5907

recompute the timeout value and send a new pull request if the message
received exceeds the configured redelivery maximum.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b84413a3
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b84413a3
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b84413a3

Branch: refs/heads/master
Commit: b84413a31471e0c9e949cd7198ee952d063dfb61
Parents: 84ec047
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jul 30 17:17:37 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jul 30 17:17:55 2015 -0400

----------------------------------------------------------------------
 .../activemq/ActiveMQMessageConsumer.java       |   4 +
 .../ConsumerReceiveWithTimeoutTest.java         | 106 ++++++++++++++++---
 2 files changed, 94 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b84413a3/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 6d3beb0..4584362 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -502,6 +502,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                 } else if (redeliveryExceeded(md)) {
                     LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
                     posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
+                    if (timeout > 0) {
+                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
+                    }
+                    sendPullCommand(timeout);
                 } else {
                     if (LOG.isTraceEnabled()) {
                         LOG.trace(getConsumerId() + " received message: " + md);

http://git-wip-us.apache.org/repos/asf/activemq/blob/b84413a3/activemq-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java
index b34fe7f..a98263b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java
@@ -16,43 +16,56 @@
  */
 package org.apache.activemq;
 
-import javax.jms.Connection;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 
-/**
- * 
- */
-public class ConsumerReceiveWithTimeoutTest extends TestSupport {
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ConsumerReceiveWithTimeoutTest {
 
-    private Connection connection;
+    private ActiveMQConnection connection;
+    private BrokerService broker;
+    private String connectionUri;
+
+    @Before
+    public void setUp() throws Exception {
+        createBroker();
 
-    protected void setUp() throws Exception {
-        super.setUp();
         connection = createConnection();
     }
 
-    /**
-     * @see junit.framework.TestCase#tearDown()
-     */
-    protected void tearDown() throws Exception {
+    @After
+    public void tearDown() throws Exception {
         if (connection != null) {
-            connection.close();
-            connection = null;
+            try {
+                connection.close();
+            } catch (Exception e) {}
+        }
+
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
         }
-        super.tearDown();
     }
 
     /**
      * Test to check if consumer thread wakes up inside a receive(timeout) after
      * a message is dispatched to the consumer
-     * 
+     *
      * @throws javax.jms.JMSException
      */
+    @Test(timeout = 30000)
     public void testConsumerReceiveBeforeMessageDispatched() throws JMSException {
 
         connection.start();
@@ -61,6 +74,7 @@ public class ConsumerReceiveWithTimeoutTest extends TestSupport {
         final Queue queue = session.createQueue("test");
 
         Thread t = new Thread() {
+            @Override
             public void run() {
                 try {
                     // wait for 10 seconds to allow consumer.receive to be run
@@ -81,7 +95,67 @@ public class ConsumerReceiveWithTimeoutTest extends TestSupport {
         Message msg = consumer.receive(60000);
         assertNotNull(msg);
         session.close();
+    }
+
+    /**
+     * check if receive(timeout) does timeout when prefetch=0 and redeliveries=0
+     * <p/>
+     * send a message.
+     * consume and rollback to ensure redeliverCount is incremented
+     * try to consume message with a timeout.
+     */
+    @Test(timeout=20000)
+    public void testConsumerReceivePrefetchZeroRedeliveryZero() throws Exception {
+
+        connection.start();
+
+        // push message to queue
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test.prefetch.zero");
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage textMessage = session.createTextMessage("test Message");
+        producer.send(textMessage);
+        session.close();
+
+        // consume and rollback - increase redelivery counter on message
+        session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message message = consumer.receive(2000);
+        assertNotNull(message);
+        session.rollback();
+        session.close();
+
+        // Reconnect with zero prefetch and zero redeliveries allowed.
+        connection.close();
+        connection = createConnection();
+        connection.getPrefetchPolicy().setQueuePrefetch(0);
+        connection.getRedeliveryPolicy().setMaximumRedeliveries(0);
+        connection.start();
 
+        // try consume with timeout - expect it to timeout and return NULL message
+        session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        consumer = session.createConsumer(queue);
+        message = consumer.receive(3000);
+
+        assertNull(message);
+    }
+
+    private void createBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setPersistent(false);
+        broker.addConnector("tcp://localhost:0");
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
     }
 
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    protected ActiveMQConnection createConnection() throws Exception {
+        return (ActiveMQConnection) createConnectionFactory().createConnection();
+    }
 }