You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/09/27 16:46:24 UTC
[12/14] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-6422 - match proton sender view
credit to prefetchExtension - tracking credit to dispatch delta to track
additional flow requests. Proton sender layer is distinct from the transport
layer - they mirror each other
https://issues.apache.org/jira/browse/AMQ-6422 - match proton sender view credit to prefetchExtension - tracking credit to dispatch delta to track additional flow requests. Proton sender layer is distinct from the transport layer - they mirror each other
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ebbb7ab4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ebbb7ab4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ebbb7ab4
Branch: refs/heads/activemq-5.14.x
Commit: ebbb7ab437b3023be5afe731a0015fec58a51d57
Parents: 0bb76c7
Author: gtully <ga...@gmail.com>
Authored: Wed Sep 21 10:33:20 2016 +0100
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:15:50 2016 -0400
----------------------------------------------------------------------
.../transport/amqp/protocol/AmqpSender.java | 91 ++++++++++----------
.../amqp/JMSClientTransactionTest.java | 6 --
.../amqp/interop/AmqpSendReceiveTest.java | 8 +-
.../activemq/broker/region/AbstractRegion.java | 2 +-
.../broker/region/AbstractSubscription.java | 12 +++
.../broker/region/PrefetchSubscription.java | 10 +--
.../broker/region/QueueSubscription.java | 2 +-
.../broker/region/TopicSubscription.java | 21 ++++-
.../TopicSubscriptionZeroPrefetchTest.java | 19 ++++
9 files changed, 103 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 0b85858..75f2371 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -20,8 +20,9 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
import java.io.IOException;
import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.AbstractSubscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerControl;
@@ -81,7 +82,10 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
private final ConsumerInfo consumerInfo;
- private Subscription subscription;
+ private AbstractSubscription subscription;
+ private AtomicInteger prefetchExtension;
+ private int currentCreditRequest;
+ private int logicalDeliveryCount; // echoes prefetch extension but from protons perspective
private final boolean presettle;
private boolean draining;
@@ -111,7 +115,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
public void open() {
if (!isClosed()) {
session.registerSender(getConsumerId(), this);
- subscription = session.getConnection().lookupPrefetchSubscription(consumerInfo);
+ subscription = (AbstractSubscription)session.getConnection().lookupPrefetchSubscription(consumerInfo);
+ prefetchExtension = subscription.getPrefetchExtension();
}
super.open();
@@ -168,24 +173,15 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
public void flow() throws Exception {
Link endpoint = getEndpoint();
if (LOG.isTraceEnabled()) {
- LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}, unsettled={}",
+ LOG.trace("Flow: draining={}, drain={} credit={}, currentCredit={}, senderDeliveryCount={} - Sub={}",
draining, endpoint.getDrain(),
- endpoint.getCredit(), endpoint.getRemoteCredit(), endpoint.getQueued(), endpoint.getUnsettled());
+ endpoint.getCredit(), currentCreditRequest, logicalDeliveryCount, subscription);
}
+ final int endpointCredit = endpoint.getCredit();
if (endpoint.getDrain() && !draining) {
- // Revert to a pull consumer.
- ConsumerControl control = new ConsumerControl();
- control.setConsumerId(getConsumerId());
- control.setDestination(getDestination());
- control.setPrefetch(0);
-
- LOG.trace("Flow: Pull case -> consumer control with prefetch (0) to control output");
-
- sendToActiveMQ(control);
-
- if (endpoint.getCredit() > 0) {
+ if (endpointCredit > 0) {
draining = true;
// Now request dispatch of the drain amount, we request immediate
@@ -196,9 +192,9 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
pullRequest.setDestination(getDestination());
pullRequest.setTimeout(-1);
pullRequest.setAlwaysSignalDone(true);
- pullRequest.setQuantity(endpoint.getCredit());
+ pullRequest.setQuantity(endpointCredit);
- LOG.trace("Pull case -> consumer pull request quantity = {}", endpoint.getCredit());
+ LOG.trace("Pull case -> consumer pull request quantity = {}", endpointCredit);
sendToActiveMQ(pullRequest);
} else {
@@ -207,25 +203,36 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
pumpOutbound();
getEndpoint().drained();
session.pumpProtonToSocket();
+ currentCreditRequest = 0;
+ logicalDeliveryCount = 0;
}
- } else {
- ConsumerControl control = new ConsumerControl();
- control.setConsumerId(getConsumerId());
- control.setDestination(getDestination());
-
- int remoteCredit = endpoint.getRemoteCredit();
- if (remoteCredit > 0 && subscription != null) {
- // ensure prefetch exceeds credit + inflight
- if (remoteCredit + endpoint.getUnsettled() + endpoint.getQueued() > subscription.getPrefetchSize()) {
- LOG.trace("Adding dispatched size to credit for sub: " + subscription);
- remoteCredit += subscription.getDispatchedQueueSize();
- }
- }
- control.setPrefetch(remoteCredit);
+ } else if (endpointCredit >= 0) {
+
+ if (endpointCredit == 0 && currentCreditRequest != 0) {
+
+ prefetchExtension.set(0);
+ currentCreditRequest = 0;
+ logicalDeliveryCount = 0;
+ LOG.trace("Flow: credit 0 for sub:" + subscription);
- LOG.trace("Flow: update -> consumer control with prefetch {}", control.getPrefetch());
+ } else {
- sendToActiveMQ(control);
+ int deltaToAdd = endpointCredit;
+ int logicalCredit = currentCreditRequest - logicalDeliveryCount;
+ if (logicalCredit > 0) {
+ deltaToAdd -= logicalCredit;
+ } else {
+ // reset delivery counter - dispatch from broker concurrent with credit=0 flow can go negative
+ logicalDeliveryCount = 0;
+ }
+ if (deltaToAdd > 0) {
+ currentCreditRequest = prefetchExtension.addAndGet(deltaToAdd);
+ subscription.wakeupDestinationsForDispatch();
+ // force dispatch of matched/pending for topics (pending messages accumulate in the sub and are dispatched on update of prefetch)
+ subscription.setPrefetchSize(0);
+ LOG.trace("Flow: credit addition of {} for sub {}", deltaToAdd, subscription);
+ }
+ }
}
}
@@ -285,6 +292,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
}
pumpOutbound();
+ logicalDeliveryCount++;
}
@Override
@@ -440,6 +448,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
// It's the end of browse signal in response to a MessagePull
getEndpoint().drained();
draining = false;
+ currentCreditRequest = 0;
+ logicalDeliveryCount = 0;
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}",
@@ -451,6 +461,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName());
getEndpoint().drained();
draining = false;
+ currentCreditRequest = 0;
+ logicalDeliveryCount = 0;
}
jms.setRedeliveryCounter(md.getRedeliveryCounter());
@@ -481,17 +493,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
tagCache.returnTag(tag);
}
- int newCredit = Math.max(0, getEndpoint().getCredit() - 1);
- LOG.trace("Sender:[{}] updating conumser prefetch:{} after delivery settled.",
- getEndpoint().getName(), newCredit);
-
- ConsumerControl control = new ConsumerControl();
- control.setConsumerId(getConsumerId());
- control.setDestination(getDestination());
- control.setPrefetch(newCredit);
-
- sendToActiveMQ(control);
-
if (ackType == -1) {
// we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
delivery.settle();
http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
index 1251410..f481ba9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
@@ -193,8 +193,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
assertNotNull(subscription);
- LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize());
- assertTrue(subscription.getPrefetchSize() > 0);
for (int i = 1; i <= MSG_COUNT; i++) {
LOG.info("Trying to receive message: {}", i);
@@ -259,8 +257,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
assertNotNull(subscription);
- LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize());
- assertTrue(subscription.getPrefetchSize() > 0);
assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
@@ -273,7 +269,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
LOG.info("COMMIT of first received batch here:");
session.commit();
- assertTrue(subscription.getPrefetchSize() > 0);
for (int i = 1; i <= MSG_COUNT; i++) {
LOG.info("Sending message: {} to commit", msgIndex++);
TextMessage message = session.createTextMessage("Commit Message: " + msgIndex);
@@ -286,7 +281,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
LOG.info("WAITING -> for next three messages to arrive:");
- assertTrue(subscription.getPrefetchSize() > 0);
assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index 3132e6e..34436f2 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -294,7 +294,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
receiver2.flow(splitCredit);
for (int i = 0; i < splitCredit; i++) {
AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
- assertNotNull("Receiver #2 should have read a message", message);
+ assertNotNull("Receiver #2 should have read message[" + i + "]", message);
LOG.info("Receiver #2 read message: {}", message.getMessageId());
message.accept();
}
@@ -671,7 +671,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
LOG.info("*** Attempting to read remaining messages with both receivers");
int splitCredit = (MSG_COUNT - 4) / 2;
- LOG.info("**** Receiver #1 granting creadit[{}] for its block of messages", splitCredit);
+ LOG.info("**** Receiver #1 granting credit[{}] for its block of messages", splitCredit);
receiver1.flow(splitCredit);
for (int i = 0; i < splitCredit; i++) {
AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
@@ -680,11 +680,11 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
message.accept();
}
- LOG.info("**** Receiver #2 granting creadit[{}] for its block of messages", splitCredit);
+ LOG.info("**** Receiver #2 granting credit[{}] for its block of messages", splitCredit);
receiver2.flow(splitCredit);
for (int i = 0; i < splitCredit; i++) {
AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
- assertNotNull("Receiver #2 should have read a message", message);
+ assertNotNull("Receiver #2 should have read a message[" + i + "]", message);
LOG.info("Receiver #2 read message: {}", message.getMessageId());
message.accept();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index be77b6e..13251c8 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -688,7 +688,7 @@ public abstract class AbstractRegion implements Region {
entry.configurePrefetch(sub);
}
}
- LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getCurrentPrefetchSize()});
+ LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getPrefetchSize()});
try {
lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
index 1d84269..3cb2f1f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@@ -49,6 +50,7 @@ public abstract class AbstractSubscription implements Subscription {
protected ConsumerInfo info;
protected final DestinationFilter destinationFilter;
protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
+ protected final AtomicInteger prefetchExtension = new AtomicInteger(0);
private BooleanExpression selectorExpression;
private ObjectName objectName;
@@ -309,4 +311,14 @@ public abstract class AbstractSubscription implements Subscription {
public SubscriptionStatistics getSubscriptionStatistics() {
return subscriptionStatistics;
}
+
+ public void wakeupDestinationsForDispatch() {
+ for (Destination dest : destinations) {
+ dest.wakeup();
+ }
+ }
+
+ public AtomicInteger getPrefetchExtension() {
+ return this.prefetchExtension;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 5254440..0a277fb 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -23,7 +23,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
@@ -57,7 +56,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
protected PendingMessageCursor pending;
protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
- protected final AtomicInteger prefetchExtension = new AtomicInteger();
protected boolean usePrefetchExtension = true;
private int maxProducersToAudit=32;
private int maxAuditDepth=2048;
@@ -431,9 +429,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
dispatchPending();
if (pending.isEmpty()) {
- for (Destination dest : destinations) {
- dest.wakeup();
- }
+ wakeupDestinationsForDispatch();
}
} else {
LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack);
@@ -904,10 +900,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
this.usePrefetchExtension = usePrefetchExtension;
}
- protected int getPrefetchExtension() {
- return this.prefetchExtension.get();
- }
-
@Override
public void setPrefetchSize(int prefetchSize) {
this.info.setPrefetchSize(prefetchSize);
http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
index 358f946..6e865ec 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
@@ -69,7 +69,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
@Override
public synchronized String toString() {
return "QueueSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + dispatched.size() + ", delivered="
- + this.prefetchExtension + ", pending=" + getPendingQueueSize();
+ + this.prefetchExtension + ", pending=" + getPendingQueueSize() + ", prefetch=" + getPrefetchSize() + ", prefetchExtension=" + prefetchExtension.get();
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index eff2393..6ab264d 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -64,7 +64,6 @@ public class TopicSubscription extends AbstractSubscription {
private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
private int discarded;
private final Object matchedListMutex = new Object();
- private final AtomicInteger prefetchExtension = new AtomicInteger(0);
private int memoryUsageHighWaterMark = 95;
// allow duplicate suppression in a ring network of brokers
protected int maxProducersToAudit = 1024;
@@ -410,6 +409,16 @@ public class TopicSubscription extends AbstractSubscription {
}
}
+ private void decrementPrefetchExtension() {
+ while (true) {
+ int currentExtension = prefetchExtension.get();
+ int newExtension = Math.max(0, currentExtension - 1);
+ if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
+ break;
+ }
+ }
+ }
+
@Override
public int countBeforeFull() {
return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize();
@@ -529,6 +538,9 @@ public class TopicSubscription extends AbstractSubscription {
// -------------------------------------------------------------------------
@Override
public boolean isFull() {
+ if (info.getPrefetchSize() == 0) {
+ return prefetchExtension.get() == 0;
+ }
return getDispatchedQueueSize() >= info.getPrefetchSize();
}
@@ -655,6 +667,11 @@ public class TopicSubscription extends AbstractSubscription {
}
}
}
+
+ if (getPrefetchSize() == 0) {
+ decrementPrefetchExtension();
+ }
+
}
if (info.isDispatchAsync()) {
if (node != null) {
@@ -712,7 +729,7 @@ public class TopicSubscription extends AbstractSubscription {
@Override
public String toString() {
return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
- + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
+ + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get();
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
index b9f0d50..38fa921 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
@@ -22,6 +22,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
@@ -75,6 +76,24 @@ public class TopicSubscriptionZeroPrefetchTest {
Assert.assertNotNull("should have received a message the published message", consumedMessage);
}
+ @Test(timeout=60000)
+ public void testTopicConsumerPrefetchZeroClientAckLoop() throws Exception {
+ ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0");
+ Session consumerClientAckSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumer = consumerClientAckSession.createConsumer(consumerDestination);
+
+ final int count = 10;
+ for (int i=0;i<count;i++) {
+ Message txtMessage = session.createTextMessage("M:"+ i);
+ producer.send(txtMessage);
+ }
+
+ for (int i=0;i<count;i++) {
+ Message consumedMessage = consumer.receive(2000);
+ Assert.assertNotNull("should have received message[" + i +"]", consumedMessage);
+ }
+ }
+
/*
* test durable topic subscription with prefetch zero
*/