You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ro...@apache.org on 2018/08/08 19:09:27 UTC

activemq-artemis git commit: ARTEMIS-1978: update to proton-j 0.27.3 to resolve sequencing issues

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 9f7afab73 -> 7214d5658


ARTEMIS-1978: update to proton-j 0.27.3 to resolve sequencing issues

Adds test exposing broker behaviour from issues stemming from PROTON-1892 and PROTON-1901

(cherry picked from commit b0c65ba2dd5b7d1bc0dc75f9370c52da49f39bbf)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7214d565
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7214d565
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7214d565

Branch: refs/heads/2.6.x
Commit: 7214d56581cbd5082b9bfddfc57f95e946d2433f
Parents: 9f7afab
Author: Robbie Gemmell <ro...@apache.org>
Authored: Tue Aug 7 15:35:16 2018 +0100
Committer: Robbie Gemmell <ro...@apache.org>
Committed: Wed Aug 8 19:42:39 2018 +0100

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 .../transport/amqp/client/AmqpReceiver.java     |  18 ++-
 .../integration/amqp/AmqpLargeMessageTest.java  | 130 +++++++++++++++++++
 3 files changed, 148 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7214d565/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7249c2d..6224fa1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,7 +92,7 @@
       <maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
       <mockito.version>2.8.47</mockito.version>
       <netty.version>4.1.24.Final</netty.version>
-      <proton.version>0.27.1</proton.version>
+      <proton.version>0.27.3</proton.version>
       <resteasy.version>3.0.19.Final</resteasy.version>
       <slf4j.version>1.7.21</slf4j.version>
       <qpid.jms.version>0.32.0</qpid.jms.version>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7214d565/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index e9fc75b..fb4e4da 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -363,6 +363,20 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     *         if an error occurs while sending the flow.
     */
    public void flow(final int credit) throws IOException {
+      flow(credit, false);
+   }
+
+   /**
+    * Controls the amount of credit given to the receiver link.
+    *
+    * @param credit
+    *        the amount of credit to grant.
+    * @param deferWrite
+    *        defer writing to the wire, hold until for the next operation writes.
+    * @throws IOException
+    *         if an error occurs while sending the flow.
+    */
+   public void flow(final int credit, final boolean deferWrite) throws IOException {
       checkClosed();
       final ClientFuture request = new ClientFuture();
       session.getScheduler().execute(new Runnable() {
@@ -372,7 +386,9 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
             checkClosed();
             try {
                getEndpoint().flow(credit);
-               session.pumpToProtonTransport(request);
+               if (!deferWrite) {
+                  session.pumpToProtonTransport(request);
+               }
                request.onSuccess();
             } catch (Exception e) {
                request.onFailure(e);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7214d565/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
index 6bd550a..ef8800c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
@@ -411,6 +411,136 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
       }
    }
 
+   @Test(timeout = 60000)
+   public void testReceiveLargeMessagesMultiplexedOnSameSession() throws Exception {
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
+
+      int numMsgs = 10;
+      int maxFrameSize = FRAME_SIZE; // Match the brokers outgoing frame size limit to make window sizing easy
+      int msgSizeA = FRAME_SIZE * 4; // Bigger multi-frame messages
+      int msgSizeB = maxFrameSize / 2; // Smaller single frame messages
+      int sessionCapacity = msgSizeA + maxFrameSize; // Restrict session to 1.X of the larger messages in flight at once, make it likely send is partial.
+
+      byte[] payloadA = createLargePayload(msgSizeA);
+      assertEquals(msgSizeA, payloadA.length);
+      byte[] payloadB = createLargePayload(msgSizeB);
+      assertEquals(msgSizeB, payloadB.length);
+
+      String testQueueNameA = getTestName() + "A";
+      String testQueueNameB = getTestName() + "B";
+
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = client.createConnection();
+      connection.setMaxFrameSize(maxFrameSize);
+      connection.setSessionIncomingCapacity(sessionCapacity);
+
+      connection.connect();
+      addConnection(connection);
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender senderA = session.createSender(testQueueNameA);
+         AmqpSender senderB = session.createSender(testQueueNameB);
+
+         // Send in the messages
+         for (int i = 0; i < numMsgs; ++i) {
+            AmqpMessage messageA = new AmqpMessage();
+            messageA.setBytes(payloadA);
+
+            senderA.send(messageA);
+
+            AmqpMessage messageB = new AmqpMessage();
+            messageB.setBytes(payloadB);
+
+            senderB.send(messageB);
+         }
+
+         Wait.assertEquals(numMsgs, () -> getMessageCount(server.getPostOffice(), testQueueNameA), 5000, 10);
+         Wait.assertEquals(numMsgs, () -> getMessageCount(server.getPostOffice(), testQueueNameB), 5000, 10);
+
+         AmqpReceiver receiverA = session.createReceiver(testQueueNameA);
+         AmqpReceiver receiverB = session.createReceiver(testQueueNameB);
+
+         // Split credit flow to encourage overlapping
+         // Flow initial credit for both consumers, in the same TCP frame.
+         receiverA.flow(numMsgs / 2, true);
+         receiverB.flow(numMsgs / 2);
+
+         // Flow remaining credit for both consumers, in the same TCP frame.
+         receiverA.flow(numMsgs / 2, true);
+         receiverB.flow(numMsgs / 2);
+
+         ArrayList<AmqpMessage> messagesA = new ArrayList<>();
+         ArrayList<AmqpMessage> messagesB = new ArrayList<>();
+
+         long timeout = 6000;
+         long start = System.nanoTime();
+
+         // Validate the messages are all received
+         boolean timeRemaining = true;
+         while (timeRemaining) {
+            if (messagesA.size() < numMsgs) {
+               LOG.debug("Attempting to receive message for receiver A");
+               AmqpMessage messageA = receiverA.receive(20, TimeUnit.MILLISECONDS);
+               if (messageA != null) {
+                  LOG.debug("Got message for receiver A");
+                  messagesA.add(messageA);
+                  messageA.accept();
+               }
+            }
+
+            if (messagesB.size() < numMsgs) {
+               LOG.debug("Attempting to receive message for receiver B");
+               AmqpMessage messageB = receiverB.receive(20, TimeUnit.MILLISECONDS);
+               if (messageB != null) {
+                  LOG.debug("Got message for receiver B");
+                  messagesB.add(messageB);
+                  messageB.accept();
+               }
+            }
+
+            if (messagesA.size() == numMsgs && messagesB.size() == numMsgs) {
+               LOG.debug("Received expected messages");
+               break;
+            }
+
+            timeRemaining = System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(timeout);
+         }
+
+         assertTrue("Failed to receive all messages in expected time: A=" + messagesA.size() + ", B=" + messagesB.size(), timeRemaining);
+
+         // Validate there aren't any extras
+         assertNull("Unexpected additional message present for A", receiverA.receiveNoWait());
+         assertNull("Unexpected additional message present for B", receiverB.receiveNoWait());
+
+         // Validate the transfers were reconstituted to give the expected delivery payload.
+         for (int i = 0; i < numMsgs; ++i) {
+            AmqpMessage messageA = messagesA.get(i);
+            validateMessage(payloadA, i, messageA);
+
+            AmqpMessage messageB = messagesB.get(i);
+            validateMessage(payloadB, i, messageB);
+         }
+
+         receiverA.close();
+         receiverB.close();
+
+         session.close();
+      } finally {
+         connection.close();
+      }
+   }
+
+   private void validateMessage(byte[] expectedPayload, int msgNum, AmqpMessage message) {
+      assertNotNull("failed at " + msgNum, message);
+
+      Section body = message.getWrappedMessage().getBody();
+      assertNotNull("No message body for msg " + msgNum, body);
+
+      assertTrue("Unexpected message body type for msg " + body.getClass(), body instanceof Data);
+      assertEquals("Unexpected body content for msg", new Binary(expectedPayload, 0, expectedPayload.length), ((Data) body).getValue());
+   }
+
    private void sendObjectMessages(int nMsgs, ConnectionFactory factory) throws Exception {
       try (Connection connection = factory.createConnection()) {
          Session session = connection.createSession();