You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/08/08 18:27:50 UTC
[2/2] activemq-artemis git commit: ARTEMIS-1978: update to proton-j
0.27.3 to resolve sequencing issues
ARTEMIS-1978: update to proton-j 0.27.3 to resolve sequencing issues
Adds test exposing broker behaviour from issues stemming from PROTON-1892 and PROTON-1901
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b0c65ba2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b0c65ba2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b0c65ba2
Branch: refs/heads/master
Commit: b0c65ba2dd5b7d1bc0dc75f9370c52da49f39bbf
Parents: 6534b6c
Author: Robbie Gemmell <ro...@apache.org>
Authored: Tue Aug 7 15:35:16 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 8 14:27:42 2018 -0400
----------------------------------------------------------------------
pom.xml | 2 +-
.../transport/amqp/client/AmqpReceiver.java | 18 ++-
.../integration/amqp/AmqpLargeMessageTest.java | 138 ++++++++++++++++++-
3 files changed, 149 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0c65ba2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 221cff9..fb95a5e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,7 +92,7 @@
<maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
<mockito.version>2.8.47</mockito.version>
<netty.version>4.1.24.Final</netty.version>
- <proton.version>0.27.1</proton.version>
+ <proton.version>0.27.3</proton.version>
<resteasy.version>3.0.19.Final</resteasy.version>
<slf4j.version>1.7.21</slf4j.version>
<qpid.jms.version>0.33.0</qpid.jms.version>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0c65ba2/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index e9fc75b..fb4e4da 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -363,6 +363,20 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
* if an error occurs while sending the flow.
*/
public void flow(final int credit) throws IOException {
+ flow(credit, false);
+ }
+
+ /**
+ * Controls the amount of credit given to the receiver link.
+ *
+ * @param credit
+ * the amount of credit to grant.
+ * @param deferWrite
+ * defer writing to the wire, hold until for the next operation writes.
+ * @throws IOException
+ * if an error occurs while sending the flow.
+ */
+ public void flow(final int credit, final boolean deferWrite) throws IOException {
checkClosed();
final ClientFuture request = new ClientFuture();
session.getScheduler().execute(new Runnable() {
@@ -372,7 +386,9 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
checkClosed();
try {
getEndpoint().flow(credit);
- session.pumpToProtonTransport(request);
+ if (!deferWrite) {
+ session.pumpToProtonTransport(request);
+ }
request.onSuccess();
} catch (Exception e) {
request.onFailure(e);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0c65ba2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
index 8465c61..6393bae 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
@@ -393,13 +393,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
receiver2.flow(numMsgs);
for (int i = 0; i < numMsgs; ++i) {
AmqpMessage message = receiver2.receive(5, TimeUnit.SECONDS);
- assertNotNull("failed at " + i, message);
-
- Section body = message.getWrappedMessage().getBody();
- assertNotNull("No message body for msg " + i, body);
-
- assertTrue("Unexpected message body type for msg " + body.getClass(), body instanceof Data);
- assertEquals("Unexpected body content for msg", new Binary(payload, 0, payload.length), ((Data) body).getValue());
+ validateMessage(payload, i, message);
message.accept();
}
@@ -412,6 +406,136 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
}
@Test(timeout = 60000)
+ public void testReceiveLargeMessagesMultiplexedOnSameSession() throws Exception {
+ server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
+
+ int numMsgs = 10;
+ int maxFrameSize = FRAME_SIZE; // Match the brokers outgoing frame size limit to make window sizing easy
+ int msgSizeA = FRAME_SIZE * 4; // Bigger multi-frame messages
+ int msgSizeB = maxFrameSize / 2; // Smaller single frame messages
+ int sessionCapacity = msgSizeA + maxFrameSize; // Restrict session to 1.X of the larger messages in flight at once, make it likely send is partial.
+
+ byte[] payloadA = createLargePayload(msgSizeA);
+ assertEquals(msgSizeA, payloadA.length);
+ byte[] payloadB = createLargePayload(msgSizeB);
+ assertEquals(msgSizeB, payloadB.length);
+
+ String testQueueNameA = getTestName() + "A";
+ String testQueueNameB = getTestName() + "B";
+
+ AmqpClient client = createAmqpClient();
+
+ AmqpConnection connection = client.createConnection();
+ connection.setMaxFrameSize(maxFrameSize);
+ connection.setSessionIncomingCapacity(sessionCapacity);
+
+ connection.connect();
+ addConnection(connection);
+ try {
+ AmqpSession session = connection.createSession();
+ AmqpSender senderA = session.createSender(testQueueNameA);
+ AmqpSender senderB = session.createSender(testQueueNameB);
+
+ // Send in the messages
+ for (int i = 0; i < numMsgs; ++i) {
+ AmqpMessage messageA = new AmqpMessage();
+ messageA.setBytes(payloadA);
+
+ senderA.send(messageA);
+
+ AmqpMessage messageB = new AmqpMessage();
+ messageB.setBytes(payloadB);
+
+ senderB.send(messageB);
+ }
+
+ Wait.assertEquals(numMsgs, () -> getMessageCount(server.getPostOffice(), testQueueNameA), 5000, 10);
+ Wait.assertEquals(numMsgs, () -> getMessageCount(server.getPostOffice(), testQueueNameB), 5000, 10);
+
+ AmqpReceiver receiverA = session.createReceiver(testQueueNameA);
+ AmqpReceiver receiverB = session.createReceiver(testQueueNameB);
+
+ // Split credit flow to encourage overlapping
+ // Flow initial credit for both consumers, in the same TCP frame.
+ receiverA.flow(numMsgs / 2, true);
+ receiverB.flow(numMsgs / 2);
+
+ // Flow remaining credit for both consumers, in the same TCP frame.
+ receiverA.flow(numMsgs / 2, true);
+ receiverB.flow(numMsgs / 2);
+
+ ArrayList<AmqpMessage> messagesA = new ArrayList<>();
+ ArrayList<AmqpMessage> messagesB = new ArrayList<>();
+
+ long timeout = 6000;
+ long start = System.nanoTime();
+
+ // Validate the messages are all received
+ boolean timeRemaining = true;
+ while (timeRemaining) {
+ if (messagesA.size() < numMsgs) {
+ LOG.debug("Attempting to receive message for receiver A");
+ AmqpMessage messageA = receiverA.receive(20, TimeUnit.MILLISECONDS);
+ if (messageA != null) {
+ LOG.debug("Got message for receiver A");
+ messagesA.add(messageA);
+ messageA.accept();
+ }
+ }
+
+ if (messagesB.size() < numMsgs) {
+ LOG.debug("Attempting to receive message for receiver B");
+ AmqpMessage messageB = receiverB.receive(20, TimeUnit.MILLISECONDS);
+ if (messageB != null) {
+ LOG.debug("Got message for receiver B");
+ messagesB.add(messageB);
+ messageB.accept();
+ }
+ }
+
+ if (messagesA.size() == numMsgs && messagesB.size() == numMsgs) {
+ LOG.debug("Received expected messages");
+ break;
+ }
+
+ timeRemaining = System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(timeout);
+ }
+
+ assertTrue("Failed to receive all messages in expected time: A=" + messagesA.size() + ", B=" + messagesB.size(), timeRemaining);
+
+ // Validate there aren't any extras
+ assertNull("Unexpected additional message present for A", receiverA.receiveNoWait());
+ assertNull("Unexpected additional message present for B", receiverB.receiveNoWait());
+
+ // Validate the transfers were reconstituted to give the expected delivery payload.
+ for (int i = 0; i < numMsgs; ++i) {
+ AmqpMessage messageA = messagesA.get(i);
+ validateMessage(payloadA, i, messageA);
+
+ AmqpMessage messageB = messagesB.get(i);
+ validateMessage(payloadB, i, messageB);
+ }
+
+ receiverA.close();
+ receiverB.close();
+
+ session.close();
+ } finally {
+ connection.close();
+ }
+ }
+
+ private void validateMessage(byte[] expectedPayload, int msgNum, AmqpMessage message) {
+ assertNotNull("failed at " + msgNum, message);
+
+ Section body = message.getWrappedMessage().getBody();
+ assertNotNull("No message body for msg " + msgNum, body);
+
+ assertTrue("Unexpected message body type for msg " + body.getClass(), body instanceof Data);
+ assertEquals("Unexpected body content for msg", new Binary(expectedPayload, 0, expectedPayload.length), ((Data) body).getValue());
+ }
+
+ @Test(timeout = 60000)
public void testMessageWithAmqpValueAndEmptyBinaryPreservesBody() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));