You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/10/07 22:40:56 UTC
[1/2] activemq-artemis git commit: This closes #824
Repository: activemq-artemis
Updated Branches:
refs/heads/master 07ab1be33 -> ca2bf0ed8
This closes #824
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ca2bf0ed
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ca2bf0ed
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ca2bf0ed
Branch: refs/heads/master
Commit: ca2bf0ed89e98b1144996c191f8a7d45884bd7d5
Parents: 07ab1be bc6ad1d
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Oct 7 18:40:49 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 18:40:49 2016 -0400
----------------------------------------------------------------------
.../transport/amqp/client/AmqpMessage.java | 6 +-
.../integration/amqp/AmqpSendReceiveTest.java | 557 +++++++++++++++++++
2 files changed, 560 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-775 Tests for AMQP credit
handling
Posted by cl...@apache.org.
ARTEMIS-775 Tests for AMQP credit handling
Some new tests that cover some AMQP credit handling scenarios. Test
case 'testCloseBusyReceiver' currently fails due to being dispatched a
duplicate message.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bc6ad1df
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bc6ad1df
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bc6ad1df
Branch: refs/heads/master
Commit: bc6ad1dfdbada759e06c50107bbc7bbee2c09a15
Parents: 07ab1be
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Oct 7 12:09:28 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 18:40:49 2016 -0400
----------------------------------------------------------------------
.../transport/amqp/client/AmqpMessage.java | 6 +-
.../integration/amqp/AmqpSendReceiveTest.java | 557 +++++++++++++++++++
2 files changed, 560 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc6ad1df/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 2d95d29..3ca17a6 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -269,7 +269,7 @@ public class AmqpMessage {
* @return the set message ID in String form or null if not set.
*/
public String getMessageId() {
- if (message.getProperties() == null) {
+ if (message.getProperties() == null || message.getProperties().getMessageId() == null) {
return null;
}
@@ -319,7 +319,7 @@ public class AmqpMessage {
* @return the set correlation ID in String form or null if not set.
*/
public String getCorrelationId() {
- if (message.getProperties() == null) {
+ if (message.getProperties() == null || message.getProperties().getCorrelationId() == null) {
return null;
}
@@ -394,7 +394,7 @@ public class AmqpMessage {
* @return true if the message is marked as being durable.
*/
public boolean isDurable() {
- if (message.getHeader() == null) {
+ if (message.getHeader() == null || message.getHeader().getDurable() == null) {
return false;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc6ad1df/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
new file mode 100644
index 0000000..6597a62
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test basic send and receive scenarios using only AMQP sender and receiver links.
+ */
+public class AmqpSendReceiveTest extends AmqpClientTestSupport {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
+
+ @Test(timeout = 60000)
+ public void testSimpleSendOneReceiveOne() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+
+ AmqpMessage message = new AmqpMessage();
+
+ message.setMessageId("msg" + 1);
+ message.setMessageAnnotation("serialNo", 1);
+ message.setText("Test-Message");
+
+ sender.send(message);
+ sender.close();
+
+ LOG.info("Attempting to read message with receiver");
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ receiver.flow(2);
+ AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message", received);
+ assertEquals("msg1", received.getMessageId());
+ received.accept();
+
+ receiver.close();
+
+ connection.close();
+ }
+
+ /**
+ * Closes a receiver that was granted credit for every queued message after it has read only
+ * the first one, then checks that a second receiver is dispatched all messages in order.
+ */
+ @Test(timeout = 60000)
+ public void testCloseBusyReceiver() throws Exception {
+ final int MSG_COUNT = 20;
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getTestName());
+
+ for (int i = 0; i < MSG_COUNT; i++) {
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("msg" + i);
+ message.setMessageAnnotation("serialNo", i);
+ message.setText("Test-Message");
+ LOG.info("Sending message: {}", message.getMessageId());
+ sender.send(message);
+ }
+
+ sender.close();
+
+ Queue queue = getProxyToQueue(getTestName());
+ assertEquals(MSG_COUNT, queue.getMessageCount());
+ // receiver1 gets credit for the whole queue but is closed after one read, without accepting it.
+ AmqpReceiver receiver1 = session.createReceiver(getTestName());
+ receiver1.flow(MSG_COUNT);
+ AmqpMessage received = receiver1.receive(5, TimeUnit.SECONDS);
+ assertNotNull("Should have got a message", received);
+ assertEquals("msg0", received.getMessageId());
+ receiver1.close();
+ // receiver2 must now see every message, starting again from msg0, exactly once and in order.
+ AmqpReceiver receiver2 = session.createReceiver(getTestName());
+ receiver2.flow(200);
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ received = receiver2.receive(5, TimeUnit.SECONDS);
+ assertNotNull("Should have got a message", received);
+ LOG.info("Read message: {}", received.getMessageId());
+ assertEquals("msg" + i, received.getMessageId());
+ }
+
+ receiver2.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiveWithJMSSelectorFilter() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpMessage message1 = new AmqpMessage();
+ message1.setGroupId("abcdefg");
+ message1.setApplicationProperty("sn", 100);
+
+ AmqpMessage message2 = new AmqpMessage();
+ message2.setGroupId("hijklm");
+ message2.setApplicationProperty("sn", 200);
+
+ AmqpSender sender = session.createSender(getTestName());
+ sender.send(message1);
+ sender.send(message2);
+ sender.close();
+
+ AmqpReceiver receiver = session.createReceiver(getTestName(), "sn = 100");
+ receiver.flow(2);
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull("Should have read a message", received);
+ assertEquals(100, received.getApplicationProperty("sn"));
+ assertEquals("abcdefg", received.getGroupId());
+ received.accept();
+
+ assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+ receiver.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testAdvancedLinkFlowControl() throws Exception {
+ final int MSG_COUNT = 20;
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+
+ for (int i = 0; i < MSG_COUNT; i++) {
+ AmqpMessage message = new AmqpMessage();
+
+ message.setMessageId("msg" + i);
+ message.setMessageAnnotation("serialNo", i);
+ message.setText("Test-Message");
+
+ sender.send(message);
+ }
+
+ sender.close();
+
+ LOG.info("Attempting to read first two messages with receiver #1");
+ AmqpReceiver receiver1 = session.createReceiver(getTestName());
+ receiver1.flow(2);
+ AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
+ AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message 1", message1);
+ assertNotNull("Should have read message 2", message2);
+ assertEquals("msg0", message1.getMessageId());
+ assertEquals("msg1", message2.getMessageId());
+ message1.accept();
+ message2.accept();
+
+ LOG.info("Attempting to read next two messages with receiver #2");
+ AmqpReceiver receiver2 = session.createReceiver(getTestName());
+ receiver2.flow(2);
+ AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS);
+ AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message 3", message3);
+ assertNotNull("Should have read message 4", message4);
+ assertEquals("msg2", message3.getMessageId());
+ assertEquals("msg3", message4.getMessageId());
+ message3.accept();
+ message4.accept();
+
+ LOG.info("Attempting to read remaining messages with receiver #1");
+ receiver1.flow(MSG_COUNT - 4);
+ for (int i = 4; i < MSG_COUNT; i++) {
+ AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read a message", message);
+ assertEquals("msg" + i, message.getMessageId());
+ message.accept();
+ }
+
+ receiver1.close();
+ receiver2.close();
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testDispatchOrderWithPrefetchOfOne() throws Exception {
+ final int MSG_COUNT = 20;
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+
+ for (int i = 0; i < MSG_COUNT; i++) {
+ AmqpMessage message = new AmqpMessage();
+
+ message.setMessageId("msg" + i);
+ message.setMessageAnnotation("serialNo", i);
+ message.setText("Test-Message");
+
+ sender.send(message);
+ }
+
+ sender.close();
+
+ AmqpReceiver receiver1 = session.createReceiver(getTestName());
+ receiver1.flow(1);
+
+ AmqpReceiver receiver2 = session.createReceiver(getTestName());
+ receiver2.flow(1);
+
+ AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
+ AmqpMessage message2 = receiver2.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message 1", message1);
+ assertNotNull("Should have read message 2", message2);
+ assertEquals("msg0", message1.getMessageId());
+ assertEquals("msg1", message2.getMessageId());
+ message1.accept();
+ message2.accept();
+
+ receiver1.flow(1);
+ AmqpMessage message3 = receiver1.receive(10, TimeUnit.SECONDS);
+ receiver2.flow(1);
+ AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message 3", message3);
+ assertNotNull("Should have read message 4", message4);
+ assertEquals("msg2", message3.getMessageId());
+ assertEquals("msg3", message4.getMessageId());
+ message3.accept();
+ message4.accept();
+
+ LOG.info("*** Attempting to read remaining messages with both receivers");
+ int splitCredit = (MSG_COUNT - 4) / 2;
+
+ LOG.info("**** Receiver #1 granting creadit[{}] for its block of messages", splitCredit);
+ receiver1.flow(splitCredit);
+ for (int i = 0; i < splitCredit; i++) {
+ AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Receiver #1 should have read a message", message);
+ LOG.info("Receiver #1 read message: {}", message.getMessageId());
+ message.accept();
+ }
+
+ LOG.info("**** Receiver #2 granting creadit[{}] for its block of messages", splitCredit);
+ receiver2.flow(splitCredit);
+ for (int i = 0; i < splitCredit; i++) {
+ AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Receiver #2 should have read message[" + i + "]", message);
+ LOG.info("Receiver #2 read message: {}", message.getMessageId());
+ message.accept();
+ }
+
+ receiver1.close();
+ receiver2.close();
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiveMessageAndRefillCreditBeforeAccept() throws Exception {
+ AmqpClient client = createAmqpClient();
+
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ final String address = getTestName();
+
+ AmqpReceiver receiver = session.createReceiver(address);
+ AmqpSender sender = session.createSender(address);
+
+ for (int i = 0; i < 2; i++) {
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("msg" + i);
+ sender.send(message);
+ }
+ sender.close();
+
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+
+ receiver.flow(1);
+ received.accept();
+
+ received = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull(received);
+ received.accept();
+
+ receiver.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiveMessageAndRefillCreditBeforeAcceptOnQueueAsync() throws Exception {
+ final AmqpClient client = createAmqpClient();
+ final LinkedList<Throwable> errors = new LinkedList<>();
+ final CountDownLatch receiverReady = new CountDownLatch(1);
+ ExecutorService executorService = Executors.newCachedThreadPool();
+
+ final String address = getTestName();
+
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOG.info("Starting consumer connection");
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createReceiver(address);
+ receiver.flow(1);
+ receiverReady.countDown();
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+
+ receiver.flow(1);
+ received.accept();
+
+ received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+ received.accept();
+
+ receiver.close();
+ connection.close();
+
+ } catch (Exception error) {
+ errors.add(error);
+ }
+ }
+ });
+
+ // producer
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ receiverReady.await(20, TimeUnit.SECONDS);
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(address);
+ for (int i = 0; i < 2; i++) {
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("msg" + i);
+ sender.send(message);
+ }
+ sender.close();
+ connection.close();
+ } catch (Exception ignored) {
+ ignored.printStackTrace();
+ }
+ }
+ });
+
+ executorService.shutdown();
+ executorService.awaitTermination(20, TimeUnit.SECONDS);
+ assertTrue("no errors: " + errors, errors.isEmpty());
+ }
+
+ @Test(timeout = 60000)
+ public void testMessageDurabliltyFollowsSpec() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+ AmqpReceiver receiver1 = session.createReceiver(getTestName());
+
+ Queue queue = getProxyToQueue(getTestName());
+
+ // Create default message that should be sent as non-durable
+ AmqpMessage message1 = new AmqpMessage();
+ message1.setText("Test-Message -> non-durable");
+ message1.setDurable(false);
+ message1.setMessageId("ID:Message:1");
+ sender.send(message1);
+
+ assertEquals(1, queue.getMessageCount());
+ receiver1.flow(1);
+ message1 = receiver1.receive(50, TimeUnit.SECONDS);
+ assertNotNull("Should have read a message", message1);
+ assertFalse("First message sent should not be durable", message1.isDurable());
+ message1.accept();
+
+ // Create a message explicitly marked durable that should be sent as durable
+ AmqpMessage message2 = new AmqpMessage();
+ message2.setText("Test-Message -> durable");
+ message2.setDurable(true);
+ message2.setMessageId("ID:Message:2");
+ sender.send(message2);
+
+ assertEquals(1, queue.getMessageCount());
+ receiver1.flow(1);
+ message2 = receiver1.receive(50, TimeUnit.SECONDS);
+ assertNotNull("Should have read a message", message2);
+ assertTrue("Second message sent should be durable", message2.isDurable());
+ message2.accept();
+
+ sender.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiveMessageBeyondAckedAmountQueue() throws Exception {
+ final int MSG_COUNT = 50;
+
+ AmqpClient client = createAmqpClient();
+
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ final String address = getTestName();
+
+ AmqpReceiver receiver = session.createReceiver(address);
+ AmqpSender sender = session.createSender(address);
+
+ final Queue destinationView = getProxyToQueue(address);
+
+ for (int i = 0; i < MSG_COUNT; i++) {
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("msg" + i);
+ sender.send(message);
+ }
+
+ List<AmqpMessage> pendingAcks = new ArrayList<>();
+
+ for (int i = 0; i < MSG_COUNT; i++) {
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+ pendingAcks.add(received);
+ }
+
+ // Send one more to check in-flight stays at zero with no credit and all
+ // pending messages settled.
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("msg-final");
+ sender.send(message);
+
+ for (AmqpMessage pendingAck : pendingAcks) {
+ pendingAck.accept();
+ }
+
+ assertTrue("Should be no inflight messages: " + destinationView.getDeliveringCount(), Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return destinationView.getDeliveringCount() == 0;
+ }
+ }));
+
+ sender.close();
+ receiver.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testTwoPresettledReceiversReceiveAllMessages() throws Exception {
+ final int MSG_COUNT = 100;
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ final String address = getTestName();
+
+ AmqpSender sender = session.createSender(address);
+ AmqpReceiver receiver1 = session.createReceiver(address, null, false, true);
+ AmqpReceiver receiver2 = session.createReceiver(address, null, false, true);
+
+ for (int i = 0; i < MSG_COUNT; i++) {
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("msg" + i);
+ sender.send(message);
+ }
+
+ LOG.info("Attempting to read first two messages with receiver #1");
+ receiver1.flow(2);
+ AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
+ AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message 1", message1);
+ assertNotNull("Should have read message 2", message2);
+ assertEquals("msg0", message1.getMessageId());
+ assertEquals("msg1", message2.getMessageId());
+ message1.accept();
+ message2.accept();
+
+ LOG.info("Attempting to read next two messages with receiver #2");
+ receiver2.flow(2);
+ AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS);
+ AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message 3", message3);
+ assertNotNull("Should have read message 4", message4);
+ assertEquals("msg2", message3.getMessageId());
+ assertEquals("msg3", message4.getMessageId());
+ message3.accept();
+ message4.accept();
+
+ LOG.info("*** Attempting to read remaining messages with both receivers");
+ int splitCredit = (MSG_COUNT - 4) / 2;
+
+ LOG.info("**** Receiver #1 granting credit[{}] for its block of messages", splitCredit);
+ receiver1.flow(splitCredit);
+ for (int i = 0; i < splitCredit; i++) {
+ AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Receiver #1 should have read a message", message);
+ LOG.info("Receiver #1 read message: {}", message.getMessageId());
+ message.accept();
+ }
+
+ LOG.info("**** Receiver #2 granting credit[{}] for its block of messages", splitCredit);
+ receiver2.flow(splitCredit);
+ for (int i = 0; i < splitCredit; i++) {
+ AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Receiver #2 should have read a message[" + i + "]", message);
+ LOG.info("Receiver #2 read message: {}", message.getMessageId());
+ message.accept();
+ }
+
+ receiver1.close();
+ receiver2.close();
+
+ connection.close();
+ }
+}