You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/11/18 16:59:22 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6500

Repository: activemq
Updated Branches:
  refs/heads/master 253726674 -> e050519ff


https://issues.apache.org/jira/browse/AMQ-6500

Improve handling of the prefetch extension and of pull consumers on
Topics so that the remote peer receives all of the available messages
that the credit it has issued allows.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e050519f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e050519f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e050519f

Branch: refs/heads/master
Commit: e050519ff6ae8079c5183f6e6372ddb3d03e91c7
Parents: 2537266
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Nov 18 11:59:01 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Nov 18 11:59:01 2016 -0500

----------------------------------------------------------------------
 .../activemq/transport/amqp/JMSClientTest.java  |  77 +++++++-
 .../amqp/interop/AmqpReceiverDrainTest.java     | 148 +++++++++++++---
 .../amqp/interop/AmqpReceiverTest.java          |  43 ++++-
 .../amqp/interop/AmqpSendReceiveTest.java       | 107 +++++++++++-
 .../broker/region/TopicSubscription.java        |  98 ++++++-----
 .../TopicSubscriptionZeroPrefetchTest.java      | 174 +++++++++++++++++--
 6 files changed, 546 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e050519f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 3583656..97ce106 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -230,7 +230,7 @@ public class JMSClientTest extends JMSClientTestSupport {
         assertEquals(totalCount, proxy.getQueueSize());
 
         // Consume again..check we receive all the messages.
-        Set<Integer> messageNumbers = new HashSet<Integer>();
+        Set<Integer> messageNumbers = new HashSet<>();
         for (int i = 1; i <= totalCount; i++) {
             messageNumbers.add(i);
         }
@@ -644,7 +644,7 @@ public class JMSClientTest extends JMSClientTestSupport {
     public void testDurableConsumerAsync() throws Exception {
         ActiveMQAdmin.enableJMSFrameTracing();
         final CountDownLatch latch = new CountDownLatch(1);
-        final AtomicReference<Message> received = new AtomicReference<Message>();
+        final AtomicReference<Message> received = new AtomicReference<>();
         String durableClientId = getDestinationName() + "-ClientId";
 
         connection = createConnection(durableClientId);
@@ -693,7 +693,7 @@ public class JMSClientTest extends JMSClientTestSupport {
             message.setText("hello");
             producer.send(message);
 
-            final AtomicReference<Message> msg = new AtomicReference<Message>();
+            final AtomicReference<Message> msg = new AtomicReference<>();
             assertTrue(Wait.waitFor(new Wait.Condition() {
 
                 @Override
@@ -712,7 +712,7 @@ public class JMSClientTest extends JMSClientTestSupport {
     public void testTopicConsumerAsync() throws Exception {
         ActiveMQAdmin.enableJMSFrameTracing();
         final CountDownLatch latch = new CountDownLatch(1);
-        final AtomicReference<Message> received = new AtomicReference<Message>();
+        final AtomicReference<Message> received = new AtomicReference<>();
 
         connection = createConnection();
         {
@@ -760,7 +760,7 @@ public class JMSClientTest extends JMSClientTestSupport {
             message.setText("hello");
             producer.send(message);
 
-            final AtomicReference<Message> msg = new AtomicReference<Message>();
+            final AtomicReference<Message> msg = new AtomicReference<>();
             assertTrue(Wait.waitFor(new Wait.Condition() {
 
                 @Override
@@ -782,7 +782,7 @@ public class JMSClientTest extends JMSClientTestSupport {
         final ConnectorViewMBean connector = getProxyToConnectionView(getTargetConnectorName());
         LOG.info("Current number of Connections is: {}", connector.connectionCount());
 
-        ArrayList<Connection> connections = new ArrayList<Connection>();
+        ArrayList<Connection> connections = new ArrayList<>();
 
         for (int i = 0; i < 10; i++) {
             connections.add(createConnection(null));
@@ -1265,4 +1265,69 @@ public class JMSClientTest extends JMSClientTestSupport {
             assertFalse(message.getJMSRedelivered());
         }
     }
+
+    @Test(timeout = 30000)
+    public void testProduceAndConsumeLargeNumbersOfTopicMessagesClientAck() throws Exception {
+        doTestProduceAndConsumeLargeNumbersOfMessages(true, Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    @Test(timeout = 30000)
+    public void testProduceAndConsumeLargeNumbersOfQueueMessagesClientAck() throws Exception {
+        doTestProduceAndConsumeLargeNumbersOfMessages(false, Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    @Test(timeout = 30000)
+    public void testProduceAndConsumeLargeNumbersOfTopicMessagesAutoAck() throws Exception {
+        doTestProduceAndConsumeLargeNumbersOfMessages(true, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test(timeout = 30000)
+    public void testProduceAndConsumeLargeNumbersOfQueueMessagesAutoAck() throws Exception {
+        doTestProduceAndConsumeLargeNumbersOfMessages(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public void doTestProduceAndConsumeLargeNumbersOfMessages(boolean topic, int ackMode) throws Exception {
+
+        final int MSG_COUNT = 1000;
+        final CountDownLatch done = new CountDownLatch(MSG_COUNT);
+
+        JmsConnectionFactory factory = new JmsConnectionFactory(getAmqpURI());
+        factory.setForceSyncSend(true);
+
+        connection = factory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, ackMode);
+        final Destination destination;
+        if (topic) {
+            destination = session.createTopic(getDestinationName());
+        } else {
+            destination = session.createQueue(getDestinationName());
+        }
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    message.acknowledge();
+                    done.countDown();
+                } catch (JMSException ex) {
+                    LOG.info("Caught exception.", ex);
+                }
+            }
+        });
+
+        MessageProducer producer = session.createProducer(destination);
+
+        TextMessage textMessage = session.createTextMessage();
+        textMessage.setText("messageText");
+
+        for (int i = 0; i < MSG_COUNT; i++) {
+            producer.send(textMessage);
+        }
+
+        assertTrue("Did not receive all messages: " + MSG_COUNT, done.await(15, TimeUnit.SECONDS));
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e050519f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java
index aa74100..993f26b 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java
@@ -22,7 +22,9 @@ import static org.junit.Assert.assertNull;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -37,47 +39,93 @@ import org.junit.Test;
 public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
 
     @Test(timeout = 60000)
-    public void testReceiverCanDrainMessages() throws Exception {
+    public void testReceiverCanDrainMessagesQueue() throws Exception {
+        doTestReceiverCanDrainMessages(false);
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiverCanDrainMessagesTopic() throws Exception {
+        doTestReceiverCanDrainMessages(true);
+    }
+
+    private void doTestReceiverCanDrainMessages(boolean topic) throws Exception {
+        final String destinationName;
+        if (topic) {
+            destinationName = "topic://" + getTestName();
+        } else {
+            destinationName = "queue://" + getTestName();
+        }
+
         int MSG_COUNT = 20;
-        sendMessages(getTestName(), MSG_COUNT, false);
 
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = trackConnection(client.connect());
         AmqpSession session = connection.createSession();
 
-        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        AmqpReceiver receiver = session.createReceiver(destinationName);
+
+        sendMessages(getTestName(), MSG_COUNT, topic);
+
+        final DestinationViewMBean destinationView;
+        if (topic) {
+            destinationView = getProxyToTopic(getTestName());
+        } else {
+            destinationView = getProxyToQueue(getTestName());
+        }
 
-        QueueViewMBean queueView = getProxyToQueue(getTestName());
-        assertEquals(MSG_COUNT, queueView.getQueueSize());
-        assertEquals(0, queueView.getDispatchCount());
+        assertEquals(MSG_COUNT, destinationView.getEnqueueCount());
+        assertEquals(0, destinationView.getDispatchCount());
 
         receiver.drain(MSG_COUNT);
         for (int i = 0; i < MSG_COUNT; ++i) {
             AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-            assertNotNull(message);
+            assertNotNull("Failed to read message: " + (i + 1), message);
+            LOG.info("Read message: {}", message.getMessageId());
             message.accept();
         }
         receiver.close();
 
-        assertEquals(0, queueView.getQueueSize());
+        assertEquals(MSG_COUNT, destinationView.getDequeueCount());
 
         connection.close();
     }
 
     @Test(timeout = 60000)
-    public void testPullWithNoMessageGetDrained() throws Exception {
+    public void testPullWithNoMessageGetDrainedQueue() throws Exception {
+        doTestPullWithNoMessageGetDrained(false);
+    }
+
+    @Test(timeout = 60000)
+    public void testPullWithNoMessageGetDrainedTopic() throws Exception {
+        doTestPullWithNoMessageGetDrained(true);
+    }
+
+    private void doTestPullWithNoMessageGetDrained(boolean topic) throws Exception {
+
+        final String destinationName;
+        if (topic) {
+            destinationName = "topic://" + getTestName();
+        } else {
+            destinationName = "queue://" + getTestName();
+        }
 
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = trackConnection(client.connect());
         AmqpSession session = connection.createSession();
 
-        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        AmqpReceiver receiver = session.createReceiver(destinationName);
 
         receiver.flow(10);
 
-        QueueViewMBean queueView = getProxyToQueue(getTestName());
-        assertEquals(0, queueView.getQueueSize());
-        assertEquals(0, queueView.getDispatchCount());
+        final DestinationViewMBean destinationView;
+        if (topic) {
+            destinationView = getProxyToTopic(getTestName());
+        } else {
+            destinationView = getProxyToQueue(getTestName());
+        }
+
+        assertEquals(0, destinationView.getEnqueueCount());
+        assertEquals(0, destinationView.getDispatchCount());
 
         assertEquals(10, receiver.getReceiver().getRemoteCredit());
 
@@ -89,19 +137,42 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 60000)
-    public void testPullOneFromRemote() throws Exception {
-        int MSG_COUNT = 20;
-        sendMessages(getTestName(), MSG_COUNT, false);
+    public void testPullOneFromRemoteQueue() throws Exception {
+        doTestPullOneFromRemote(false);
+    }
+
+    @Test(timeout = 60000)
+    public void testPullOneFromRemoteTopic() throws Exception {
+        doTestPullOneFromRemote(true);
+    }
+
+    private void doTestPullOneFromRemote(boolean topic) throws Exception {
 
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = trackConnection(client.connect());
         AmqpSession session = connection.createSession();
 
-        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        final String destinationName;
+        if (topic) {
+            destinationName = "topic://" + getTestName();
+        } else {
+            destinationName = "queue://" + getTestName();
+        }
+
+        AmqpReceiver receiver = session.createReceiver(destinationName);
+
+        int MSG_COUNT = 20;
+        sendMessages(getTestName(), MSG_COUNT, topic);
+
+        final DestinationViewMBean destinationView;
+        if (topic) {
+            destinationView = getProxyToTopic(getTestName());
+        } else {
+            destinationView = getProxyToQueue(getTestName());
+        }
 
-        QueueViewMBean queueView = getProxyToQueue(getTestName());
-        assertEquals(MSG_COUNT, queueView.getQueueSize());
-        assertEquals(0, queueView.getDispatchCount());
+        assertEquals(MSG_COUNT, destinationView.getEnqueueCount());
+        assertEquals(0, destinationView.getDispatchCount());
 
         assertEquals(0, receiver.getReceiver().getRemoteCredit());
 
@@ -113,25 +184,48 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
 
         receiver.close();
 
-        assertEquals(MSG_COUNT - 1, queueView.getQueueSize());
-        assertEquals(1, queueView.getDispatchCount());
+        assertEquals(MSG_COUNT - 1, destinationView.getEnqueueCount() - destinationView.getDequeueCount());
+        assertEquals(1, destinationView.getDispatchCount());
 
         connection.close();
     }
 
     @Test(timeout = 60000)
-    public void testMultipleZeroResultPulls() throws Exception {
+    public void testMultipleZeroResultPullsQueue() throws Exception {
+        doTestMultipleZeroResultPulls(false);
+    }
+
+    @Test(timeout = 60000)
+    public void testMultipleZeroResultPullsTopic() throws Exception {
+        doTestMultipleZeroResultPulls(true);
+    }
+
+    private void doTestMultipleZeroResultPulls(boolean topic) throws Exception {
+
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = trackConnection(client.connect());
         AmqpSession session = connection.createSession();
 
-        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        final String destinationName;
+        if (topic) {
+            destinationName = "topic://" + getTestName();
+        } else {
+            destinationName = "queue://" + getTestName();
+        }
+
+        AmqpReceiver receiver = session.createReceiver(destinationName);
 
         receiver.flow(10);
 
-        QueueViewMBean queueView = getProxyToQueue(getTestName());
-        assertEquals(0, queueView.getQueueSize());
-        assertEquals(0, queueView.getDispatchCount());
+        if (topic) {
+            TopicViewMBean topicView = getProxyToTopic(getTestName());
+            assertEquals(0, topicView.getEnqueueCount());
+            assertEquals(0, topicView.getDispatchCount());
+        } else {
+            QueueViewMBean queueView = getProxyToQueue(getTestName());
+            assertEquals(0, queueView.getQueueSize());
+            assertEquals(0, queueView.getDispatchCount());
+        }
 
         assertEquals(10, receiver.getReceiver().getRemoteCredit());
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/e050519f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index 08df785..f60af7b 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.junit.ActiveMQTestRunner;
 import org.apache.activemq.junit.Repeat;
@@ -268,19 +269,43 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
 
     @Test(timeout = 60000)
     @Repeat(repetitions = 1)
-    public void testPresettledReceiverReadsAllMessagesInNonFlowBatch() throws Exception {
+    public void testPresettledReceiverReadsAllMessagesInNonFlowBatchQueue() throws Exception {
+        doTestPresettledReceiverReadsAllMessagesInNonFlowBatch(false);
+    }
+
+    @Test(timeout = 60000)
+    @Repeat(repetitions = 1)
+    public void testPresettledReceiverReadsAllMessagesInNonFlowBatchTopic() throws Exception {
+        doTestPresettledReceiverReadsAllMessagesInNonFlowBatch(true);
+    }
+
+    private void doTestPresettledReceiverReadsAllMessagesInNonFlowBatch(boolean topic) throws Exception {
+
+        final String destinationName;
+        if (topic) {
+            destinationName = "topic://" + getTestName();
+        } else {
+            destinationName = "queue://" + getTestName();
+        }
+
         final int MSG_COUNT = 100;
-        sendMessages(getTestName(), MSG_COUNT, false);
 
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = trackConnection(client.connect());
         AmqpSession session = connection.createSession();
 
-        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), null, false, true);
+        AmqpReceiver receiver = session.createReceiver(destinationName, null, false, true);
 
-        QueueViewMBean queueView = getProxyToQueue(getTestName());
-        assertEquals(MSG_COUNT, queueView.getQueueSize());
-        assertEquals(0, queueView.getDispatchCount());
+        sendMessages(getTestName(), MSG_COUNT, topic);
+
+        final DestinationViewMBean destinationView;
+        if (topic) {
+            destinationView = getProxyToTopic(getTestName());
+        } else {
+            destinationView = getProxyToQueue(getTestName());
+        }
+        assertEquals(MSG_COUNT, destinationView.getEnqueueCount());
+        assertEquals(0, destinationView.getDispatchCount());
 
         receiver.flow(20);
         // consume less that flow
@@ -302,7 +327,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
 
         receiver.close();
 
-        assertEquals(0, queueView.getQueueSize());
+        assertEquals(0, destinationView.getEnqueueCount() - destinationView.getDequeueCount());
 
         connection.close();
     }
@@ -481,7 +506,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
             }
         });
 
-        Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
+        Map<Symbol, DescribedType> filters = new HashMap<>();
         filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKOWN_FILTER);
 
         Source source = new Source();

http://git-wip-us.apache.org/repos/asf/activemq/blob/e050519f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index ef3f27d..cb60e54 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Queue;
 import javax.jms.Topic;
@@ -60,13 +61,30 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
     protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
 
     @Test(timeout = 60000)
-    public void testSimpleSendOneReceiveOne() throws Exception {
+    public void testSimpleSendOneReceiveOneToQueue() throws Exception {
+        doTestSimpleSendOneReceiveOne(Queue.class);
+    }
+
+    @Test(timeout = 60000)
+    public void testSimpleSendOneReceiveOneToTopic() throws Exception {
+        doTestSimpleSendOneReceiveOne(Topic.class);
+    }
+
+    public void doTestSimpleSendOneReceiveOne(Class<?> destType) throws Exception {
+
+        final String address;
+        if (Queue.class.equals(destType)) {
+            address = "queue://" + getTestName();
+        } else {
+            address = "topic://" + getTestName();
+        }
 
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = trackConnection(client.connect());
         AmqpSession session = connection.createSession();
 
-        AmqpSender sender = session.createSender("queue://" + getTestName());
+        AmqpSender sender = session.createSender(address);
+        AmqpReceiver receiver = session.createReceiver(address);
 
         AmqpMessage message = new AmqpMessage();
 
@@ -78,7 +96,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
         sender.close();
 
         LOG.info("Attempting to read message with receiver");
-        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
         receiver.flow(2);
         AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
         assertNotNull("Should have read message", received);
@@ -366,7 +383,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
 
     private void doTestReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync(Class<?> destType) throws Exception {
         final AmqpClient client = createAmqpClient();
-        final LinkedList<Throwable> errors = new LinkedList<Throwable>();
+        final LinkedList<Throwable> errors = new LinkedList<>();
         final CountDownLatch receiverReady = new CountDownLatch(1);
         ExecutorService executorService = Executors.newCachedThreadPool();
 
@@ -609,7 +626,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
             sender.send(message);
         }
 
-        List<AmqpMessage> pendingAcks = new ArrayList<AmqpMessage>();
+        List<AmqpMessage> pendingAcks = new ArrayList<>();
 
         for (int i = 0; i < MSG_COUNT; i++) {
             receiver.flow(1);
@@ -719,4 +736,84 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
 
         connection.close();
     }
+
+    @Test(timeout = 60000)
+    public void testSendReceiveLotsOfDurableMessagesOnQueue() throws Exception {
+        doTestSendReceiveLotsOfDurableMessages(Queue.class);
+    }
+
+    @Test(timeout = 60000)
+    public void testSendReceiveLotsOfDurableMessagesOnTopic() throws Exception {
+        doTestSendReceiveLotsOfDurableMessages(Topic.class);
+    }
+
+    private void doTestSendReceiveLotsOfDurableMessages(Class<?> destType) throws Exception {
+        final int MSG_COUNT = 1000;
+
+        AmqpClient client = createAmqpClient();
+
+        AmqpConnection connection = trackConnection(client.connect());
+        AmqpSession session = connection.createSession();
+
+        final CountDownLatch done = new CountDownLatch(MSG_COUNT);
+        final AtomicBoolean error = new AtomicBoolean(false);
+        final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        final String address;
+        if (Queue.class.equals(destType)) {
+            address = "queue://" + getTestName();
+        } else {
+            address = "topic://" + getTestName();
+        }
+
+        final AmqpReceiver receiver = session.createReceiver(address);
+        receiver.flow(MSG_COUNT);
+
+        AmqpSender sender = session.createSender(address);
+
+        final DestinationViewMBean destinationView;
+        if (Queue.class.equals(destType)) {
+            destinationView = getProxyToQueue(getTestName());
+        } else {
+            destinationView = getProxyToTopic(getTestName());
+        }
+
+        executor.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                for (int i = 0; i < MSG_COUNT; i++) {
+                    try {
+                        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+                        received.accept();
+                        done.countDown();
+                    } catch (Exception ex) {
+                        LOG.info("Caught error: {}", ex.getClass().getSimpleName());
+                        error.set(true);
+                    }
+                }
+            }
+        });
+
+        for (int i = 0; i < MSG_COUNT; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("msg" + i);
+            sender.send(message);
+        }
+
+        assertTrue("did not read all messages, waiting on: " + done.getCount(), done.await(10, TimeUnit.SECONDS));
+        assertFalse("should not be any errors on receive", error.get());
+
+        assertTrue("Should be no inflight messages: " + destinationView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return destinationView.getInFlightCount() == 0;
+            }
+        }));
+
+        sender.close();
+        receiver.close();
+        connection.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e050519f/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 6ab264d..d993333 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.JMSException;
@@ -276,37 +275,61 @@ public class TopicSubscription extends AbstractSubscription {
     public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
         super.acknowledge(context, ack);
 
-        // Handle the standard acknowledgment case.
-        if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
-            if (context.isInTransaction()) {
-                context.getTransaction().addSynchronization(new Synchronization() {
-                    @Override
-                    public void afterCommit() throws Exception {
-                        updateStatsOnAck(ack);
-                        dispatchMatched();
-                    }
-                });
-            } else {
-                updateStatsOnAck(ack);
+        if (ack.isStandardAck()) {
+            updateStatsOnAck(context, ack);
+        } else if (ack.isPoisonAck()) {
+            if (ack.isInTransaction()) {
+                throw new JMSException("Poison ack cannot be transacted: " + ack);
+            }
+            updateStatsOnAck(context, ack);
+            if (getPrefetchSize() != 0) {
+                decrementPrefetchExtension(ack.getMessageCount());
+            }
+        } else if (ack.isIndividualAck()) {
+            updateStatsOnAck(context, ack);
+            if (getPrefetchSize() != 0 && ack.isInTransaction()) {
+                incrementPrefetchExtension(ack.getMessageCount());
             }
-            updatePrefetch(ack);
-            dispatchMatched();
-            return;
-        } else if (ack.isDeliveredAck()) {
-            // Message was delivered but not acknowledged: update pre-fetch counters.
-            prefetchExtension.addAndGet(ack.getMessageCount());
-            dispatchMatched();
-            return;
         } else if (ack.isExpiredAck()) {
             updateStatsOnAck(ack);
-            updatePrefetch(ack);
-            dispatchMatched();
-            return;
+            if (getPrefetchSize() != 0) {
+                incrementPrefetchExtension(ack.getMessageCount());
+            }
+        } else if (ack.isDeliveredAck()) {
+            // Message was delivered but not acknowledged: update pre-fetch counters.
+            if (getPrefetchSize() != 0) {
+                incrementPrefetchExtension(ack.getMessageCount());
+            }
         } else if (ack.isRedeliveredAck()) {
-            // nothing to do atm
+            // No processing for redelivered needed
             return;
+        } else {
+            throw new JMSException("Invalid acknowledgment: " + ack);
+        }
+
+        dispatchMatched();
+    }
+
+    private void updateStatsOnAck(final ConnectionContext context, final MessageAck ack) {
+        if (context.isInTransaction()) {
+            context.getTransaction().addSynchronization(new Synchronization() {
+
+                @Override
+                public void beforeEnd() {
+                    if (getPrefetchSize() != 0) {
+                        decrementPrefetchExtension(ack.getMessageCount());
+                    }
+                }
+
+                @Override
+                public void afterCommit() throws Exception {
+                    updateStatsOnAck(ack);
+                    dispatchMatched();
+                }
+            });
+        } else {
+            updateStatsOnAck(ack);
         }
-        throw new JMSException("Invalid acknowledgment: " + ack);
     }
 
     @Override
@@ -399,20 +422,20 @@ public class TopicSubscription extends AbstractSubscription {
         }
     }
 
-    private void updatePrefetch(MessageAck ack) {
+    private void incrementPrefetchExtension(int amount) {
         while (true) {
             int currentExtension = prefetchExtension.get();
-            int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
+            int newExtension = Math.max(0, currentExtension + amount);
             if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
                 break;
             }
         }
     }
 
-    private void decrementPrefetchExtension() {
+    private void decrementPrefetchExtension(int amount) {
         while (true) {
             int currentExtension = prefetchExtension.get();
-            int newExtension = Math.max(0, currentExtension - 1);
+            int newExtension = Math.max(0, currentExtension - amount);
             if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
                 break;
             }
@@ -439,7 +462,7 @@ public class TopicSubscription extends AbstractSubscription {
     @Override
     public int getDispatchedQueueSize() {
         return (int)(getSubscriptionStatistics().getDispatched().getCount() -
-                prefetchExtension.get() - getSubscriptionStatistics().getDequeues().getCount());
+                     getSubscriptionStatistics().getDequeues().getCount());
     }
 
     public int getMaximumPendingMessages() {
@@ -538,10 +561,7 @@ public class TopicSubscription extends AbstractSubscription {
     // -------------------------------------------------------------------------
     @Override
     public boolean isFull() {
-        if (info.getPrefetchSize() == 0) {
-            return prefetchExtension.get() == 0;
-        }
-        return getDispatchedQueueSize() >= info.getPrefetchSize();
+        return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : getDispatchedQueueSize() - prefetchExtension.get() >= info.getPrefetchSize();
     }
 
     @Override
@@ -554,7 +574,7 @@ public class TopicSubscription extends AbstractSubscription {
      */
     @Override
     public boolean isLowWaterMark() {
-        return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
+        return (getDispatchedQueueSize() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
     }
 
     /**
@@ -562,7 +582,7 @@ public class TopicSubscription extends AbstractSubscription {
      */
     @Override
     public boolean isHighWaterMark() {
-        return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
+        return (getDispatchedQueueSize() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
     }
 
     /**
@@ -669,10 +689,10 @@ public class TopicSubscription extends AbstractSubscription {
             }
 
             if (getPrefetchSize() == 0) {
-                decrementPrefetchExtension();
+                decrementPrefetchExtension(1);
             }
-
         }
+
         if (info.isDispatchAsync()) {
             if (node != null) {
                 md.setTransmitCallback(new TransmitCallback() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/e050519f/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
index 38fa921..87be689 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,24 +16,42 @@
  */
 package org.apache.activemq.usecases;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import javax.jms.Connection;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
-import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TopicSubscriptionZeroPrefetchTest {
 
-    private static final String TOPIC_NAME = "slow.consumer";
+    private static final Logger LOG = LoggerFactory.getLogger(TopicSubscriptionZeroPrefetchTest.class);
+
+    @Rule
+    public TestName name = new TestName();
+
     private Connection connection;
     private Session session;
     private ActiveMQTopic destination;
@@ -41,6 +59,10 @@ public class TopicSubscriptionZeroPrefetchTest {
     private MessageConsumer consumer;
     private BrokerService brokerService;
 
+    public String getTopicName() {
+        return name.getMethodName();
+    }
+
     @Before
     public void setUp() throws Exception {
 
@@ -52,7 +74,7 @@ public class TopicSubscriptionZeroPrefetchTest {
         connection = activeMQConnectionFactory.createConnection();
         connection.setClientID("ClientID-1");
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = new ActiveMQTopic(TOPIC_NAME);
+        destination = new ActiveMQTopic(getTopicName());
         producer = session.createProducer(destination);
 
         connection.start();
@@ -61,10 +83,10 @@ public class TopicSubscriptionZeroPrefetchTest {
     /*
      * test non durable topic subscription with prefetch set to zero
      */
-    @Test(timeout=60000)
+    @Test(timeout = 60000)
     public void testTopicConsumerPrefetchZero() throws Exception {
 
-        ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0");
+        ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0");
         consumer = session.createConsumer(consumerDestination);
 
         // publish messages
@@ -76,31 +98,153 @@ public class TopicSubscriptionZeroPrefetchTest {
         Assert.assertNotNull("should have received a message the published message", consumedMessage);
     }
 
-    @Test(timeout=60000)
-    public void testTopicConsumerPrefetchZeroClientAckLoop() throws Exception {
-        ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0");
+    @Test(timeout = 60000)
+    public void testTopicConsumerPrefetchZeroClientAckLoopReceive() throws Exception {
+        ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0");
+        Session consumerClientAckSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = consumerClientAckSession.createConsumer(consumerDestination);
+
+        final int count = 10;
+        for (int i = 0; i < count; i++) {
+            Message txtMessage = session.createTextMessage("M:" + i);
+            producer.send(txtMessage);
+        }
+
+        for (int i = 0; i < count; i++) {
+            Message consumedMessage = consumer.receive();
+            Assert.assertNotNull("should have received message[" + i + "]", consumedMessage);
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testTopicConsumerPrefetchZeroClientAckLoopTimedReceive() throws Exception {
+        ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0");
         Session consumerClientAckSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         consumer = consumerClientAckSession.createConsumer(consumerDestination);
 
         final int count = 10;
-        for (int i=0;i<count;i++) {
-            Message txtMessage = session.createTextMessage("M:"+ i);
+        for (int i = 0; i < count; i++) {
+            Message txtMessage = session.createTextMessage("M:" + i);
             producer.send(txtMessage);
         }
 
-        for (int i=0;i<count;i++) {
+        for (int i = 0; i < count; i++) {
             Message consumedMessage = consumer.receive(2000);
-            Assert.assertNotNull("should have received message[" + i +"]", consumedMessage);
+            Assert.assertNotNull("should have received message[" + i + "]", consumedMessage);
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testTopicConsumerPrefetchZeroClientAckLoopReceiveNoWait() throws Exception {
+        ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0");
+        Session consumerClientAckSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = consumerClientAckSession.createConsumer(consumerDestination);
+
+        final int count = 10;
+        for (int i = 0; i < count; i++) {
+            Message txtMessage = session.createTextMessage("M:" + i);
+            producer.send(txtMessage);
+        }
+
+        for (int i = 0; i < count; i++) {
+            Message consumedMessage = consumer.receiveNoWait();
+            Assert.assertNotNull("should have received message[" + i + "]", consumedMessage);
         }
     }
 
+    @Test(timeout = 60000)
+    public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeAutoAck() throws Exception {
+        doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test(timeout = 60000)
+    public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeClientAck() throws Exception {
+        doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    @Test(timeout = 60000)
+    public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeDupsOk() throws Exception {
+        doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.DUPS_OK_ACKNOWLEDGE);
+    }
+
+    @Test(timeout = 60000)
+    public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeTransacted() throws Exception {
+        doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.SESSION_TRANSACTED);
+    }
+
+    @Test(timeout = 60000)
+    public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeTransactedComitInBatches() throws Exception {
+        doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.SESSION_TRANSACTED, true); // commitBatch=true: commit every 50 messages
+    }
+
+    @Test(timeout = 60000)
+    public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeIndividual() throws Exception {
+        doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+    }
+
+    private void doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(int ackMode) throws Exception {
+        doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(ackMode, false);
+    }
+
+    private void doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(int ackMode, boolean commitBatch) throws Exception {
+        // Consumes MSG_COUNT messages on a worker thread from a prefetchSize=0 topic consumer while this thread produces them.
+        ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0");
+        Session consumerSession = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode);
+        consumer = consumerSession.createConsumer(consumerDestination);
+
+        final int MSG_COUNT = 2000;
+        final AtomicBoolean error = new AtomicBoolean();
+        final CountDownLatch done = new CountDownLatch(MSG_COUNT);
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    for (int i = 0; i < MSG_COUNT; i++) {
+                        Message consumedMessage = consumer.receive();
+                        if (consumedMessage != null) {
+                            done.countDown();
+                            consumedMessage.acknowledge();
+                            if (ackMode == Session.SESSION_TRANSACTED && commitBatch && ((i + 1) % 50) == 0) {
+                                consumerSession.commit();
+                            }
+                        }
+                    }
+                } catch (Exception ex) {
+                    LOG.error("Caught exception during receive", ex); // Throwable as last argument: logged with its stack trace
+                    error.set(true);
+                } finally {
+                    if (ackMode == Session.SESSION_TRANSACTED) {
+                        try {
+                            consumerSession.commit();
+                        } catch (JMSException e) {
+                            LOG.error("Caught exception on commit", e);
+                            error.set(true);
+                        }
+                    }
+                }
+            }
+        });
+        executor.shutdown(); // accept no further tasks so the worker thread exits once the receive loop finishes
+
+        for (int i = 0; i < MSG_COUNT; i++) {
+            Message txtMessage = session.createTextMessage("M:" + i);
+            producer.send(txtMessage);
+        }
+
+        boolean finished = done.await(10, TimeUnit.SECONDS) && executor.awaitTermination(10, TimeUnit.SECONDS); // wait for the worker (incl. final commit) before reading the error flag
+        assertFalse("Should not have gotten any errors", error.get());
+        assertTrue("Should have read all messages", finished);
+    }
+
     /*
      * test durable topic subscription with prefetch zero
      */
-    @Test(timeout=60000)
+    @Test(timeout = 60000)
     public void testDurableTopicConsumerPrefetchZero() throws Exception {
 
-        ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.prefetchSize=0");
+        ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.prefetchSize=0");
         consumer = session.createDurableSubscriber(consumerDestination, "mysub1");
 
         // publish messages