You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/11/18 16:59:22 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6500
Repository: activemq
Updated Branches:
refs/heads/master 253726674 -> e050519ff
https://issues.apache.org/jira/browse/AMQ-6500
Improve handling of the prefetch extension and of pull consumers on Topics so
that the remote peer receives all of the available messages permitted by the
credit it has issued.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e050519f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e050519f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e050519f
Branch: refs/heads/master
Commit: e050519ff6ae8079c5183f6e6372ddb3d03e91c7
Parents: 2537266
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Nov 18 11:59:01 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Nov 18 11:59:01 2016 -0500
----------------------------------------------------------------------
.../activemq/transport/amqp/JMSClientTest.java | 77 +++++++-
.../amqp/interop/AmqpReceiverDrainTest.java | 148 +++++++++++++---
.../amqp/interop/AmqpReceiverTest.java | 43 ++++-
.../amqp/interop/AmqpSendReceiveTest.java | 107 +++++++++++-
.../broker/region/TopicSubscription.java | 98 ++++++-----
.../TopicSubscriptionZeroPrefetchTest.java | 174 +++++++++++++++++--
6 files changed, 546 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/e050519f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 3583656..97ce106 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -230,7 +230,7 @@ public class JMSClientTest extends JMSClientTestSupport {
assertEquals(totalCount, proxy.getQueueSize());
// Consume again..check we receive all the messages.
- Set<Integer> messageNumbers = new HashSet<Integer>();
+ Set<Integer> messageNumbers = new HashSet<>();
for (int i = 1; i <= totalCount; i++) {
messageNumbers.add(i);
}
@@ -644,7 +644,7 @@ public class JMSClientTest extends JMSClientTestSupport {
public void testDurableConsumerAsync() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
final CountDownLatch latch = new CountDownLatch(1);
- final AtomicReference<Message> received = new AtomicReference<Message>();
+ final AtomicReference<Message> received = new AtomicReference<>();
String durableClientId = getDestinationName() + "-ClientId";
connection = createConnection(durableClientId);
@@ -693,7 +693,7 @@ public class JMSClientTest extends JMSClientTestSupport {
message.setText("hello");
producer.send(message);
- final AtomicReference<Message> msg = new AtomicReference<Message>();
+ final AtomicReference<Message> msg = new AtomicReference<>();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
@@ -712,7 +712,7 @@ public class JMSClientTest extends JMSClientTestSupport {
public void testTopicConsumerAsync() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
final CountDownLatch latch = new CountDownLatch(1);
- final AtomicReference<Message> received = new AtomicReference<Message>();
+ final AtomicReference<Message> received = new AtomicReference<>();
connection = createConnection();
{
@@ -760,7 +760,7 @@ public class JMSClientTest extends JMSClientTestSupport {
message.setText("hello");
producer.send(message);
- final AtomicReference<Message> msg = new AtomicReference<Message>();
+ final AtomicReference<Message> msg = new AtomicReference<>();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
@@ -782,7 +782,7 @@ public class JMSClientTest extends JMSClientTestSupport {
final ConnectorViewMBean connector = getProxyToConnectionView(getTargetConnectorName());
LOG.info("Current number of Connections is: {}", connector.connectionCount());
- ArrayList<Connection> connections = new ArrayList<Connection>();
+ ArrayList<Connection> connections = new ArrayList<>();
for (int i = 0; i < 10; i++) {
connections.add(createConnection(null));
@@ -1265,4 +1265,69 @@ public class JMSClientTest extends JMSClientTestSupport {
assertFalse(message.getJMSRedelivered());
}
}
+
+ @Test(timeout = 30000)
+ public void testProduceAndConsumeLargeNumbersOfTopicMessagesClientAck() throws Exception {
+ doTestProduceAndConsumeLargeNumbersOfMessages(true, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ @Test(timeout = 30000)
+ public void testProduceAndConsumeLargeNumbersOfQueueMessagesClientAck() throws Exception {
+ doTestProduceAndConsumeLargeNumbersOfMessages(false, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ @Test(timeout = 30000)
+ public void testProduceAndConsumeLargeNumbersOfTopicMessagesAutoAck() throws Exception {
+ doTestProduceAndConsumeLargeNumbersOfMessages(true, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ @Test(timeout = 30000)
+ public void testProduceAndConsumeLargeNumbersOfQueueMessagesAutoAck() throws Exception {
+ doTestProduceAndConsumeLargeNumbersOfMessages(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void doTestProduceAndConsumeLargeNumbersOfMessages(boolean topic, int ackMode) throws Exception {
+
+ final int MSG_COUNT = 1000;
+ final CountDownLatch done = new CountDownLatch(MSG_COUNT);
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(getAmqpURI());
+ factory.setForceSyncSend(true);
+
+ connection = factory.createConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, ackMode);
+ final Destination destination;
+ if (topic) {
+ destination = session.createTopic(getDestinationName());
+ } else {
+ destination = session.createQueue(getDestinationName());
+ }
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message message) {
+ try {
+ message.acknowledge();
+ done.countDown();
+ } catch (JMSException ex) {
+ LOG.info("Caught exception.", ex);
+ }
+ }
+ });
+
+ MessageProducer producer = session.createProducer(destination);
+
+ TextMessage textMessage = session.createTextMessage();
+ textMessage.setText("messageText");
+
+ for (int i = 0; i < MSG_COUNT; i++) {
+ producer.send(textMessage);
+ }
+
+ assertTrue("Did not receive all messages: " + MSG_COUNT, done.await(15, TimeUnit.SECONDS));
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e050519f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java
index aa74100..993f26b 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java
@@ -22,7 +22,9 @@ import static org.junit.Assert.assertNull;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -37,47 +39,93 @@ import org.junit.Test;
public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
- public void testReceiverCanDrainMessages() throws Exception {
+ public void testReceiverCanDrainMessagesQueue() throws Exception {
+ doTestReceiverCanDrainMessages(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiverCanDrainMessagesTopic() throws Exception {
+ doTestReceiverCanDrainMessages(true);
+ }
+
+ private void doTestReceiverCanDrainMessages(boolean topic) throws Exception {
+ final String destinationName;
+ if (topic) {
+ destinationName = "topic://" + getTestName();
+ } else {
+ destinationName = "queue://" + getTestName();
+ }
+
int MSG_COUNT = 20;
- sendMessages(getTestName(), MSG_COUNT, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+ AmqpReceiver receiver = session.createReceiver(destinationName);
+
+ sendMessages(getTestName(), MSG_COUNT, topic);
+
+ final DestinationViewMBean destinationView;
+ if (topic) {
+ destinationView = getProxyToTopic(getTestName());
+ } else {
+ destinationView = getProxyToQueue(getTestName());
+ }
- QueueViewMBean queueView = getProxyToQueue(getTestName());
- assertEquals(MSG_COUNT, queueView.getQueueSize());
- assertEquals(0, queueView.getDispatchCount());
+ assertEquals(MSG_COUNT, destinationView.getEnqueueCount());
+ assertEquals(0, destinationView.getDispatchCount());
receiver.drain(MSG_COUNT);
for (int i = 0; i < MSG_COUNT; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(message);
+ assertNotNull("Failed to read message: " + (i + 1), message);
+ LOG.info("Read message: {}", message.getMessageId());
message.accept();
}
receiver.close();
- assertEquals(0, queueView.getQueueSize());
+ assertEquals(MSG_COUNT, destinationView.getDequeueCount());
connection.close();
}
@Test(timeout = 60000)
- public void testPullWithNoMessageGetDrained() throws Exception {
+ public void testPullWithNoMessageGetDrainedQueue() throws Exception {
+ doTestPullWithNoMessageGetDrained(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testPullWithNoMessageGetDrainedTopic() throws Exception {
+ doTestPullWithNoMessageGetDrained(true);
+ }
+
+ private void doTestPullWithNoMessageGetDrained(boolean topic) throws Exception {
+
+ final String destinationName;
+ if (topic) {
+ destinationName = "topic://" + getTestName();
+ } else {
+ destinationName = "queue://" + getTestName();
+ }
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+ AmqpReceiver receiver = session.createReceiver(destinationName);
receiver.flow(10);
- QueueViewMBean queueView = getProxyToQueue(getTestName());
- assertEquals(0, queueView.getQueueSize());
- assertEquals(0, queueView.getDispatchCount());
+ final DestinationViewMBean destinationView;
+ if (topic) {
+ destinationView = getProxyToTopic(getTestName());
+ } else {
+ destinationView = getProxyToQueue(getTestName());
+ }
+
+ assertEquals(0, destinationView.getEnqueueCount());
+ assertEquals(0, destinationView.getDispatchCount());
assertEquals(10, receiver.getReceiver().getRemoteCredit());
@@ -89,19 +137,42 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
}
@Test(timeout = 60000)
- public void testPullOneFromRemote() throws Exception {
- int MSG_COUNT = 20;
- sendMessages(getTestName(), MSG_COUNT, false);
+ public void testPullOneFromRemoteQueue() throws Exception {
+ doTestPullOneFromRemote(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testPullOneFromRemoteTopic() throws Exception {
+ doTestPullOneFromRemote(true);
+ }
+
+ private void doTestPullOneFromRemote(boolean topic) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+ final String destinationName;
+ if (topic) {
+ destinationName = "topic://" + getTestName();
+ } else {
+ destinationName = "queue://" + getTestName();
+ }
+
+ AmqpReceiver receiver = session.createReceiver(destinationName);
+
+ int MSG_COUNT = 20;
+ sendMessages(getTestName(), MSG_COUNT, topic);
+
+ final DestinationViewMBean destinationView;
+ if (topic) {
+ destinationView = getProxyToTopic(getTestName());
+ } else {
+ destinationView = getProxyToQueue(getTestName());
+ }
- QueueViewMBean queueView = getProxyToQueue(getTestName());
- assertEquals(MSG_COUNT, queueView.getQueueSize());
- assertEquals(0, queueView.getDispatchCount());
+ assertEquals(MSG_COUNT, destinationView.getEnqueueCount());
+ assertEquals(0, destinationView.getDispatchCount());
assertEquals(0, receiver.getReceiver().getRemoteCredit());
@@ -113,25 +184,48 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
receiver.close();
- assertEquals(MSG_COUNT - 1, queueView.getQueueSize());
- assertEquals(1, queueView.getDispatchCount());
+ assertEquals(MSG_COUNT - 1, destinationView.getEnqueueCount() - destinationView.getDequeueCount());
+ assertEquals(1, destinationView.getDispatchCount());
connection.close();
}
@Test(timeout = 60000)
- public void testMultipleZeroResultPulls() throws Exception {
+ public void testMultipleZeroResultPullsQueue() throws Exception {
+ doTestMultipleZeroResultPulls(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testMultipleZeroResultPullsTopic() throws Exception {
+ doTestMultipleZeroResultPulls(true);
+ }
+
+ private void doTestMultipleZeroResultPulls(boolean topic) throws Exception {
+
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+ final String destinationName;
+ if (topic) {
+ destinationName = "topic://" + getTestName();
+ } else {
+ destinationName = "queue://" + getTestName();
+ }
+
+ AmqpReceiver receiver = session.createReceiver(destinationName);
receiver.flow(10);
- QueueViewMBean queueView = getProxyToQueue(getTestName());
- assertEquals(0, queueView.getQueueSize());
- assertEquals(0, queueView.getDispatchCount());
+ if (topic) {
+ TopicViewMBean topicView = getProxyToTopic(getTestName());
+ assertEquals(0, topicView.getEnqueueCount());
+ assertEquals(0, topicView.getDispatchCount());
+ } else {
+ QueueViewMBean queueView = getProxyToQueue(getTestName());
+ assertEquals(0, queueView.getQueueSize());
+ assertEquals(0, queueView.getDispatchCount());
+ }
assertEquals(10, receiver.getReceiver().getRemoteCredit());
http://git-wip-us.apache.org/repos/asf/activemq/blob/e050519f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index 08df785..f60af7b 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.junit.ActiveMQTestRunner;
import org.apache.activemq.junit.Repeat;
@@ -268,19 +269,43 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
@Repeat(repetitions = 1)
- public void testPresettledReceiverReadsAllMessagesInNonFlowBatch() throws Exception {
+ public void testPresettledReceiverReadsAllMessagesInNonFlowBatchQueue() throws Exception {
+ doTestPresettledReceiverReadsAllMessagesInNonFlowBatch(false);
+ }
+
+ @Test(timeout = 60000)
+ @Repeat(repetitions = 1)
+ public void testPresettledReceiverReadsAllMessagesInNonFlowBatchTopic() throws Exception {
+ doTestPresettledReceiverReadsAllMessagesInNonFlowBatch(true);
+ }
+
+ private void doTestPresettledReceiverReadsAllMessagesInNonFlowBatch(boolean topic) throws Exception {
+
+ final String destinationName;
+ if (topic) {
+ destinationName = "topic://" + getTestName();
+ } else {
+ destinationName = "queue://" + getTestName();
+ }
+
final int MSG_COUNT = 100;
- sendMessages(getTestName(), MSG_COUNT, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), null, false, true);
+ AmqpReceiver receiver = session.createReceiver(destinationName, null, false, true);
- QueueViewMBean queueView = getProxyToQueue(getTestName());
- assertEquals(MSG_COUNT, queueView.getQueueSize());
- assertEquals(0, queueView.getDispatchCount());
+ sendMessages(getTestName(), MSG_COUNT, topic);
+
+ final DestinationViewMBean destinationView;
+ if (topic) {
+ destinationView = getProxyToTopic(getTestName());
+ } else {
+ destinationView = getProxyToQueue(getTestName());
+ }
+ assertEquals(MSG_COUNT, destinationView.getEnqueueCount());
+ assertEquals(0, destinationView.getDispatchCount());
receiver.flow(20);
// consume less that flow
@@ -302,7 +327,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
receiver.close();
- assertEquals(0, queueView.getQueueSize());
+ assertEquals(0, destinationView.getEnqueueCount() - destinationView.getDequeueCount());
connection.close();
}
@@ -481,7 +506,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
}
});
- Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
+ Map<Symbol, DescribedType> filters = new HashMap<>();
filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKOWN_FILTER);
Source source = new Source();
http://git-wip-us.apache.org/repos/asf/activemq/blob/e050519f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index ef3f27d..cb60e54 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Queue;
import javax.jms.Topic;
@@ -60,13 +61,30 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
@Test(timeout = 60000)
- public void testSimpleSendOneReceiveOne() throws Exception {
+ public void testSimpleSendOneReceiveOneToQueue() throws Exception {
+ doTestSimpleSendOneReceiveOne(Queue.class);
+ }
+
+ @Test(timeout = 60000)
+ public void testSimpleSendOneReceiveOneToTopic() throws Exception {
+ doTestSimpleSendOneReceiveOne(Topic.class);
+ }
+
+ public void doTestSimpleSendOneReceiveOne(Class<?> destType) throws Exception {
+
+ final String address;
+ if (Queue.class.equals(destType)) {
+ address = "queue://" + getTestName();
+ } else {
+ address = "topic://" + getTestName();
+ }
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender("queue://" + getTestName());
+ AmqpSender sender = session.createSender(address);
+ AmqpReceiver receiver = session.createReceiver(address);
AmqpMessage message = new AmqpMessage();
@@ -78,7 +96,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close();
LOG.info("Attempting to read message with receiver");
- AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
receiver.flow(2);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);
@@ -366,7 +383,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
private void doTestReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync(Class<?> destType) throws Exception {
final AmqpClient client = createAmqpClient();
- final LinkedList<Throwable> errors = new LinkedList<Throwable>();
+ final LinkedList<Throwable> errors = new LinkedList<>();
final CountDownLatch receiverReady = new CountDownLatch(1);
ExecutorService executorService = Executors.newCachedThreadPool();
@@ -609,7 +626,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.send(message);
}
- List<AmqpMessage> pendingAcks = new ArrayList<AmqpMessage>();
+ List<AmqpMessage> pendingAcks = new ArrayList<>();
for (int i = 0; i < MSG_COUNT; i++) {
receiver.flow(1);
@@ -719,4 +736,84 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close();
}
+
+ @Test(timeout = 60000)
+ public void testSendReceiveLotsOfDurableMessagesOnQueue() throws Exception {
+ doTestSendReceiveLotsOfDurableMessages(Queue.class);
+ }
+
+ @Test(timeout = 60000)
+ public void testSendReceiveLotsOfDurableMessagesOnTopic() throws Exception {
+ doTestSendReceiveLotsOfDurableMessages(Topic.class);
+ }
+
+ private void doTestSendReceiveLotsOfDurableMessages(Class<?> destType) throws Exception {
+ final int MSG_COUNT = 1000;
+
+ AmqpClient client = createAmqpClient();
+
+ AmqpConnection connection = trackConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ final CountDownLatch done = new CountDownLatch(MSG_COUNT);
+ final AtomicBoolean error = new AtomicBoolean(false);
+ final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ final String address;
+ if (Queue.class.equals(destType)) {
+ address = "queue://" + getTestName();
+ } else {
+ address = "topic://" + getTestName();
+ }
+
+ final AmqpReceiver receiver = session.createReceiver(address);
+ receiver.flow(MSG_COUNT);
+
+ AmqpSender sender = session.createSender(address);
+
+ final DestinationViewMBean destinationView;
+ if (Queue.class.equals(destType)) {
+ destinationView = getProxyToQueue(getTestName());
+ } else {
+ destinationView = getProxyToTopic(getTestName());
+ }
+
+ executor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ for (int i = 0; i < MSG_COUNT; i++) {
+ try {
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ received.accept();
+ done.countDown();
+ } catch (Exception ex) {
+ LOG.info("Caught error: {}", ex.getClass().getSimpleName());
+ error.set(true);
+ }
+ }
+ }
+ });
+
+ for (int i = 0; i < MSG_COUNT; i++) {
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("msg" + i);
+ sender.send(message);
+ }
+
+ assertTrue("did not read all messages, waiting on: " + done.getCount(), done.await(10, TimeUnit.SECONDS));
+ assertFalse("should not be any errors on receive", error.get());
+
+ assertTrue("Should be no inflight messages: " + destinationView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return destinationView.getInFlightCount() == 0;
+ }
+ }));
+
+ sender.close();
+ receiver.close();
+ connection.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e050519f/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 6ab264d..d993333 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -20,7 +20,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
@@ -276,37 +275,61 @@ public class TopicSubscription extends AbstractSubscription {
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
super.acknowledge(context, ack);
- // Handle the standard acknowledgment case.
- if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
- if (context.isInTransaction()) {
- context.getTransaction().addSynchronization(new Synchronization() {
- @Override
- public void afterCommit() throws Exception {
- updateStatsOnAck(ack);
- dispatchMatched();
- }
- });
- } else {
- updateStatsOnAck(ack);
+ if (ack.isStandardAck()) {
+ updateStatsOnAck(context, ack);
+ } else if (ack.isPoisonAck()) {
+ if (ack.isInTransaction()) {
+ throw new JMSException("Poison ack cannot be transacted: " + ack);
+ }
+ updateStatsOnAck(context, ack);
+ if (getPrefetchSize() != 0) {
+ decrementPrefetchExtension(ack.getMessageCount());
+ }
+ } else if (ack.isIndividualAck()) {
+ updateStatsOnAck(context, ack);
+ if (getPrefetchSize() != 0 && ack.isInTransaction()) {
+ incrementPrefetchExtension(ack.getMessageCount());
}
- updatePrefetch(ack);
- dispatchMatched();
- return;
- } else if (ack.isDeliveredAck()) {
- // Message was delivered but not acknowledged: update pre-fetch counters.
- prefetchExtension.addAndGet(ack.getMessageCount());
- dispatchMatched();
- return;
} else if (ack.isExpiredAck()) {
updateStatsOnAck(ack);
- updatePrefetch(ack);
- dispatchMatched();
- return;
+ if (getPrefetchSize() != 0) {
+ incrementPrefetchExtension(ack.getMessageCount());
+ }
+ } else if (ack.isDeliveredAck()) {
+ // Message was delivered but not acknowledged: update pre-fetch counters.
+ if (getPrefetchSize() != 0) {
+ incrementPrefetchExtension(ack.getMessageCount());
+ }
} else if (ack.isRedeliveredAck()) {
- // nothing to do atm
+ // No processing for redelivered needed
return;
+ } else {
+ throw new JMSException("Invalid acknowledgment: " + ack);
+ }
+
+ dispatchMatched();
+ }
+
+ private void updateStatsOnAck(final ConnectionContext context, final MessageAck ack) {
+ if (context.isInTransaction()) {
+ context.getTransaction().addSynchronization(new Synchronization() {
+
+ @Override
+ public void beforeEnd() {
+ if (getPrefetchSize() != 0) {
+ decrementPrefetchExtension(ack.getMessageCount());
+ }
+ }
+
+ @Override
+ public void afterCommit() throws Exception {
+ updateStatsOnAck(ack);
+ dispatchMatched();
+ }
+ });
+ } else {
+ updateStatsOnAck(ack);
}
- throw new JMSException("Invalid acknowledgment: " + ack);
}
@Override
@@ -399,20 +422,20 @@ public class TopicSubscription extends AbstractSubscription {
}
}
- private void updatePrefetch(MessageAck ack) {
+ private void incrementPrefetchExtension(int amount) {
while (true) {
int currentExtension = prefetchExtension.get();
- int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
+ int newExtension = Math.max(0, currentExtension + amount);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
- private void decrementPrefetchExtension() {
+ private void decrementPrefetchExtension(int amount) {
while (true) {
int currentExtension = prefetchExtension.get();
- int newExtension = Math.max(0, currentExtension - 1);
+ int newExtension = Math.max(0, currentExtension - amount);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
@@ -439,7 +462,7 @@ public class TopicSubscription extends AbstractSubscription {
@Override
public int getDispatchedQueueSize() {
return (int)(getSubscriptionStatistics().getDispatched().getCount() -
- prefetchExtension.get() - getSubscriptionStatistics().getDequeues().getCount());
+ getSubscriptionStatistics().getDequeues().getCount());
}
public int getMaximumPendingMessages() {
@@ -538,10 +561,7 @@ public class TopicSubscription extends AbstractSubscription {
// -------------------------------------------------------------------------
@Override
public boolean isFull() {
- if (info.getPrefetchSize() == 0) {
- return prefetchExtension.get() == 0;
- }
- return getDispatchedQueueSize() >= info.getPrefetchSize();
+ return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : getDispatchedQueueSize() - prefetchExtension.get() >= info.getPrefetchSize();
}
@Override
@@ -554,7 +574,7 @@ public class TopicSubscription extends AbstractSubscription {
*/
@Override
public boolean isLowWaterMark() {
- return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
+ return (getDispatchedQueueSize() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
}
/**
@@ -562,7 +582,7 @@ public class TopicSubscription extends AbstractSubscription {
*/
@Override
public boolean isHighWaterMark() {
- return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
+ return (getDispatchedQueueSize() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
}
/**
@@ -669,10 +689,10 @@ public class TopicSubscription extends AbstractSubscription {
}
if (getPrefetchSize() == 0) {
- decrementPrefetchExtension();
+ decrementPrefetchExtension(1);
}
-
}
+
if (info.isDispatchAsync()) {
if (node != null) {
md.setTransmitCallback(new TransmitCallback() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/e050519f/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
index 38fa921..87be689 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -16,24 +16,42 @@
*/
package org.apache.activemq.usecases;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
-import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TopicSubscriptionZeroPrefetchTest {
- private static final String TOPIC_NAME = "slow.consumer";
+ private static final Logger LOG = LoggerFactory.getLogger(TopicSubscriptionZeroPrefetchTest.class);
+
+ @Rule
+ public TestName name = new TestName();
+
private Connection connection;
private Session session;
private ActiveMQTopic destination;
@@ -41,6 +59,10 @@ public class TopicSubscriptionZeroPrefetchTest {
private MessageConsumer consumer;
private BrokerService brokerService;
+ /**
+ * Returns the name of the currently executing test method, as reported by the
+ * JUnit TestName rule; setUp() and the tests use it as the topic name.
+ */
+ public String getTopicName() {
+ return name.getMethodName();
+ }
+
@Before
public void setUp() throws Exception {
@@ -52,7 +74,7 @@ public class TopicSubscriptionZeroPrefetchTest {
connection = activeMQConnectionFactory.createConnection();
connection.setClientID("ClientID-1");
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = new ActiveMQTopic(TOPIC_NAME);
+ destination = new ActiveMQTopic(getTopicName());
producer = session.createProducer(destination);
connection.start();
@@ -61,10 +83,10 @@ public class TopicSubscriptionZeroPrefetchTest {
/*
* test non durable topic subscription with prefetch set to zero
*/
- @Test(timeout=60000)
+ @Test(timeout = 60000)
public void testTopicConsumerPrefetchZero() throws Exception {
- ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0");
+ ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0");
consumer = session.createConsumer(consumerDestination);
// publish messages
@@ -76,31 +98,153 @@ public class TopicSubscriptionZeroPrefetchTest {
Assert.assertNotNull("should have received a message the published message", consumedMessage);
}
- @Test(timeout=60000)
- public void testTopicConsumerPrefetchZeroClientAckLoop() throws Exception {
- ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0");
+ /*
+ * test non durable topic subscription with prefetch set to zero on a
+ * CLIENT_ACKNOWLEDGE session: publish 10 messages, then pull each one with a
+ * blocking receive()
+ */
+ @Test(timeout = 60000)
+ public void testTopicConsumerPrefetchZeroClientAckLoopReceive() throws Exception {
+ ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0");
+ Session consumerClientAckSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumer = consumerClientAckSession.createConsumer(consumerDestination);
+
+ final int count = 10;
+ for (int i = 0; i < count; i++) {
+ Message txtMessage = session.createTextMessage("M:" + i);
+ producer.send(txtMessage);
+ }
+
+ for (int i = 0; i < count; i++) {
+ // receive() has no timeout of its own; only the @Test timeout bounds the wait
+ Message consumedMessage = consumer.receive();
+ Assert.assertNotNull("should have received message[" + i + "]", consumedMessage);
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testTopicConsumerPrefetchZeroClientAckLoopTimedReceive() throws Exception {
+ ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0");
Session consumerClientAckSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
consumer = consumerClientAckSession.createConsumer(consumerDestination);
final int count = 10;
- for (int i=0;i<count;i++) {
- Message txtMessage = session.createTextMessage("M:"+ i);
+ for (int i = 0; i < count; i++) {
+ Message txtMessage = session.createTextMessage("M:" + i);
producer.send(txtMessage);
}
- for (int i=0;i<count;i++) {
+ for (int i = 0; i < count; i++) {
Message consumedMessage = consumer.receive(2000);
- Assert.assertNotNull("should have received message[" + i +"]", consumedMessage);
+ Assert.assertNotNull("should have received message[" + i + "]", consumedMessage);
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testTopicConsumerPrefetchZeroClientAckLoopReceiveNoWait() throws Exception {
+ ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0");
+ Session consumerClientAckSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumer = consumerClientAckSession.createConsumer(consumerDestination);
+
+ final int count = 10;
+ for (int i = 0; i < count; i++) {
+ Message txtMessage = session.createTextMessage("M:" + i);
+ producer.send(txtMessage);
+ }
+
+ for (int i = 0; i < count; i++) {
+ Message consumedMessage = consumer.receiveNoWait();
+ Assert.assertNotNull("should have received message[" + i + "]", consumedMessage);
}
}
+ /*
+ * concurrent produce / consume with prefetch zero, AUTO_ACKNOWLEDGE session
+ */
+ @Test(timeout = 60000)
+ public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeAutoAck() throws Exception {
+ doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ /*
+ * concurrent produce / consume with prefetch zero, CLIENT_ACKNOWLEDGE session
+ */
+ @Test(timeout = 60000)
+ public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeClientAck() throws Exception {
+ doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ /*
+ * concurrent produce / consume with prefetch zero, DUPS_OK_ACKNOWLEDGE session
+ */
+ @Test(timeout = 60000)
+ public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeDupsOk() throws Exception {
+ doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.DUPS_OK_ACKNOWLEDGE);
+ }
+
+ /*
+ * concurrent produce / consume with prefetch zero, transacted session; the
+ * one-argument helper passes commitBatch = false
+ */
+ @Test(timeout = 60000)
+ public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeTransacted() throws Exception {
+ doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.SESSION_TRANSACTED);
+ }
+
+    /*
+     * concurrent produce / consume with prefetch zero on a transacted session
+     * where the consumer commits in batches (every 50 messages) instead of only
+     * once after the receive loop
+     */
+    @Test(timeout = 60000)
+    public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeTransactedComitInBatches() throws Exception {
+        // commitBatch must be true here; the one-argument overload passes false,
+        // which would make this an exact duplicate of the plain Transacted test.
+        doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.SESSION_TRANSACTED, true);
+    }
+
+ /*
+ * concurrent produce / consume with prefetch zero, using the ActiveMQ specific
+ * INDIVIDUAL_ACKNOWLEDGE session mode
+ */
+ @Test(timeout = 60000)
+ public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeIndividual() throws Exception {
+ doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+ }
+
+ /*
+ * convenience overload: runs the concurrent produce / consume scenario for the
+ * given ack mode with commitBatch = false
+ */
+ private void doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(int ackMode) throws Exception {
+ doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(ackMode, false);
+ }
+
+    /**
+     * Publishes MSG_COUNT messages to the test topic while a background thread
+     * concurrently pulls them through a retroactive, zero-prefetch consumer.
+     *
+     * @param ackMode
+     *        acknowledgement mode for the consumer session; Session.SESSION_TRANSACTED
+     *        creates a transacted session that is committed once the receive loop ends
+     * @param commitBatch
+     *        when true and the session is transacted, additionally commit after
+     *        every 50 received messages
+     *
+     * @throws Exception if session / consumer creation or a send fails
+     */
+    private void doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(int ackMode, boolean commitBatch) throws Exception {
+
+        ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0");
+        Session consumerSession = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode);
+        consumer = consumerSession.createConsumer(consumerDestination);
+
+        final int MSG_COUNT = 2000;
+
+        final AtomicBoolean error = new AtomicBoolean();
+        final CountDownLatch done = new CountDownLatch(MSG_COUNT);
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            executor.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        for (int i = 0; i < MSG_COUNT; i++) {
+                            Message consumedMessage = consumer.receive();
+                            if (consumedMessage != null) {
+                                done.countDown();
+                                consumedMessage.acknowledge();
+                                if (ackMode == Session.SESSION_TRANSACTED && commitBatch && ((i + 1) % 50) == 0) {
+                                    consumerSession.commit();
+                                }
+                            }
+                        }
+                    } catch (Exception ex) {
+                        // Pass the exception as the throwable argument; a "{}" placeholder
+                        // is not filled by a trailing Throwable and would be logged literally.
+                        LOG.error("Caught exception during receive", ex);
+                        error.set(true);
+                    } finally {
+                        if (ackMode == Session.SESSION_TRANSACTED) {
+                            try {
+                                consumerSession.commit();
+                            } catch (JMSException e) {
+                                LOG.error("Caught exception on commit", e);
+                                error.set(true);
+                            }
+                        }
+                    }
+                }
+            });
+
+            for (int i = 0; i < MSG_COUNT; i++) {
+                Message txtMessage = session.createTextMessage("M:" + i);
+                producer.send(txtMessage);
+            }
+
+            // Wait for the consumer before looking at the error flag, otherwise a
+            // failure raised while it is still receiving would go unnoticed.
+            final boolean allRead = done.await(10, TimeUnit.SECONDS);
+            if (allRead) {
+                // Let the worker finish its last acknowledge / commit so that an error
+                // in the final commit is visible to the assertion below.
+                executor.shutdown();
+                assertTrue("Consumer task should have completed", executor.awaitTermination(10, TimeUnit.SECONDS));
+            }
+
+            assertFalse("Should not have gotten any errors", error.get());
+            assertTrue("Should have read all messages", allRead);
+        } finally {
+            // Always release the worker thread; this also interrupts a consumer that
+            // is still blocked in receive() when the test is failing.
+            executor.shutdownNow();
+        }
+    }
+
/*
* test durable topic subscription with prefetch zero
*/
- @Test(timeout=60000)
+ @Test(timeout = 60000)
public void testDurableTopicConsumerPrefetchZero() throws Exception {
- ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.prefetchSize=0");
+ ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.prefetchSize=0");
consumer = session.createDurableSubscriber(consumerDestination, "mysub1");
// publish messages