You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/09/26 10:30:38 UTC

activemq git commit: [AMQ-6824] - fix up prefetchExtension growth on transaction completion and delivered ack and tie in boolean usePrefetchExtension

Repository: activemq
Updated Branches:
  refs/heads/master a21dd4052 -> 41a100766


[AMQ-6824] - fix up prefetchExtension growth on transaction completion and delivered ack and tie in boolean usePrefetchExtension


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/41a10076
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/41a10076
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/41a10076

Branch: refs/heads/master
Commit: 41a100766c19655816d575841ba559d33c63313d
Parents: a21dd40
Author: gtully <ga...@gmail.com>
Authored: Tue Sep 26 11:30:18 2017 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue Sep 26 11:30:18 2017 +0100

----------------------------------------------------------------------
 .../broker/region/AbstractSubscription.java     | 33 +++++++
 .../broker/region/PrefetchSubscription.java     | 68 +++------------
 .../apache/activemq/broker/region/Queue.java    |  2 +
 .../broker/region/TopicSubscription.java        | 49 +++--------
 .../activemq/transport/stomp/StompTest.java     | 91 ++++++++++++++++++++
 .../org/apache/activemq/JMSConsumerTest.java    | 83 ++++++++++++++++++
 .../org/apache/activemq/JMSXAConsumerTest.java  |  6 ++
 .../activemq/usecases/ExpiredMessagesTest.java  | 77 ++++++++++++-----
 8 files changed, 290 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
index 9a51a83..fae9c7c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
@@ -330,4 +330,37 @@ public abstract class AbstractSubscription implements Subscription {
     public AtomicInteger getPrefetchExtension() {
         return this.prefetchExtension;
     }
+
+    protected void contractPrefetchExtension(int amount) {
+        if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
+            decrementPrefetchExtension(amount);
+        }
+    }
+
+    protected void expandPrefetchExtension(int amount) {
+        if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
+            incrementPrefetchExtension(amount);
+        }
+    }
+
+    protected void decrementPrefetchExtension(int amount) {
+        while (true) {
+            int currentExtension = prefetchExtension.get();
+            int newExtension = Math.max(0, currentExtension - amount);
+            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
+                break;
+            }
+        }
+    }
+
+    private void incrementPrefetchExtension(int amount) {
+        while (true) {
+            int currentExtension = prefetchExtension.get();
+            int newExtension = Math.max(currentExtension, currentExtension + amount);
+            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
+                break;
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 314285f..fc68fc1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -226,6 +226,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                             getSubscriptionStatistics().getDequeues().increment();
                             ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
                             removeList.add(node);
+                            contractPrefetchExtension(1);
                         } else {
                             registerRemoveSync(context, node);
                         }
@@ -258,28 +259,18 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                             ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
                             dispatched.remove(node);
                             getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
+                            contractPrefetchExtension(1);
                         } else {
                             registerRemoveSync(context, node);
+                            expandPrefetchExtension(1);
                         }
-
-                        if (isUsePrefetchExtension() && getPrefetchSize() != 0 && ack.isInTransaction()) {
-                            // allow transaction batch to exceed prefetch
-                            while (true) {
-                                int currentExtension = prefetchExtension.get();
-                                int newExtension = Math.max(currentExtension, currentExtension + 1);
-                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
-                                    break;
-                                }
-                            }
-                        }
-
                         acknowledge(context, ack, node);
                         destination = (Destination) node.getRegionDestination();
                         callDispatchMatched = true;
                         break;
                     }
                 }
-            }else if (ack.isDeliveredAck()) {
+            } else if (ack.isDeliveredAck()) {
                 // Message was delivered but not acknowledged: update pre-fetch
                 // counters.
                 int index = 0;
@@ -287,16 +278,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                     final MessageReference node = iter.next();
                     Destination nodeDest = (Destination) node.getRegionDestination();
                     if (ack.getLastMessageId().equals(node.getMessageId())) {
-                        if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
-                            // allow  batch to exceed prefetch
-                            while (true) {
-                                int currentExtension = prefetchExtension.get();
-                                int newExtension = Math.max(currentExtension, index + 1);
-                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
-                                    break;
-                                }
-                            }
-                        }
+                        expandPrefetchExtension(ack.getMessageCount());
                         destination = nodeDest;
                         callDispatchMatched = true;
                         break;
@@ -327,17 +309,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                         nodeDest.getDestinationStatistics().getInflight().decrement();
 
                         if (ack.getLastMessageId().equals(messageId)) {
-                            if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
-                                // allow  batch to exceed prefetch
-                                while (true) {
-                                    int currentExtension = prefetchExtension.get();
-                                    int newExtension = Math.max(currentExtension, index + 1);
-                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
-                                        break;
-                                    }
-                                }
-                            }
-
+                            contractPrefetchExtension(1);
                             destination = (Destination) node.getRegionDestination();
                             callDispatchMatched = true;
                             break;
@@ -399,13 +371,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                         index++;
                         acknowledge(context, ack, node);
                         if (ack.getLastMessageId().equals(messageId)) {
-                            while (true) {
-                                int currentExtension = prefetchExtension.get();
-                                int newExtension = Math.max(0, currentExtension - (index + 1));
-                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
-                                    break;
-                                }
-                            }
+                            contractPrefetchExtension(1);
                             destination = nodeDest;
                             callDispatchMatched = true;
                             break;
@@ -442,37 +408,23 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                 new Synchronization() {
 
                     @Override
-                    public void beforeEnd() {
-                        if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
-                            while (true) {
-                                int currentExtension = prefetchExtension.get();
-                                int newExtension = Math.max(0, currentExtension - 1);
-                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
-                                    break;
-                                }
-                            }
-                        }
-                    }
-
-                    @Override
                     public void afterCommit()
                             throws Exception {
                         Destination nodeDest = (Destination) node.getRegionDestination();
-                        synchronized(dispatchLock) {
+                        synchronized (dispatchLock) {
                             getSubscriptionStatistics().getDequeues().increment();
                             dispatched.remove(node);
                             getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
                             nodeDest.getDestinationStatistics().getInflight().decrement();
                         }
+                        contractPrefetchExtension(1);
                         nodeDest.wakeup();
                         dispatchPending();
                     }
 
                     @Override
                     public void afterRollback() throws Exception {
-                        synchronized(dispatchLock) {
-                            // poisionAck will decrement - otherwise still inflight on client
-                        }
+                        contractPrefetchExtension(1);
                     }
                 });
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 49dd5a2..fba78cb 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1681,6 +1681,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                         if (!added || browser.atMax()) {
                             browser.decrementQueueRef();
                             browserDispatches.remove(browserDispatch);
+                        } else {
+                            wakeup();
                         }
                     } catch (Exception e) {
                         LOG.warn("exception on dispatch to browser: {}", browserDispatch.getBrowser(), e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 65c2ba9..4b958f3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -281,24 +281,18 @@ public class TopicSubscription extends AbstractSubscription {
                 throw new JMSException("Poison ack cannot be transacted: " + ack);
             }
             updateStatsOnAck(context, ack);
-            if (getPrefetchSize() != 0) {
-                decrementPrefetchExtension(ack.getMessageCount());
-            }
+            contractPrefetchExtension(ack.getMessageCount());
         } else if (ack.isIndividualAck()) {
             updateStatsOnAck(context, ack);
-            if (getPrefetchSize() != 0 && ack.isInTransaction()) {
-                incrementPrefetchExtension(ack.getMessageCount());
+            if (ack.isInTransaction()) {
+                expandPrefetchExtension(1);
             }
         } else if (ack.isExpiredAck()) {
             updateStatsOnAck(ack);
-            if (getPrefetchSize() != 0) {
-                incrementPrefetchExtension(ack.getMessageCount());
-            }
+            contractPrefetchExtension(ack.getMessageCount());
         } else if (ack.isDeliveredAck()) {
             // Message was delivered but not acknowledged: update pre-fetch counters.
-            if (getPrefetchSize() != 0) {
-                incrementPrefetchExtension(ack.getMessageCount());
-            }
+           expandPrefetchExtension(ack.getMessageCount());
         } else if (ack.isRedeliveredAck()) {
             // No processing for redelivered needed
             return;
@@ -314,14 +308,13 @@ public class TopicSubscription extends AbstractSubscription {
             context.getTransaction().addSynchronization(new Synchronization() {
 
                 @Override
-                public void beforeEnd() {
-                    if (getPrefetchSize() != 0) {
-                        decrementPrefetchExtension(ack.getMessageCount());
-                    }
+                public void afterRollback() {
+                    contractPrefetchExtension(ack.getMessageCount());
                 }
 
                 @Override
                 public void afterCommit() throws Exception {
+                    contractPrefetchExtension(ack.getMessageCount());
                     updateStatsOnAck(ack);
                     dispatchMatched();
                 }
@@ -417,29 +410,9 @@ public class TopicSubscription extends AbstractSubscription {
                 if (ack.isExpiredAck()) {
                     destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
                 }
-            }
-        }
-    }
-
-    private void incrementPrefetchExtension(int amount) {
-        if (!isUsePrefetchExtension()) {
-            return;
-        }
-        while (true) {
-            int currentExtension = prefetchExtension.get();
-            int newExtension = Math.max(0, currentExtension + amount);
-            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
-                break;
-            }
-        }
-    }
-
-    private void decrementPrefetchExtension(int amount) {
-        while (true) {
-            int currentExtension = prefetchExtension.get();
-            int newExtension = Math.max(0, currentExtension - amount);
-            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
-                break;
+                if (!ack.isInTransaction()) {
+                    contractPrefetchExtension(1);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
index c6a50eb..7ded503 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
@@ -28,6 +28,7 @@ import java.io.StringReader;
 import java.net.SocketTimeoutException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -46,10 +47,15 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.management.ObjectName;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.AbstractSubscription;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.util.Wait;
@@ -2263,6 +2269,91 @@ public class StompTest extends StompTestSupport {
         assertEquals(bigBody, sframe.getBody());
     }
 
+    @Test(timeout = 60000)
+    public void testAckInTransactionTopic() throws Exception {
+        doTestAckInTransaction(true);
+    }
+
+    @Test(timeout = 60000)
+    public void testAckInTransactionQueue() throws Exception {
+        doTestAckInTransaction(false);
+    }
+
+    public void doTestAckInTransaction(boolean topic) throws Exception {
+
+        String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(frame);
+        stompConnection.receive();
+        String destination = (topic ? "/topic" : "/queue") + "/test";
+        stompConnection.subscribe(destination, Stomp.Headers.Subscribe.AckModeValues.CLIENT);
+
+        for (int j = 0; j < 5; j++) {
+
+            for (int i = 0; i < 10; i++) {
+                stompConnection.send(destination , "message" + i);
+            }
+
+            stompConnection.begin("tx"+j);
+
+            for (int i = 0; i < 10; i++) {
+                StompFrame message = stompConnection.receive();
+                stompConnection.ack(message, "tx"+j);
+
+            }
+            stompConnection.commit("tx"+j);
+        }
+
+        List<Subscription> subs = getDestinationConsumers(brokerService,
+                ActiveMQDestination.createDestination("test", topic ? ActiveMQDestination.TOPIC_TYPE : ActiveMQDestination.QUEUE_TYPE));
+
+
+        for (Subscription subscription : subs) {
+            final AbstractSubscription abstractSubscription = (AbstractSubscription) subscription;
+
+            assertTrue("prefetchExtension should be back to Zero after commit", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    LOG.info("ext: " + abstractSubscription.getPrefetchExtension().get());
+                    return abstractSubscription.getPrefetchExtension().get() == 0;
+                }
+            }));
+        }
+    }
+
+    public static List<Subscription> getDestinationConsumers(BrokerService broker, ActiveMQDestination destination) {
+        List<Subscription> result = null;
+        org.apache.activemq.broker.region.Destination dest = getDestination(broker, destination);
+        if (dest != null) {
+            result = dest.getConsumers();
+        }
+        return result;
+    }
+
+    public static org.apache.activemq.broker.region.Destination getDestination(BrokerService target, ActiveMQDestination destination) {
+        org.apache.activemq.broker.region.Destination result = null;
+        for (org.apache.activemq.broker.region.Destination dest : getDestinationMap(target, destination).values()) {
+            if (dest.getName().equals(destination.getPhysicalName())) {
+                result = dest;
+                break;
+            }
+        }
+        return result;
+    }
+
+    private static Map<ActiveMQDestination, org.apache.activemq.broker.region.Destination> getDestinationMap(BrokerService target,
+                                                                                                             ActiveMQDestination destination) {
+        RegionBroker regionBroker = (RegionBroker) target.getRegionBroker();
+        if (destination.isTemporary()) {
+            return destination.isQueue() ? regionBroker.getTempQueueRegion().getDestinationMap() :
+                    regionBroker.getTempTopicRegion().getDestinationMap();
+        }
+        return destination.isQueue() ?
+                regionBroker.getQueueRegion().getDestinationMap() :
+                regionBroker.getTopicRegion().getDestinationMap();
+    }
+
+
     protected SamplePojo createObjectFromJson(String data) throws Exception {
         HierarchicalStreamReader in = new JettisonMappedXmlDriver().createReader(new StringReader(data));
         return createObject(in);

http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
index 5ccf1bd..abf9f62 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -42,11 +43,16 @@ import javax.management.ObjectName;
 import junit.framework.Test;
 
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.QueueSubscription;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.TopicSubscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.activemq.TestSupport.getDestinationConsumers;
+
 /**
  * Test cases used to test the JMS message consumer.
  */
@@ -223,6 +229,83 @@ public class JMSConsumerTest extends JmsTestSupport {
         message.acknowledge();
     }
 
+    public void testReceiveTopicWithPrefetch1() throws Exception {
+
+        // Set prefetch to 1
+        connection.getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = createDestination(session, Byte.valueOf(ActiveMQDestination.TOPIC_TYPE));
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+
+        // Make sure 4 messages were delivered.
+        Message message = null;
+        for (int i = 0; i < 4; i++) {
+            message = consumer.receive(1000);
+            assertNotNull(message);
+        }
+
+        final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
+
+        assertTrue("prefetch extension back to 0",
+                subscriptions.stream().
+                        filter(s -> s instanceof TopicSubscription).
+                        mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
+                        allMatch(e -> e == 4));
+
+        assertNull(consumer.receiveNoWait());
+        message.acknowledge();
+
+        assertTrue("prefetch extension back to 0",
+                subscriptions.stream().
+                        filter(s -> s instanceof TopicSubscription).
+                        mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
+                        allMatch(e -> e == 0));
+
+    }
+
+    public void testReceiveQueueWithPrefetch1() throws Exception {
+
+        // Set prefetch to 1
+        connection.getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = createDestination(session, Byte.valueOf(ActiveMQDestination.QUEUE_TYPE));
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+
+        // Make sure 4 messages were delivered.
+        Message message = null;
+        for (int i = 0; i < 4; i++) {
+            message = consumer.receive(1000);
+            assertNotNull(message);
+        }
+
+        final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
+
+        assertTrue("prefetch extension..",
+                subscriptions.stream().
+                        filter(s -> s instanceof QueueSubscription).
+                        mapToInt(s -> ((QueueSubscription)s).getPrefetchExtension().get()).
+                        allMatch(e -> e == 4));
+
+        assertNull(consumer.receiveNoWait());
+        message.acknowledge();
+
+        assertTrue("prefetch extension back to 0",
+                subscriptions.stream().
+                        filter(s -> s instanceof QueueSubscription).
+                        mapToInt(s -> ((QueueSubscription)s).getPrefetchExtension().get()).
+                        allMatch(e -> e == 0));
+    }
+
     public void initCombosForTestDurableConsumerSelectorChange() {
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});

http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
index 89fb25b..e50e35f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
@@ -51,4 +51,10 @@ public class JMSXAConsumerTest extends JMSConsumerTest {
     // needs client ack, xa is auto ack if no transaction
     public void testExceptionOnClientAckAfterConsumerClose() throws Exception {
     }
+
+    public void testReceiveTopicWithPrefetch1() throws Exception {
+    }
+
+    public void testReceiveQueueWithPrefetch1() throws Exception {
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
index 9b53542..3a7924e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.usecases;
 import junit.framework.Test;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.DestinationStatistics;
@@ -98,46 +99,76 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
         verifyDestinationDlq(destination, numMessagesToSend, view);
     }
 
-    public void testExpiredMessages_onTopic_withPrefetchExtension() throws Exception {
-        final ActiveMQDestination destination = new ActiveMQTopic("test");
-        final int numMessagesToSend = 10000;
-
+    public void testClientAckInflight_onTopic_withPrefetchExtension() throws Exception {
         usePrefetchExtension = true;
+        doTestClientAckInflight_onTopic_checkPrefetchExtension();
+    }
 
-        buildBroker(destination);
-
-        verifyMessageExpirationOnDestination(destination, numMessagesToSend);
-        // We don't check the DLQ because non-persistent messages on topics are discarded instead.
-
-        final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
-
-        assertTrue("prefetch extension was not incremented",
-            subscriptions.stream().
-                filter(s -> s instanceof TopicSubscription).
-                mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
-                allMatch(e -> e > 0));
+    public void testClientAckInflight_onTopic_withOutPrefetchExtension() throws Exception {
+        usePrefetchExtension = false;
+        doTestClientAckInflight_onTopic_checkPrefetchExtension();
     }
 
-    public void testExpiredMessages_onTopic_withoutPrefetchExtension() throws Exception {
+    public void doTestClientAckInflight_onTopic_checkPrefetchExtension() throws Exception {
         final ActiveMQDestination destination = new ActiveMQTopic("test");
-        final int numMessagesToSend = 10000;
+        buildBroker(destination);
 
-        usePrefetchExtension = false;
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                "failover://"+brokerUri);
+        ActiveMQPrefetchPolicy prefetchTwo = new ActiveMQPrefetchPolicy();
+        prefetchTwo.setAll(6);
+        factory.setPrefetchPolicy(prefetchTwo);
+        connection = factory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
-        buildBroker(destination);
+        MessageConsumer consumer = session.createConsumer(destination);
 
-        verifyMessageExpirationOnDestination(destination, numMessagesToSend);
-        // We don't check the DLQ because non-persistent messages on topics are discarded instead.
+        produce(10, destination);
+
+        Message m = null;
+        for (int i=0; i<5; i++) {
+            m = consumer.receive(4000);
+        }
+        assertNotNull(m);
 
         final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
 
-        assertTrue("prefetch extension was incremented",
+        assertTrue("prefetch extension was not incremented",
+                subscriptions.stream().
+                        filter(s -> s instanceof TopicSubscription).
+                        mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
+                        allMatch(e -> usePrefetchExtension ? e > 1 : e == 0));
+
+        m.acknowledge();
+
+        assertTrue("prefetch extension was not incremented",
                 subscriptions.stream().
                         filter(s -> s instanceof TopicSubscription).
                         mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
                         allMatch(e -> e == 0));
+
     }
 
+    private void produce(int num, ActiveMQDestination destination) throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                "failover://"+brokerUri);
+        Connection connection = factory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        int i = 0;
+        while (i++ < num) {
+            Message message = useTextMessage ? session
+                    .createTextMessage("test") : session
+                    .createObjectMessage("test");
+            producer.send(message);
+        }
+        connection.close();
+    }
+
+
     private void buildBroker(ActiveMQDestination destination) throws Exception {
         broker = createBroker(deleteAllMessages, usePrefetchExtension, 100, destination);
         brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();