You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/10/05 19:20:09 UTC

[07/10] qpid-broker-j git commit: QPID-7957: [Java Broker, AMQP 1.0] Add support for max-message-size on Attach

QPID-7957: [Java Broker, AMQP 1.0] Add support for max-message-size on Attach


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/10325157
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/10325157
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/10325157

Branch: refs/heads/master
Commit: 103251579d14b82559386f45437e9ea65beca636
Parents: c1dabe6
Author: Lorenz Quack <lq...@apache.org>
Authored: Thu Oct 5 11:11:32 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Oct 5 20:13:44 2017 +0100

----------------------------------------------------------------------
 .../server/bytebuffer/QpidByteBufferUtils.java  |  9 ++
 .../protocol/v1_0/AbstractLinkEndpoint.java     |  9 ++
 .../v1_0/AbstractReceivingLinkEndpoint.java     | 12 +++
 .../qpid/server/protocol/v1_0/Delivery.java     | 15 +++-
 .../protocol/v1_0/messaging/TransferTest.java   | 89 +++++++++++++++++++-
 5 files changed, 131 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10325157/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
index 2592559..a475ef7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
@@ -24,6 +24,15 @@ import java.util.List;
 
 public class QpidByteBufferUtils
 {
+    /** Calls {@code dispose()} on each buffer of {@code in}, in ascending index order. */
+    public static void dispose(List<QpidByteBuffer> in)
+    {
+        for (int index = 0, count = in.size(); index < count; index++)
+        {
+            in.get(index).dispose();
+        }
+    }
+
     public static boolean hasRemaining(List<QpidByteBuffer> in)
     {
         if (in.isEmpty())

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10325157/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 429bf94..6328a17 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -326,6 +327,14 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
         {
             attachToSend.setInitialDeliveryCount(_deliveryCount.unsignedIntegerValue());
         }
+        else
+        {
+            final long maxMessageSize = getSession().getConnection().getMaxMessageSize();
+            if (maxMessageSize != Long.MAX_VALUE)
+            {
+                attachToSend.setMaxMessageSize(UnsignedLong.valueOf(maxMessageSize));
+            }
+        }
 
         attachToSend = handleOversizedUnsettledMapIfNecessary(attachToSend);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10325157/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index 7423187..bdb97c1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
@@ -42,6 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
@@ -114,6 +116,16 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
                 _currentDelivery.addTransfer(transfer);
             }
 
+            if (_currentDelivery.getTotalPayloadSize() > getSession().getConnection().getMaxMessageSize())
+            {
+                error = new Error(LinkError.MESSAGE_SIZE_EXCEEDED,
+                                  String.format("delivery '%s' exceeds max-message-size %d",
+                                                _currentDelivery.getDeliveryTag(),
+                                                getSession().getConnection().getMaxMessageSize()));
+                close(error);
+                return;
+            }
+
             if (!_currentDelivery.getResume())
             {
                 _unsettled.put(_currentDelivery.getDeliveryTag(), _currentDelivery.getState());

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10325157/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
index 985d7ea..877e934 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -47,9 +48,11 @@ public class Delivery
     private volatile DeliveryState _state;
     private volatile ReceiverSettleMode _receiverSettleMode;
     private volatile boolean _resume;
+    private volatile long _totalPayloadSize;
 
     public Delivery(Transfer transfer, final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint)
     {
+        _totalPayloadSize = 0L;
         _deliveryId = transfer.getDeliveryId();
         _deliveryTag = transfer.getDeliveryTag();
         _linkEndpoint = endpoint;
@@ -97,12 +100,16 @@ public class Delivery
         return _messageFormat;
     }
 
-
     public boolean getResume()
     {
         return _resume;
     }
 
+    public long getTotalPayloadSize()
+    {
+        return _totalPayloadSize; // running total; addTransfer adds the remaining bytes of each transfer's payload
+    }
+
     final void addTransfer(Transfer transfer)
     {
         if (_aborted)
@@ -162,7 +169,13 @@ public class Delivery
             {
                 _receiverSettleMode = transfer.getRcvSettleMode();
             }
+        }
 
+        final List<QpidByteBuffer> payload = transfer.getPayload();
+        if (payload != null)
+        {
+            _totalPayloadSize += QpidByteBufferUtils.remaining(payload);
+            QpidByteBufferUtils.dispose(payload);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10325157/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index fdebd11..5ce5e5d 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -24,10 +24,12 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isOneOf;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeThat;
@@ -64,23 +66,25 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
 import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
 import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
-import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 import org.apache.qpid.tests.protocol.v1_0.Response;
 import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
 public class TransferTest extends BrokerAdminUsingTestBase
 {
     private static final String TEST_MESSAGE_DATA = "foo";
+    private static final long MAX_MAX_MESSAGE_SIZE_WE_ARE_WILLING_TO_TEST = 200 * 1024 * 1024L;
     private InetSocketAddress _brokerAddress;
     private String _originalMmsMessageStorePersistence;
 
@@ -794,4 +798,85 @@ public class TransferTest extends BrokerAdminUsingTestBase
             assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2)));
         }
     }
+
+    @Test
+    @SpecificationTest(section = "2.7.3",
+            description = "max-message-size: This field indicates the maximum message size supported by the link"
+                          + " endpoint. Any attempt to deliver a message larger than this results in a"
+                          + " message-size-exceeded link-error. If this field is zero or unset, there is no maximum"
+                          + " size imposed by the link endpoint.")
+    public void exceedMaxMessageSizeLimit() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(UTF_8));
+
+            Interaction interaction = transport.newInteraction();
+            Open open = interaction.negotiateProtocol().consumeResponse()
+                                   .open().consumeResponse(Open.class)
+                                   .getLatestResponse(Open.class);
+
+            long maxFrameSize = open.getMaxFrameSize() == null ? Integer.MAX_VALUE : open.getMaxFrameSize().longValue();
+
+            Attach attach = interaction.begin().consumeResponse(Begin.class)
+                                       .attachRole(Role.SENDER)
+                                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                       .attach().consumeResponse(Attach.class)
+                                       .getLatestResponse(Attach.class);
+
+            final UnsignedLong maxMessageSizeLimit = attach.getMaxMessageSize();
+            assumeThat(maxMessageSizeLimit, is(notNullValue())); // skip when the peer advertises no limit
+            assumeThat(maxMessageSizeLimit.longValue(),
+                       is(both(greaterThan(0L)).and(lessThan(MAX_MAX_MESSAGE_SIZE_WE_ARE_WILLING_TO_TEST))));
+
+            Flow flow = interaction.consumeResponse(Flow.class)
+                                   .getLatestResponse(Flow.class);
+            assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1)));
+
+            final long chunkSize = Math.min(1024 * 1024, maxFrameSize - 100); // presumably 100 bytes of headroom for frame overhead
+            byte[] payloadChunk = createTestPaload(chunkSize);
+            interaction.transferDeliveryId(UnsignedInteger.ZERO)
+                       .transferDeliveryTag(deliveryTag)
+                       .transferPayloadData(payloadChunk)
+                       .transferSettled(true)
+                       .transferMore(true);
+            long payloadSize = 0; // long: compared with and grown by long values, so no silent narrowing
+            while (payloadSize <= maxMessageSizeLimit.longValue()) // "<=" so the counted bytes end strictly above the limit
+            {
+                payloadSize += chunkSize;
+                interaction.transfer();
+            }
+
+            while (true) // drain responses until the Detach arrives
+            {
+                Response<?> response = interaction.consumeResponse(Flow.class, Disposition.class, Detach.class).getLatestResponse();
+                if (response != null)
+                {
+                    if (response.getBody() instanceof Detach)
+                    {
+                        break;
+                    }
+                    else if (response.getBody() instanceof Disposition)
+                    {
+                        assertThat(((Disposition) response.getBody()).getState(), is(instanceOf(Rejected.class)));
+                        assertThat(((Rejected) ((Disposition) response.getBody()).getState()).getError(), is(notNullValue()));
+                        assertThat(((Rejected) ((Disposition) response.getBody()).getState()).getError().getCondition(), is(equalTo(LinkError.MESSAGE_SIZE_EXCEEDED)));
+                    }
+                }
+            }
+            Detach detach = interaction.getLatestResponse(Detach.class);
+
+            assertThat(detach.getError(), is(notNullValue()));
+            assertThat(detach.getError().getCondition(), is(equalTo(LinkError.MESSAGE_SIZE_EXCEEDED)));
+        }
+    }
+
+    private byte[] createTestPaload(final long payloadSize)
+    {
+        if (payloadSize <= 1024*1024*1024)
+        {
+            return new byte[(int) payloadSize]; // zero-filled chunk; the cast cannot overflow at or below 1024*1024*1024
+        }
+        throw new IllegalArgumentException(String.format("Payload size (%.2f MB) too big", payloadSize / (1024. * 1024.)));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org