You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/09/27 16:46:19 UTC
[07/14] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-6422
https://issues.apache.org/jira/browse/AMQ-6422
Adds a split consumer test that uses presettled receivers.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/14c5c527
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/14c5c527
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/14c5c527
Branch: refs/heads/activemq-5.14.x
Commit: 14c5c5276c9f6bfa360c24d4e2b875483dc43888
Parents: 566e826
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Sep 9 18:34:03 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:14:55 2016 -0400
----------------------------------------------------------------------
.../amqp/interop/AmqpSendReceiveTest.java | 79 ++++++++++++++++++++
1 file changed, 79 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/14c5c527/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index f39fc3e..3132e6e 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -615,4 +615,83 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
receiver.close();
connection.close();
}
+
+ @Test(timeout = 60000)
+ public void testTwoPresettledReceiversReceiveAllMessages() throws Exception {
+ final int MSG_COUNT = 100;
+ // Purpose: send MSG_COUNT messages to one queue, then read all of them back through two receivers on the same session, each granting credit in turn.
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+
+ final String address = "queue://" + getTestName();
+ // Both receivers attach to the same address as the sender. NOTE(review): the trailing 'true' presumably requests presettled delivery (per the test name) - confirm against AmqpSession.createReceiver.
+ AmqpSender sender = session.createSender(address);
+ AmqpReceiver receiver1 = session.createReceiver(address, null, false, true);
+ AmqpReceiver receiver2 = session.createReceiver(address, null, false, true);
+ // Load the queue with ids "msg0" .. "msg99" before either receiver calls flow().
+ for (int i = 0; i < MSG_COUNT; i++) {
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("msg" + i);
+ sender.send(message);
+ }
+ // Management view of the test queue; used further down to poll the in-flight count.
+ final DestinationViewMBean destinationView = getProxyToQueue(getTestName());
+ // Step 1: receiver #1 grants two credits and must get msg0 then msg1, in that order.
+ LOG.info("Attempting to read first two messages with receiver #1");
+ receiver1.flow(2);
+ AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
+ AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message 1", message1);
+ assertNotNull("Should have read message 2", message2);
+ assertEquals("msg0", message1.getMessageId());
+ assertEquals("msg1", message2.getMessageId());
+ message1.accept();
+ message2.accept();
+ // Step 2: receiver #2 grants two credits and must get the next two ids, msg2 then msg3.
+ LOG.info("Attempting to read next two messages with receiver #2");
+ receiver2.flow(2);
+ AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS);
+ AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message 3", message3);
+ assertNotNull("Should have read message 4", message4);
+ assertEquals("msg2", message3.getMessageId());
+ assertEquals("msg3", message4.getMessageId());
+ message3.accept();
+ message4.accept();
+ // Step 3: poll until the queue reports zero in-flight messages. The count embedded in the failure message is evaluated before the wait begins, so it shows the pre-wait value.
+ assertTrue("Should be no inflight messages: " + destinationView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return destinationView.getInFlightCount() == 0;
+ }
+ }));
+ // Step 4: the remaining MSG_COUNT - 4 messages are split evenly; each receiver grants splitCredit credits and reads exactly that many. Ids are only logged here, not asserted.
+ LOG.info("*** Attempting to read remaining messages with both receivers");
+ int splitCredit = (MSG_COUNT - 4) / 2;
+
+ LOG.info("**** Receiver #1 granting creadit[{}] for its block of messages", splitCredit);
+ receiver1.flow(splitCredit);
+ for (int i = 0; i < splitCredit; i++) {
+ AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Receiver #1 should have read a message", message);
+ LOG.info("Receiver #1 read message: {}", message.getMessageId());
+ message.accept();
+ }
+
+ LOG.info("**** Receiver #2 granting creadit[{}] for its block of messages", splitCredit);
+ receiver2.flow(splitCredit);
+ for (int i = 0; i < splitCredit; i++) {
+ AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Receiver #2 should have read a message", message);
+ LOG.info("Receiver #2 read message: {}", message.getMessageId());
+ message.accept();
+ }
+ // Cleanup: close both receivers, then the connection. There is no try/finally, so these calls are skipped if an assertion above throws.
+ receiver1.close();
+ receiver2.close();
+
+ connection.close();
+ }
}