You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/01/05 16:48:57 UTC

qpid-jms git commit: QPIDJMS-239: add test using the test peer

Repository: qpid-jms
Updated Branches:
  refs/heads/master e7692d9d0 -> 84c95aa1e


QPIDJMS-239: add test using the test peer


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/84c95aa1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/84c95aa1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/84c95aa1

Branch: refs/heads/master
Commit: 84c95aa1e1f5f9b43e32b490dcc1091b343cde3e
Parents: e7692d9
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Jan 5 15:43:48 2017 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Jan 5 16:48:09 2017 +0000

----------------------------------------------------------------------
 .../integration/ConsumerIntegrationTest.java    | 96 ++++++++++++++++++++
 .../qpid/jms/test/testpeer/AmqpDataFramer.java  | 15 ++-
 .../jms/test/testpeer/TestAmqpPeerRunner.java   |  2 +-
 3 files changed, 108 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/84c95aa1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 6c667ec..59eec0a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -20,12 +20,14 @@ package org.apache.qpid.jms.integration;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -61,12 +63,17 @@ import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.util.QpidJMSTestRunner;
+import org.apache.qpid.jms.util.Repeat;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.hamcrest.Matchers;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@RunWith(QpidJMSTestRunner.class)
 public class ConsumerIntegrationTest extends QpidJmsTestCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(ConsumerIntegrationTest.class);
@@ -1066,4 +1073,93 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(2000);
         }
     }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout=20000)
+    public void testRecoverOrderingWithAsyncConsumer() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>(null);
+
+        final int recoverCount = 5;
+        final int messageCount = 8;
+        final int testPayloadLength = 255; // Don't go over 255: a Proton <= 0.16.0 issue (affecting only the test) is hit beyond that.
+        String payload = new String(new byte[testPayloadLength], StandardCharsets.UTF_8);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+            connection.start();
+
+            testPeer.expectReceiverAttach();
+
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType(payload),
+                    messageCount, false, false, Matchers.greaterThan(UnsignedInteger.valueOf(messageCount)), 1, true);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+            consumer.setMessageListener(new MessageListener() {
+                boolean complete;
+                private int messageSeen = 0;
+                private int expectedIndex = 0;
+
+                @Override
+                public void onMessage(Message message) {
+                    if (complete) {
+                        LOG.debug("Ignoring message as test already completed (either pass or fail)");
+                        return;
+                    }
+
+                    try {
+                        int actualIndex = message.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER);
+                        LOG.debug("Got message {}", actualIndex);
+                        assertEquals("Received Message Out Of Order", expectedIndex, actualIndex);
+
+                        // don't ack the message until we receive it X times
+                        if (messageSeen < recoverCount) {
+                            LOG.debug("Ignoring message " + actualIndex + " and calling recover");
+                            session.recover();
+                            messageSeen++;
+                        } else {
+                            messageSeen = 0;
+                            expectedIndex++;
+
+                            // Have the peer expect the accepted disposition (1-based, hence pre-incremented).
+                            testPeer.expectDisposition(true, new AcceptedMatcher(), expectedIndex, expectedIndex);
+
+                            LOG.debug("Acknowledging message {}", actualIndex);
+                            message.acknowledge();
+
+                            //testPeer.waitForAllHandlersToComplete(2000);
+
+                            if (expectedIndex == messageCount) {
+                                complete = true;
+                                latch.countDown();
+                            }
+                        }
+                    } catch (Throwable t) {
+                        complete = true;
+                        asyncError.set(t);
+                        latch.countDown();
+                    }
+                }
+            });
+
+            boolean await = latch.await(15, TimeUnit.SECONDS);
+            assertTrue("Messages not received within given timeout." + latch.getCount(), await);
+
+            Throwable ex = asyncError.get();
+            assertNull("Unexpected exception", ex);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/84c95aa1/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
index a2ab389..43b7965 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
@@ -30,23 +30,29 @@ import org.apache.qpid.proton.codec.Data;
  */
 public class AmqpDataFramer
 {
-    private static final int CAPACITY = 2024;
+    private static final int INITIAL_CAPACITY = 2048;
     private static final byte FRAME_PREAMBLE_SIZE_IN_FOUR_BYTE_WORDS = 2;
+    private static final int FRAME_HEADER_SIZE = 8;
 
     public static byte[] encodeFrame(FrameType type, int channel, DescribedType describedType, Binary payload)
     {
-        ByteBuffer buffer = ByteBuffer.allocate(CAPACITY);  //TODO: set a proper size
+        ByteBuffer buffer = ByteBuffer.allocate(INITIAL_CAPACITY);
 
-        buffer.position(8); // leave hole for frame header
+        buffer.position(FRAME_HEADER_SIZE); // leave hole for frame header
 
         if (describedType != null) {
             Data frameBody = Data.Factory.create();
             frameBody.putDescribedType(describedType);
-            frameBody.encode(buffer);
+
+            long encodedLength = frameBody.encode(buffer);
+            if(encodedLength > buffer.capacity() - FRAME_HEADER_SIZE) {
+                throw new IllegalStateException("Performative encoding exceeded buffer size");
+            }
         }
 
         if(payload != null)
         {
+            //TODO grow buffer if needed rather than throw BufferOverflowException
             buffer.put(payload.asByteBuffer());
         }
 
@@ -63,4 +69,5 @@ public class AmqpDataFramer
         buffer.get(target, 0, frameSize);
         return target;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/84c95aa1/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
index 38005b0..0f01256 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
@@ -206,7 +206,7 @@ class TestAmqpPeerRunner implements Runnable
 
     public void sendBytes(byte[] bytes)
     {
-        LOGGER.debug("Sending: {}", new Binary(bytes));
+        LOGGER.debug("Sending: {} ({} bytes)", new Binary(bytes), bytes.length);
         try
         {
             _networkOutputStream.write(bytes);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org