You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/01/05 16:48:57 UTC
qpid-jms git commit: QPIDJMS-239: add test using the test peer
Repository: qpid-jms
Updated Branches:
refs/heads/master e7692d9d0 -> 84c95aa1e
QPIDJMS-239: add test using the test peer
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/84c95aa1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/84c95aa1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/84c95aa1
Branch: refs/heads/master
Commit: 84c95aa1e1f5f9b43e32b490dcc1091b343cde3e
Parents: e7692d9
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Jan 5 15:43:48 2017 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Jan 5 16:48:09 2017 +0000
----------------------------------------------------------------------
.../integration/ConsumerIntegrationTest.java | 96 ++++++++++++++++++++
.../qpid/jms/test/testpeer/AmqpDataFramer.java | 15 ++-
.../jms/test/testpeer/TestAmqpPeerRunner.java | 2 +-
3 files changed, 108 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/84c95aa1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 6c667ec..59eec0a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -20,12 +20,14 @@ package org.apache.qpid.jms.integration;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -61,12 +63,17 @@ import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.util.QpidJMSTestRunner;
+import org.apache.qpid.jms.util.Repeat;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.hamcrest.Matchers;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@RunWith(QpidJMSTestRunner.class)
public class ConsumerIntegrationTest extends QpidJmsTestCase {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerIntegrationTest.class);
@@ -1066,4 +1073,93 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(2000);
}
}
+
+ @Repeat(repetitions = 1)
+ @Test(timeout=20000)
+ public void testRecoverOrderingWithAsyncConsumer() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>(null);
+
+ final int recoverCount = 5;
+ final int messageCount = 8;
+ final int testPayloadLength = 255; // Don't go over 255, Proton <= 0.16.0 issue affecting the test[ only].
+ String payload = new String(new byte[testPayloadLength], StandardCharsets.UTF_8);
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue destination = session.createQueue(getTestName());
+ connection.start();
+
+ testPeer.expectReceiverAttach();
+
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType(payload),
+ messageCount, false, false, Matchers.greaterThan(UnsignedInteger.valueOf(messageCount)), 1, true);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ boolean complete;
+ private int messageSeen = 0;
+ private int expectedIndex = 0;
+
+ @Override
+ public void onMessage(Message message) {
+ if (complete) {
+ LOG.debug("Ignoring message as test already completed (either pass or fail)");
+ return;
+ }
+
+ try {
+ int actualIndex = message.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER);
+ LOG.debug("Got message {}", actualIndex);
+ assertEquals("Received Message Out Of Order", expectedIndex, actualIndex);
+
+ // don't ack the message until we receive it X times
+ if (messageSeen < recoverCount) {
+ LOG.debug("Ignoring message " + actualIndex + " and calling recover");
+ session.recover();
+ messageSeen++;
+ } else {
+ messageSeen = 0;
+ expectedIndex++;
+
+ // Have the peer expect the accept the disposition (1-based, hence pre-incremented).
+ testPeer.expectDisposition(true, new AcceptedMatcher(), expectedIndex, expectedIndex);
+
+ LOG.debug("Acknowledging message {}", actualIndex);
+ message.acknowledge();
+
+ //testPeer.waitForAllHandlersToComplete(2000);
+
+ if (expectedIndex == messageCount) {
+ complete = true;
+ latch.countDown();
+ }
+ }
+ } catch (Throwable t) {
+ complete = true;
+ asyncError.set(t);
+ latch.countDown();
+ }
+ }
+ });
+
+ boolean await = latch.await(15, TimeUnit.SECONDS);
+ assertTrue("Messages not received within given timeout." + latch.getCount(), await);
+
+ Throwable ex = asyncError.get();
+ assertNull("Unexpected exception", ex);
+
+ testPeer.waitForAllHandlersToComplete(2000);
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/84c95aa1/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
index a2ab389..43b7965 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
@@ -30,23 +30,29 @@ import org.apache.qpid.proton.codec.Data;
*/
public class AmqpDataFramer
{
- private static final int CAPACITY = 2024;
+ private static final int INITIAL_CAPACITY = 2048;
private static final byte FRAME_PREAMBLE_SIZE_IN_FOUR_BYTE_WORDS = 2;
+ private static final int FRAME_HEADER_SIZE = 8;
public static byte[] encodeFrame(FrameType type, int channel, DescribedType describedType, Binary payload)
{
- ByteBuffer buffer = ByteBuffer.allocate(CAPACITY); //TODO: set a proper size
+ ByteBuffer buffer = ByteBuffer.allocate(INITIAL_CAPACITY);
- buffer.position(8); // leave hole for frame header
+ buffer.position(FRAME_HEADER_SIZE); // leave hole for frame header
if (describedType != null) {
Data frameBody = Data.Factory.create();
frameBody.putDescribedType(describedType);
- frameBody.encode(buffer);
+
+ long encodedLength = frameBody.encode(buffer);
+ if(encodedLength > buffer.capacity() - FRAME_HEADER_SIZE) {
+ throw new IllegalStateException("Performative encoding exceeded buffer size");
+ }
}
if(payload != null)
{
+ //TODO grow buffer if needed rather than throw BOE
buffer.put(payload.asByteBuffer());
}
@@ -63,4 +69,5 @@ public class AmqpDataFramer
buffer.get(target, 0, frameSize);
return target;
}
+
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/84c95aa1/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
index 38005b0..0f01256 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
@@ -206,7 +206,7 @@ class TestAmqpPeerRunner implements Runnable
public void sendBytes(byte[] bytes)
{
- LOGGER.debug("Sending: {}", new Binary(bytes));
+ LOGGER.debug("Sending: {} ({} bytes)", new Binary(bytes), bytes.length);
try
{
_networkOutputStream.write(bytes);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org