You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/09/27 16:46:14 UTC

[02/14] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6422

https://issues.apache.org/jira/browse/AMQ-6422

I've made a few minor test changes and added a couple more cases.  Under
heavy CPU load I'm able to get test,
testReceiveMessageAndRefillCreditBeforeAcceptOnQueue to fail on the
second receive call where it should get the second message since it
granted credit.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cc0e7879
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cc0e7879
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cc0e7879

Branch: refs/heads/activemq-5.14.x
Commit: cc0e78790fe1c1d68a727d58fdf1b7cdad22b598
Parents: 94ffb1b
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 7 14:05:21 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:12:43 2016 -0400

----------------------------------------------------------------------
 .../amqp/interop/AmqpSendReceiveTest.java       | 284 ++++++++++++-------
 1 file changed, 175 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/cc0e7879/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index c27c0f9..8a4958f 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -28,6 +28,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import javax.jms.Queue;
+import javax.jms.Topic;
+
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.junit.ActiveMQTestRunner;
 import org.apache.activemq.junit.Repeat;
@@ -44,7 +47,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Test basic send and receive scenarios using only AMQP sender and receiver links.
+ * Test basic send and receive scenarios using only AMQP sender and receiver
+ * links.
  */
 @RunWith(ActiveMQTestRunner.class)
 public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@@ -52,6 +56,37 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
     protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
 
     @Test(timeout = 60000)
+    public void testSimpleSendOneReceiveOne() throws Exception {
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        AmqpMessage message = new AmqpMessage();
+
+        message.setMessageId("msg" + 1);
+        message.setMessageAnnotation("serialNo", 1);
+        message.setText("Test-Message");
+
+        sender.send(message);
+        sender.close();
+
+        LOG.info("Attempting to read message with receiver");
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(2);
+        AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+        assertNotNull("Should have read message", received);
+        assertEquals("msg1", received.getMessageId());
+        received.accept();
+
+        receiver.close();
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
     public void testCloseBusyReceiver() throws Exception {
         final int MSG_COUNT = 20;
 
@@ -96,108 +131,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 60000)
-    public void testReceiveFlowDispositionSingleCredit() throws Exception {
-        AmqpClient client = createAmqpClient();
-        AmqpConnection connection = client.connect();
-        AmqpSession session = connection.createSession();
-
-        AmqpSender sender = session.createSender("queue://" + getTestName());
-        for (int i=0;i<2; i++) {
-            AmqpMessage message = new AmqpMessage();
-            message.setMessageId("msg" + i);
-            sender.send(message);
-        }
-        sender.close();
-        connection.close();
-
-        LOG.info("Starting consumer connection");
-        connection = client.connect();
-        session = connection.createSession();
-        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
-        receiver.flow(1);
-        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
-        assertNotNull(received);
-
-        receiver.flow(1);
-        received.accept();
-
-        received = receiver.receive(5, TimeUnit.SECONDS);
-        assertNotNull(received);
-        received.accept();
-
-        receiver.close();
-        connection.close();
-    }
-
-    @Test(timeout = 60000)
-    public void testReceiveFlowDispositionSingleCreditTopic() throws Exception {
-        final AmqpClient client = createAmqpClient();
-        final LinkedList<Throwable> errors = new LinkedList<Throwable>();
-        final CountDownLatch receiverReady = new CountDownLatch(1);
-        ExecutorService executorService = Executors.newCachedThreadPool();
-
-        executorService.submit(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    LOG.info("Starting consumer connection");
-                    AmqpConnection connection = client.connect();
-                    AmqpSession session = connection.createSession();
-                    AmqpReceiver receiver = session.createReceiver("topic://" + getTestName());
-                    receiver.flow(1);
-                    receiverReady.countDown();
-                    AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
-                    assertNotNull(received);
-
-                    receiver.flow(1);
-                    received.accept();
-
-                    received = receiver.receive(5, TimeUnit.SECONDS);
-                    assertNotNull(received);
-                    received.accept();
-
-                    receiver.close();
-                    connection.close();
-
-                } catch (Exception error) {
-                    errors.add(error);
-                }
-
-            }
-        });
-
-        // producer
-        executorService.submit(new Runnable() {
-            @Override
-            public void run() {
-                try {
-
-                    receiverReady.await(20, TimeUnit.SECONDS);
-                    AmqpConnection connection = client.connect();
-                    AmqpSession session = connection.createSession();
-
-                    AmqpSender sender = session.createSender("topic://" + getTestName());
-                    for (int i = 0; i < 2; i++) {
-                        AmqpMessage message = new AmqpMessage();
-                        message.setMessageId("msg" + i);
-                        sender.send(message);
-                    }
-                    sender.close();
-                    connection.close();
-                } catch (Exception ignored) {
-                    ignored.printStackTrace();
-                }
-
-            }
-        });
-
-        executorService.shutdown();
-        executorService.awaitTermination(20, TimeUnit.SECONDS);
-        assertTrue("no errors: " + errors, errors.isEmpty());
-    }
-
-
-    @Test(timeout = 60000)
     public void testReceiveWithJMSSelectorFilter() throws Exception {
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = client.connect();
@@ -279,7 +212,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
 
         LOG.info("Attempting to read remaining messages with receiver #1");
         receiver1.flow(MSG_COUNT - 4);
-        for (int i = 4; i < MSG_COUNT - 4; i++) {
+        for (int i = 4; i < MSG_COUNT; i++) {
             AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
             assertNotNull("Should have read a message", message);
             assertEquals("msg" + i, message.getMessageId());
@@ -341,20 +274,24 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
         message3.accept();
         message4.accept();
 
-        LOG.info("Attempting to read remaining messages with both receivers");
+        LOG.info("*** Attempting to read remaining messages with both receivers");
         int splitCredit = (MSG_COUNT - 4) / 2;
 
+        LOG.info("**** Receiver #1 granting creadit[{}] for its block of messages", splitCredit);
         receiver1.flow(splitCredit);
-        for (int i = 4; i < splitCredit; i++) {
+        for (int i = 0; i < splitCredit; i++) {
             AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
-            assertNotNull("Should have read a message", message);
+            assertNotNull("Receiver #1 should have read a message", message);
+            LOG.info("Receiver #1 read message: {}", message.getMessageId());
             message.accept();
         }
 
+        LOG.info("**** Receiver #2 granting creadit[{}] for its block of messages", splitCredit);
         receiver2.flow(splitCredit);
-        for (int i = 4; i < splitCredit; i++) {
+        for (int i = 0; i < splitCredit; i++) {
             AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
-            assertNotNull("Should have read a message", message);
+            assertNotNull("Receiver #2 should have read a message", message);
+            LOG.info("Receiver #2 read message: {}", message.getMessageId());
             message.accept();
         }
 
@@ -365,6 +302,135 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 60000)
+    public void testReceiveMessageAndRefillCreditBeforeAcceptOnQueue() throws Exception {
+        doTestReceiveMessageAndRefillCreditBeforeAccept(Queue.class);
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiveMessageAndRefillCreditBeforeAcceptOnTopic() throws Exception {
+        doTestReceiveMessageAndRefillCreditBeforeAccept(Topic.class);
+    }
+
+    private void doTestReceiveMessageAndRefillCreditBeforeAccept(Class<?> destType) throws Exception {
+
+        AmqpClient client = createAmqpClient();
+
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        final String address;
+        if (Queue.class.equals(destType)) {
+            address = "queue://" + getTestName();
+        } else {
+            address = "topic://" + getTestName();
+        }
+
+        AmqpReceiver receiver = session.createReceiver(address);
+        AmqpSender sender = session.createSender(address);
+
+        for (int i = 0; i < 2; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("msg" + i);
+            sender.send(message);
+        }
+        sender.close();
+
+        receiver.flow(1);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+
+        receiver.flow(1);
+        received.accept();
+
+        received = receiver.receive(10, TimeUnit.SECONDS);
+        assertNotNull(received);
+        received.accept();
+
+        receiver.close();
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiveMessageAndRefillCreditBeforeAcceptOnQueueAsync() throws Exception {
+        doTestReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync(Queue.class);
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync() throws Exception {
+        doTestReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync(Topic.class);
+    }
+
+    private void doTestReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync(Class<?> destType) throws Exception {
+        final AmqpClient client = createAmqpClient();
+        final LinkedList<Throwable> errors = new LinkedList<Throwable>();
+        final CountDownLatch receiverReady = new CountDownLatch(1);
+        ExecutorService executorService = Executors.newCachedThreadPool();
+
+        final String address;
+        if (Queue.class.equals(destType)) {
+            address = "queue://" + getTestName();
+        } else {
+            address = "topic://" + getTestName();
+        }
+
+        executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    LOG.info("Starting consumer connection");
+                    AmqpConnection connection = client.connect();
+                    AmqpSession session = connection.createSession();
+                    AmqpReceiver receiver = session.createReceiver(address);
+                    receiver.flow(1);
+                    receiverReady.countDown();
+                    AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+                    assertNotNull(received);
+
+                    receiver.flow(1);
+                    received.accept();
+
+                    received = receiver.receive(5, TimeUnit.SECONDS);
+                    assertNotNull(received);
+                    received.accept();
+
+                    receiver.close();
+                    connection.close();
+
+                } catch (Exception error) {
+                    errors.add(error);
+                }
+            }
+        });
+
+        // producer
+        executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    receiverReady.await(20, TimeUnit.SECONDS);
+                    AmqpConnection connection = client.connect();
+                    AmqpSession session = connection.createSession();
+
+                    AmqpSender sender = session.createSender(address);
+                    for (int i = 0; i < 2; i++) {
+                        AmqpMessage message = new AmqpMessage();
+                        message.setMessageId("msg" + i);
+                        sender.send(message);
+                    }
+                    sender.close();
+                    connection.close();
+                } catch (Exception ignored) {
+                    ignored.printStackTrace();
+                }
+            }
+        });
+
+        executorService.shutdown();
+        executorService.awaitTermination(20, TimeUnit.SECONDS);
+        assertTrue("no errors: " + errors, errors.isEmpty());
+    }
+
+    @Test(timeout = 60000)
     public void testMessageDurabliltyFollowsSpec() throws Exception {
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = client.connect();