You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/09/27 16:46:14 UTC
[02/14] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-6422
https://issues.apache.org/jira/browse/AMQ-6422
I've made a few minor test changes and added a couple more cases. Under
heavy CPU load I'm able to get the test
testReceiveMessageAndRefillCreditBeforeAcceptOnQueue to fail on the
second receive call, where it should get the second message since it
has already granted credit.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cc0e7879
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cc0e7879
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cc0e7879
Branch: refs/heads/activemq-5.14.x
Commit: cc0e78790fe1c1d68a727d58fdf1b7cdad22b598
Parents: 94ffb1b
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 7 14:05:21 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:12:43 2016 -0400
----------------------------------------------------------------------
.../amqp/interop/AmqpSendReceiveTest.java | 284 ++++++++++++-------
1 file changed, 175 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/cc0e7879/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index c27c0f9..8a4958f 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -28,6 +28,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import javax.jms.Queue;
+import javax.jms.Topic;
+
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.junit.ActiveMQTestRunner;
import org.apache.activemq.junit.Repeat;
@@ -44,7 +47,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Test basic send and receive scenarios using only AMQP sender and receiver links.
+ * Test basic send and receive scenarios using only AMQP sender and receiver
+ * links.
*/
@RunWith(ActiveMQTestRunner.class)
public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@@ -52,6 +56,37 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
@Test(timeout = 60000)
+ public void testSimpleSendOneReceiveOne() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender("queue://" + getTestName());
+
+ AmqpMessage message = new AmqpMessage();
+
+ message.setMessageId("msg" + 1);
+ message.setMessageAnnotation("serialNo", 1);
+ message.setText("Test-Message");
+
+ sender.send(message);
+ sender.close();
+
+ LOG.info("Attempting to read message with receiver");
+ AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+ receiver.flow(2);
+ AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message", received);
+ assertEquals("msg1", received.getMessageId());
+ received.accept();
+
+ receiver.close();
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
public void testCloseBusyReceiver() throws Exception {
final int MSG_COUNT = 20;
@@ -96,108 +131,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
}
@Test(timeout = 60000)
- public void testReceiveFlowDispositionSingleCredit() throws Exception {
- AmqpClient client = createAmqpClient();
- AmqpConnection connection = client.connect();
- AmqpSession session = connection.createSession();
-
- AmqpSender sender = session.createSender("queue://" + getTestName());
- for (int i=0;i<2; i++) {
- AmqpMessage message = new AmqpMessage();
- message.setMessageId("msg" + i);
- sender.send(message);
- }
- sender.close();
- connection.close();
-
- LOG.info("Starting consumer connection");
- connection = client.connect();
- session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
- receiver.flow(1);
- AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(received);
-
- receiver.flow(1);
- received.accept();
-
- received = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(received);
- received.accept();
-
- receiver.close();
- connection.close();
- }
-
- @Test(timeout = 60000)
- public void testReceiveFlowDispositionSingleCreditTopic() throws Exception {
- final AmqpClient client = createAmqpClient();
- final LinkedList<Throwable> errors = new LinkedList<Throwable>();
- final CountDownLatch receiverReady = new CountDownLatch(1);
- ExecutorService executorService = Executors.newCachedThreadPool();
-
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- LOG.info("Starting consumer connection");
- AmqpConnection connection = client.connect();
- AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver("topic://" + getTestName());
- receiver.flow(1);
- receiverReady.countDown();
- AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(received);
-
- receiver.flow(1);
- received.accept();
-
- received = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(received);
- received.accept();
-
- receiver.close();
- connection.close();
-
- } catch (Exception error) {
- errors.add(error);
- }
-
- }
- });
-
- // producer
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
-
- receiverReady.await(20, TimeUnit.SECONDS);
- AmqpConnection connection = client.connect();
- AmqpSession session = connection.createSession();
-
- AmqpSender sender = session.createSender("topic://" + getTestName());
- for (int i = 0; i < 2; i++) {
- AmqpMessage message = new AmqpMessage();
- message.setMessageId("msg" + i);
- sender.send(message);
- }
- sender.close();
- connection.close();
- } catch (Exception ignored) {
- ignored.printStackTrace();
- }
-
- }
- });
-
- executorService.shutdown();
- executorService.awaitTermination(20, TimeUnit.SECONDS);
- assertTrue("no errors: " + errors, errors.isEmpty());
- }
-
-
- @Test(timeout = 60000)
public void testReceiveWithJMSSelectorFilter() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
@@ -279,7 +212,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
LOG.info("Attempting to read remaining messages with receiver #1");
receiver1.flow(MSG_COUNT - 4);
- for (int i = 4; i < MSG_COUNT - 4; i++) {
+ for (int i = 4; i < MSG_COUNT; i++) {
AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read a message", message);
assertEquals("msg" + i, message.getMessageId());
@@ -341,20 +274,24 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
message3.accept();
message4.accept();
- LOG.info("Attempting to read remaining messages with both receivers");
+ LOG.info("*** Attempting to read remaining messages with both receivers");
int splitCredit = (MSG_COUNT - 4) / 2;
+ LOG.info("**** Receiver #1 granting creadit[{}] for its block of messages", splitCredit);
receiver1.flow(splitCredit);
- for (int i = 4; i < splitCredit; i++) {
+ for (int i = 0; i < splitCredit; i++) {
AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
- assertNotNull("Should have read a message", message);
+ assertNotNull("Receiver #1 should have read a message", message);
+ LOG.info("Receiver #1 read message: {}", message.getMessageId());
message.accept();
}
+ LOG.info("**** Receiver #2 granting creadit[{}] for its block of messages", splitCredit);
receiver2.flow(splitCredit);
- for (int i = 4; i < splitCredit; i++) {
+ for (int i = 0; i < splitCredit; i++) {
AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
- assertNotNull("Should have read a message", message);
+ assertNotNull("Receiver #2 should have read a message", message);
+ LOG.info("Receiver #2 read message: {}", message.getMessageId());
message.accept();
}
@@ -365,6 +302,135 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
}
@Test(timeout = 60000)
+ public void testReceiveMessageAndRefillCreditBeforeAcceptOnQueue() throws Exception {
+ doTestReceiveMessageAndRefillCreditBeforeAccept(Queue.class);
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiveMessageAndRefillCreditBeforeAcceptOnTopic() throws Exception {
+ doTestReceiveMessageAndRefillCreditBeforeAccept(Topic.class);
+ }
+
+ private void doTestReceiveMessageAndRefillCreditBeforeAccept(Class<?> destType) throws Exception {
+
+ AmqpClient client = createAmqpClient();
+
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+
+ final String address;
+ if (Queue.class.equals(destType)) {
+ address = "queue://" + getTestName();
+ } else {
+ address = "topic://" + getTestName();
+ }
+
+ AmqpReceiver receiver = session.createReceiver(address);
+ AmqpSender sender = session.createSender(address);
+
+ for (int i = 0; i < 2; i++) {
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("msg" + i);
+ sender.send(message);
+ }
+ sender.close();
+
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+
+ receiver.flow(1);
+ received.accept();
+
+ received = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull(received);
+ received.accept();
+
+ receiver.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiveMessageAndRefillCreditBeforeAcceptOnQueueAsync() throws Exception {
+ doTestReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync(Queue.class);
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync() throws Exception {
+ doTestReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync(Topic.class);
+ }
+
+ private void doTestReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync(Class<?> destType) throws Exception {
+ final AmqpClient client = createAmqpClient();
+ final LinkedList<Throwable> errors = new LinkedList<Throwable>();
+ final CountDownLatch receiverReady = new CountDownLatch(1);
+ ExecutorService executorService = Executors.newCachedThreadPool();
+
+ final String address;
+ if (Queue.class.equals(destType)) {
+ address = "queue://" + getTestName();
+ } else {
+ address = "topic://" + getTestName();
+ }
+
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOG.info("Starting consumer connection");
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createReceiver(address);
+ receiver.flow(1);
+ receiverReady.countDown();
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+
+ receiver.flow(1);
+ received.accept();
+
+ received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+ received.accept();
+
+ receiver.close();
+ connection.close();
+
+ } catch (Exception error) {
+ errors.add(error);
+ }
+ }
+ });
+
+ // producer
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ receiverReady.await(20, TimeUnit.SECONDS);
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(address);
+ for (int i = 0; i < 2; i++) {
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("msg" + i);
+ sender.send(message);
+ }
+ sender.close();
+ connection.close();
+ } catch (Exception ignored) {
+ ignored.printStackTrace();
+ }
+ }
+ });
+
+ executorService.shutdown();
+ executorService.awaitTermination(20, TimeUnit.SECONDS);
+ assertTrue("no errors: " + errors, errors.isEmpty());
+ }
+
+ @Test(timeout = 60000)
public void testMessageDurabliltyFollowsSpec() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();