You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/09/26 10:30:38 UTC
activemq git commit: [AMQ-6824] - fix up prefetchExtension growth on transaction completion and delivered ack and tie in boolean usePrefetchExtension
Repository: activemq
Updated Branches:
refs/heads/master a21dd4052 -> 41a100766
[AMQ-6824] - fix up prefetchExtension growth on transaction completion and delivered ack and tie in boolean usePrefetchExtension
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/41a10076
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/41a10076
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/41a10076
Branch: refs/heads/master
Commit: 41a100766c19655816d575841ba559d33c63313d
Parents: a21dd40
Author: gtully <ga...@gmail.com>
Authored: Tue Sep 26 11:30:18 2017 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue Sep 26 11:30:18 2017 +0100
----------------------------------------------------------------------
.../broker/region/AbstractSubscription.java | 33 +++++++
.../broker/region/PrefetchSubscription.java | 68 +++------------
.../apache/activemq/broker/region/Queue.java | 2 +
.../broker/region/TopicSubscription.java | 49 +++--------
.../activemq/transport/stomp/StompTest.java | 91 ++++++++++++++++++++
.../org/apache/activemq/JMSConsumerTest.java | 83 ++++++++++++++++++
.../org/apache/activemq/JMSXAConsumerTest.java | 6 ++
.../activemq/usecases/ExpiredMessagesTest.java | 77 ++++++++++++-----
8 files changed, 290 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
index 9a51a83..fae9c7c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
@@ -330,4 +330,37 @@ public abstract class AbstractSubscription implements Subscription {
public AtomicInteger getPrefetchExtension() {
return this.prefetchExtension;
}
+
+ protected void contractPrefetchExtension(int amount) {
+ if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
+ decrementPrefetchExtension(amount);
+ }
+ }
+
+ protected void expandPrefetchExtension(int amount) {
+ if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
+ incrementPrefetchExtension(amount);
+ }
+ }
+
+ protected void decrementPrefetchExtension(int amount) {
+ while (true) {
+ int currentExtension = prefetchExtension.get();
+ int newExtension = Math.max(0, currentExtension - amount);
+ if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
+ break;
+ }
+ }
+ }
+
+ private void incrementPrefetchExtension(int amount) {
+ while (true) {
+ int currentExtension = prefetchExtension.get();
+ int newExtension = Math.max(currentExtension, currentExtension + amount);
+ if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
+ break;
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 314285f..fc68fc1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -226,6 +226,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
getSubscriptionStatistics().getDequeues().increment();
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
removeList.add(node);
+ contractPrefetchExtension(1);
} else {
registerRemoveSync(context, node);
}
@@ -258,28 +259,18 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
+ contractPrefetchExtension(1);
} else {
registerRemoveSync(context, node);
+ expandPrefetchExtension(1);
}
-
- if (isUsePrefetchExtension() && getPrefetchSize() != 0 && ack.isInTransaction()) {
- // allow transaction batch to exceed prefetch
- while (true) {
- int currentExtension = prefetchExtension.get();
- int newExtension = Math.max(currentExtension, currentExtension + 1);
- if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
- break;
- }
- }
- }
-
acknowledge(context, ack, node);
destination = (Destination) node.getRegionDestination();
callDispatchMatched = true;
break;
}
}
- }else if (ack.isDeliveredAck()) {
+ } else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
int index = 0;
@@ -287,16 +278,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
final MessageReference node = iter.next();
Destination nodeDest = (Destination) node.getRegionDestination();
if (ack.getLastMessageId().equals(node.getMessageId())) {
- if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
- // allow batch to exceed prefetch
- while (true) {
- int currentExtension = prefetchExtension.get();
- int newExtension = Math.max(currentExtension, index + 1);
- if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
- break;
- }
- }
- }
+ expandPrefetchExtension(ack.getMessageCount());
destination = nodeDest;
callDispatchMatched = true;
break;
@@ -327,17 +309,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
nodeDest.getDestinationStatistics().getInflight().decrement();
if (ack.getLastMessageId().equals(messageId)) {
- if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
- // allow batch to exceed prefetch
- while (true) {
- int currentExtension = prefetchExtension.get();
- int newExtension = Math.max(currentExtension, index + 1);
- if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
- break;
- }
- }
- }
-
+ contractPrefetchExtension(1);
destination = (Destination) node.getRegionDestination();
callDispatchMatched = true;
break;
@@ -399,13 +371,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
index++;
acknowledge(context, ack, node);
if (ack.getLastMessageId().equals(messageId)) {
- while (true) {
- int currentExtension = prefetchExtension.get();
- int newExtension = Math.max(0, currentExtension - (index + 1));
- if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
- break;
- }
- }
+ contractPrefetchExtension(1);
destination = nodeDest;
callDispatchMatched = true;
break;
@@ -442,37 +408,23 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
new Synchronization() {
@Override
- public void beforeEnd() {
- if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
- while (true) {
- int currentExtension = prefetchExtension.get();
- int newExtension = Math.max(0, currentExtension - 1);
- if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
- break;
- }
- }
- }
- }
-
- @Override
public void afterCommit()
throws Exception {
Destination nodeDest = (Destination) node.getRegionDestination();
- synchronized(dispatchLock) {
+ synchronized (dispatchLock) {
getSubscriptionStatistics().getDequeues().increment();
dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
nodeDest.getDestinationStatistics().getInflight().decrement();
}
+ contractPrefetchExtension(1);
nodeDest.wakeup();
dispatchPending();
}
@Override
public void afterRollback() throws Exception {
- synchronized(dispatchLock) {
- // poisionAck will decrement - otherwise still inflight on client
- }
+ contractPrefetchExtension(1);
}
});
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 49dd5a2..fba78cb 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1681,6 +1681,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
if (!added || browser.atMax()) {
browser.decrementQueueRef();
browserDispatches.remove(browserDispatch);
+ } else {
+ wakeup();
}
} catch (Exception e) {
LOG.warn("exception on dispatch to browser: {}", browserDispatch.getBrowser(), e);
http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 65c2ba9..4b958f3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -281,24 +281,18 @@ public class TopicSubscription extends AbstractSubscription {
throw new JMSException("Poison ack cannot be transacted: " + ack);
}
updateStatsOnAck(context, ack);
- if (getPrefetchSize() != 0) {
- decrementPrefetchExtension(ack.getMessageCount());
- }
+ contractPrefetchExtension(ack.getMessageCount());
} else if (ack.isIndividualAck()) {
updateStatsOnAck(context, ack);
- if (getPrefetchSize() != 0 && ack.isInTransaction()) {
- incrementPrefetchExtension(ack.getMessageCount());
+ if (ack.isInTransaction()) {
+ expandPrefetchExtension(1);
}
} else if (ack.isExpiredAck()) {
updateStatsOnAck(ack);
- if (getPrefetchSize() != 0) {
- incrementPrefetchExtension(ack.getMessageCount());
- }
+ contractPrefetchExtension(ack.getMessageCount());
} else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch counters.
- if (getPrefetchSize() != 0) {
- incrementPrefetchExtension(ack.getMessageCount());
- }
+ expandPrefetchExtension(ack.getMessageCount());
} else if (ack.isRedeliveredAck()) {
// No processing for redelivered needed
return;
@@ -314,14 +308,13 @@ public class TopicSubscription extends AbstractSubscription {
context.getTransaction().addSynchronization(new Synchronization() {
@Override
- public void beforeEnd() {
- if (getPrefetchSize() != 0) {
- decrementPrefetchExtension(ack.getMessageCount());
- }
+ public void afterRollback() {
+ contractPrefetchExtension(ack.getMessageCount());
}
@Override
public void afterCommit() throws Exception {
+ contractPrefetchExtension(ack.getMessageCount());
updateStatsOnAck(ack);
dispatchMatched();
}
@@ -417,29 +410,9 @@ public class TopicSubscription extends AbstractSubscription {
if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
}
- }
- }
- }
-
- private void incrementPrefetchExtension(int amount) {
- if (!isUsePrefetchExtension()) {
- return;
- }
- while (true) {
- int currentExtension = prefetchExtension.get();
- int newExtension = Math.max(0, currentExtension + amount);
- if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
- break;
- }
- }
- }
-
- private void decrementPrefetchExtension(int amount) {
- while (true) {
- int currentExtension = prefetchExtension.get();
- int newExtension = Math.max(0, currentExtension - amount);
- if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
- break;
+ if (!ack.isInTransaction()) {
+ contractPrefetchExtension(1);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
index c6a50eb..7ded503 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
@@ -28,6 +28,7 @@ import java.io.StringReader;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -46,10 +47,15 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.AbstractSubscription;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.Wait;
@@ -2263,6 +2269,91 @@ public class StompTest extends StompTestSupport {
assertEquals(bigBody, sframe.getBody());
}
+ @Test(timeout = 60000)
+ public void testAckInTransactionTopic() throws Exception {
+ doTestAckInTransaction(true);
+ }
+
+ @Test(timeout = 60000)
+ public void testAckInTransactionQueue() throws Exception {
+ doTestAckInTransaction(false);
+ }
+
+ public void doTestAckInTransaction(boolean topic) throws Exception {
+
+ String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(frame);
+ stompConnection.receive();
+ String destination = (topic ? "/topic" : "/queue") + "/test";
+ stompConnection.subscribe(destination, Stomp.Headers.Subscribe.AckModeValues.CLIENT);
+
+ for (int j = 0; j < 5; j++) {
+
+ for (int i = 0; i < 10; i++) {
+ stompConnection.send(destination , "message" + i);
+ }
+
+ stompConnection.begin("tx"+j);
+
+ for (int i = 0; i < 10; i++) {
+ StompFrame message = stompConnection.receive();
+ stompConnection.ack(message, "tx"+j);
+
+ }
+ stompConnection.commit("tx"+j);
+ }
+
+ List<Subscription> subs = getDestinationConsumers(brokerService,
+ ActiveMQDestination.createDestination("test", topic ? ActiveMQDestination.TOPIC_TYPE : ActiveMQDestination.QUEUE_TYPE));
+
+
+ for (Subscription subscription : subs) {
+ final AbstractSubscription abstractSubscription = (AbstractSubscription) subscription;
+
+ assertTrue("prefetchExtension should be back to Zero after commit", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("ext: " + abstractSubscription.getPrefetchExtension().get());
+ return abstractSubscription.getPrefetchExtension().get() == 0;
+ }
+ }));
+ }
+ }
+
+ public static List<Subscription> getDestinationConsumers(BrokerService broker, ActiveMQDestination destination) {
+ List<Subscription> result = null;
+ org.apache.activemq.broker.region.Destination dest = getDestination(broker, destination);
+ if (dest != null) {
+ result = dest.getConsumers();
+ }
+ return result;
+ }
+
+ public static org.apache.activemq.broker.region.Destination getDestination(BrokerService target, ActiveMQDestination destination) {
+ org.apache.activemq.broker.region.Destination result = null;
+ for (org.apache.activemq.broker.region.Destination dest : getDestinationMap(target, destination).values()) {
+ if (dest.getName().equals(destination.getPhysicalName())) {
+ result = dest;
+ break;
+ }
+ }
+ return result;
+ }
+
+ private static Map<ActiveMQDestination, org.apache.activemq.broker.region.Destination> getDestinationMap(BrokerService target,
+ ActiveMQDestination destination) {
+ RegionBroker regionBroker = (RegionBroker) target.getRegionBroker();
+ if (destination.isTemporary()) {
+ return destination.isQueue() ? regionBroker.getTempQueueRegion().getDestinationMap() :
+ regionBroker.getTempTopicRegion().getDestinationMap();
+ }
+ return destination.isQueue() ?
+ regionBroker.getQueueRegion().getDestinationMap() :
+ regionBroker.getTopicRegion().getDestinationMap();
+ }
+
+
protected SamplePojo createObjectFromJson(String data) throws Exception {
HierarchicalStreamReader in = new JettisonMappedXmlDriver().createReader(new StringReader(data));
return createObject(in);
http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
index 5ccf1bd..abf9f62 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -42,11 +43,16 @@ import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.QueueSubscription;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.activemq.TestSupport.getDestinationConsumers;
+
/**
* Test cases used to test the JMS message consumer.
*/
@@ -223,6 +229,83 @@ public class JMSConsumerTest extends JmsTestSupport {
message.acknowledge();
}
+ public void testReceiveTopicWithPrefetch1() throws Exception {
+
+ // Set prefetch to 1
+ connection.getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ destination = createDestination(session, Byte.valueOf(ActiveMQDestination.TOPIC_TYPE));
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+
+ // Make sure 4 messages were delivered.
+ Message message = null;
+ for (int i = 0; i < 4; i++) {
+ message = consumer.receive(1000);
+ assertNotNull(message);
+ }
+
+ final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
+
+ assertTrue("prefetch extension back to 0",
+ subscriptions.stream().
+ filter(s -> s instanceof TopicSubscription).
+ mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
+ allMatch(e -> e == 4));
+
+ assertNull(consumer.receiveNoWait());
+ message.acknowledge();
+
+ assertTrue("prefetch extension back to 0",
+ subscriptions.stream().
+ filter(s -> s instanceof TopicSubscription).
+ mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
+ allMatch(e -> e == 0));
+
+ }
+
+ public void testReceiveQueueWithPrefetch1() throws Exception {
+
+ // Set prefetch to 1
+ connection.getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ destination = createDestination(session, Byte.valueOf(ActiveMQDestination.QUEUE_TYPE));
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+
+ // Make sure 4 messages were delivered.
+ Message message = null;
+ for (int i = 0; i < 4; i++) {
+ message = consumer.receive(1000);
+ assertNotNull(message);
+ }
+
+ final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
+
+ assertTrue("prefetch extension..",
+ subscriptions.stream().
+ filter(s -> s instanceof QueueSubscription).
+ mapToInt(s -> ((QueueSubscription)s).getPrefetchExtension().get()).
+ allMatch(e -> e == 4));
+
+ assertNull(consumer.receiveNoWait());
+ message.acknowledge();
+
+ assertTrue("prefetch extension back to 0",
+ subscriptions.stream().
+ filter(s -> s instanceof QueueSubscription).
+ mapToInt(s -> ((QueueSubscription)s).getPrefetchExtension().get()).
+ allMatch(e -> e == 0));
+ }
+
public void initCombosForTestDurableConsumerSelectorChange() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
index 89fb25b..e50e35f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
@@ -51,4 +51,10 @@ public class JMSXAConsumerTest extends JMSConsumerTest {
// needs client ack, xa is auto ack if no transaction
public void testExceptionOnClientAckAfterConsumerClose() throws Exception {
}
+
+ public void testReceiveTopicWithPrefetch1() throws Exception {
+ }
+
+ public void testReceiveQueueWithPrefetch1() throws Exception {
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/41a10076/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
index 9b53542..3a7924e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.usecases;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationStatistics;
@@ -98,46 +99,76 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
verifyDestinationDlq(destination, numMessagesToSend, view);
}
- public void testExpiredMessages_onTopic_withPrefetchExtension() throws Exception {
- final ActiveMQDestination destination = new ActiveMQTopic("test");
- final int numMessagesToSend = 10000;
-
+ public void testClientAckInflight_onTopic_withPrefetchExtension() throws Exception {
usePrefetchExtension = true;
+ doTestClientAckInflight_onTopic_checkPrefetchExtension();
+ }
- buildBroker(destination);
-
- verifyMessageExpirationOnDestination(destination, numMessagesToSend);
- // We don't check the DLQ because non-persistent messages on topics are discarded instead.
-
- final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
-
- assertTrue("prefetch extension was not incremented",
- subscriptions.stream().
- filter(s -> s instanceof TopicSubscription).
- mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
- allMatch(e -> e > 0));
+ public void testClientAckInflight_onTopic_withOutPrefetchExtension() throws Exception {
+ usePrefetchExtension = false;
+ doTestClientAckInflight_onTopic_checkPrefetchExtension();
}
- public void testExpiredMessages_onTopic_withoutPrefetchExtension() throws Exception {
+ public void doTestClientAckInflight_onTopic_checkPrefetchExtension() throws Exception {
final ActiveMQDestination destination = new ActiveMQTopic("test");
- final int numMessagesToSend = 10000;
+ buildBroker(destination);
- usePrefetchExtension = false;
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+ "failover://"+brokerUri);
+ ActiveMQPrefetchPolicy prefetchTwo = new ActiveMQPrefetchPolicy();
+ prefetchTwo.setAll(6);
+ factory.setPrefetchPolicy(prefetchTwo);
+ connection = factory.createConnection();
+ connection.start();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- buildBroker(destination);
+ MessageConsumer consumer = session.createConsumer(destination);
- verifyMessageExpirationOnDestination(destination, numMessagesToSend);
- // We don't check the DLQ because non-persistent messages on topics are discarded instead.
+ produce(10, destination);
+
+ Message m = null;
+ for (int i=0; i<5; i++) {
+ m = consumer.receive(4000);
+ }
+ assertNotNull(m);
final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
- assertTrue("prefetch extension was incremented",
+ assertTrue("prefetch extension was not incremented",
+ subscriptions.stream().
+ filter(s -> s instanceof TopicSubscription).
+ mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
+ allMatch(e -> usePrefetchExtension ? e > 1 : e == 0));
+
+ m.acknowledge();
+
+ assertTrue("prefetch extension was not incremented",
subscriptions.stream().
filter(s -> s instanceof TopicSubscription).
mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
allMatch(e -> e == 0));
+
}
+ private void produce(int num, ActiveMQDestination destination) throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+ "failover://"+brokerUri);
+ Connection connection = factory.createConnection();
+ connection.start();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(destination);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ int i = 0;
+ while (i++ < num) {
+ Message message = useTextMessage ? session
+ .createTextMessage("test") : session
+ .createObjectMessage("test");
+ producer.send(message);
+ }
+ connection.close();
+ }
+
+
private void buildBroker(ActiveMQDestination destination) throws Exception {
broker = createBroker(deleteAllMessages, usePrefetchExtension, 100, destination);
brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();