You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/10/05 19:20:09 UTC
[07/10] qpid-broker-j git commit: QPID-7957: [Java Broker, AMQP 1.0] Add support for max-message-size on Attach
QPID-7957: [Java Broker, AMQP 1.0] Add support for max-message-size on Attach
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/10325157
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/10325157
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/10325157
Branch: refs/heads/master
Commit: 103251579d14b82559386f45437e9ea65beca636
Parents: c1dabe6
Author: Lorenz Quack <lq...@apache.org>
Authored: Thu Oct 5 11:11:32 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Oct 5 20:13:44 2017 +0100
----------------------------------------------------------------------
.../server/bytebuffer/QpidByteBufferUtils.java | 9 ++
.../protocol/v1_0/AbstractLinkEndpoint.java | 9 ++
.../v1_0/AbstractReceivingLinkEndpoint.java | 12 +++
.../qpid/server/protocol/v1_0/Delivery.java | 15 +++-
.../protocol/v1_0/messaging/TransferTest.java | 89 +++++++++++++++++++-
5 files changed, 131 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10325157/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
index 2592559..a475ef7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBufferUtils.java
@@ -24,6 +24,15 @@ import java.util.List;
public class QpidByteBufferUtils
{
+ /**
+ * Disposes every buffer in the given list, visiting the elements in index order.
+ *
+ * @param in the buffers to dispose; {@code dispose()} is invoked once on each element
+ */
+ public static void dispose(List<QpidByteBuffer> in)
+ {
+ // Size is read once up front; elements are then fetched by index.
+ final int count = in.size();
+ int index = 0;
+ while (index < count)
+ {
+ in.get(index).dispose();
+ index++;
+ }
+ }
+
public static boolean hasRemaining(List<QpidByteBuffer> in)
{
if (in.isEmpty())
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10325157/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 429bf94..6328a17 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -326,6 +327,14 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
{
attachToSend.setInitialDeliveryCount(_deliveryCount.unsignedIntegerValue());
}
+ else
+ {
+ final long maxMessageSize = getSession().getConnection().getMaxMessageSize();
+ if (maxMessageSize != Long.MAX_VALUE)
+ {
+ attachToSend.setMaxMessageSize(UnsignedLong.valueOf(maxMessageSize));
+ }
+ }
attachToSend = handleOversizedUnsettledMapIfNecessary(attachToSend);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10325157/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index 7423187..bdb97c1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
@@ -42,6 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
@@ -114,6 +116,16 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
_currentDelivery.addTransfer(transfer);
}
+ if (_currentDelivery.getTotalPayloadSize() > getSession().getConnection().getMaxMessageSize())
+ {
+ error = new Error(LinkError.MESSAGE_SIZE_EXCEEDED,
+ String.format("delivery '%s' exceeds max-message-size %d",
+ _currentDelivery.getDeliveryTag(),
+ getSession().getConnection().getMaxMessageSize()));
+ close(error);
+ return;
+ }
+
if (!_currentDelivery.getResume())
{
_unsettled.put(_currentDelivery.getDeliveryTag(), _currentDelivery.getState());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10325157/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
index 985d7ea..877e934 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -47,9 +48,11 @@ public class Delivery
private volatile DeliveryState _state;
private volatile ReceiverSettleMode _receiverSettleMode;
private volatile boolean _resume;
+ private volatile long _totalPayloadSize;
public Delivery(Transfer transfer, final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint)
{
+ _totalPayloadSize = 0L;
_deliveryId = transfer.getDeliveryId();
_deliveryTag = transfer.getDeliveryTag();
_linkEndpoint = endpoint;
@@ -97,12 +100,16 @@ public class Delivery
return _messageFormat;
}
-
public boolean getResume()
{
return _resume;
}
+ /**
+ * Returns the payload-size counter kept for this delivery.
+ *
+ * @return the current value of {@code _totalPayloadSize}
+ */
+ public long getTotalPayloadSize()
+ {
+ return _totalPayloadSize;
+ }
+
final void addTransfer(Transfer transfer)
{
if (_aborted)
@@ -162,7 +169,13 @@ public class Delivery
{
_receiverSettleMode = transfer.getRcvSettleMode();
}
+ }
+ final List<QpidByteBuffer> payload = transfer.getPayload();
+ if (payload != null)
+ {
+ _totalPayloadSize += QpidByteBufferUtils.remaining(payload);
+ QpidByteBufferUtils.dispose(payload);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10325157/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index fdebd11..5ce5e5d 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -24,10 +24,12 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isOneOf;
+import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
@@ -64,23 +66,25 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
-import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
import org.apache.qpid.tests.protocol.v1_0.Response;
import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class TransferTest extends BrokerAdminUsingTestBase
{
private static final String TEST_MESSAGE_DATA = "foo";
+ private static final long MAX_MAX_MESSAGE_SIZE_WE_ARE_WILLING_TO_TEST = 200 * 1024 * 1024L;
private InetSocketAddress _brokerAddress;
private String _originalMmsMessageStorePersistence;
@@ -794,4 +798,85 @@ public class TransferTest extends BrokerAdminUsingTestBase
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2)));
}
}
+
+ /**
+ * Sends a single multi-transfer delivery whose accumulated chunk size reaches the
+ * max-message-size advertised in the peer's Attach, then expects the link to be
+ * detached with a message-size-exceeded error.
+ * The test is skipped (via assumptions) when the peer advertises no limit or a limit
+ * of at least {@code MAX_MAX_MESSAGE_SIZE_WE_ARE_WILLING_TO_TEST}.
+ */
+ @Test
+ @SpecificationTest(section = "2.7.3",
+ description = "max-message-size: This field indicates the maximum message size supported by the link"
+ + " endpoint. Any attempt to deliver a message larger than this results in a"
+ + " message-size-exceeded link-error. If this field is zero or unset, there is no maximum"
+ + " size imposed by the link endpoint.")
+ public void exceedMaxMessageSizeLimit() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(UTF_8));
+
+ Interaction interaction = transport.newInteraction();
+ Open open = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .getLatestResponse(Open.class);
+
+ // An unset max-frame-size is treated as Integer.MAX_VALUE for chunk sizing below.
+ long maxFrameSize = open.getMaxFrameSize() == null ? Integer.MAX_VALUE : open.getMaxFrameSize().longValue();
+
+ Attach attach = interaction.begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach().consumeResponse(Attach.class)
+ .getLatestResponse(Attach.class);
+
+ // Skip rather than fail when there is no limit to exceed or it is too large to exercise.
+ final UnsignedLong maxMessageSizeLimit = attach.getMaxMessageSize();
+ assumeThat(maxMessageSizeLimit, is(notNullValue()));
+ assumeThat(maxMessageSizeLimit.longValue(),
+ is(both(greaterThan(0L)).and(lessThan(MAX_MAX_MESSAGE_SIZE_WE_ARE_WILLING_TO_TEST))));
+
+ Flow flow = interaction.consumeResponse(Flow.class)
+ .getLatestResponse(Flow.class);
+ assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1)));
+
+ // Chunks are capped at 1 MiB and kept 100 bytes below the max frame size
+ // (presumably headroom for frame overhead -- TODO confirm).
+ final long chunkSize = Math.min(1024 * 1024, maxFrameSize - 100);
+ byte[] payloadChunk = createTestPaload(chunkSize);
+ interaction.transferDeliveryId(UnsignedInteger.ZERO)
+ .transferDeliveryTag(deliveryTag)
+ .transferPayloadData(payloadChunk)
+ .transferSettled(true)
+ .transferMore(true);
+ // Counter is a long so that adding the long chunkSize never narrows silently and the
+ // comparison against the long limit stays exact.
+ // NOTE(review): only raw chunk bytes are counted here; the loop stops once the count
+ // reaches (not strictly exceeds) the limit -- confirm the encoded payload is larger.
+ long payloadSize = 0L;
+ while (payloadSize < maxMessageSizeLimit.longValue())
+ {
+ payloadSize += chunkSize;
+ interaction.transfer();
+ }
+
+ // Drain responses until the Detach arrives; any Disposition seen on the way must be a
+ // rejection carrying the message-size-exceeded condition.
+ while (true)
+ {
+ Response<?> response = interaction.consumeResponse(Flow.class, Disposition.class, Detach.class).getLatestResponse();
+ if (response != null)
+ {
+ if (response.getBody() instanceof Detach)
+ {
+ break;
+ }
+ else if (response.getBody() instanceof Disposition)
+ {
+ assertThat(((Disposition) response.getBody()).getState(), is(instanceOf(Rejected.class)));
+ assertThat(((Rejected) ((Disposition) response.getBody()).getState()).getError(), is(notNullValue()));
+ assertThat(((Rejected) ((Disposition) response.getBody()).getState()).getError().getCondition(), is(equalTo(LinkError.MESSAGE_SIZE_EXCEEDED)));
+ }
+ }
+ }
+ Detach detach = interaction.getLatestResponse(Detach.class);
+
+ assertThat(detach.getError(), is(notNullValue()));
+ assertThat(detach.getError().getCondition(), is(equalTo(LinkError.MESSAGE_SIZE_EXCEEDED)));
+ }
+ }
+
+ /**
+ * Allocates a zero-filled byte array of the requested length for use as a test payload.
+ *
+ * @param payloadSize number of bytes to allocate; must be between 0 and 1 GiB inclusive
+ * @return a new byte array of exactly {@code payloadSize} bytes
+ * @throws IllegalArgumentException if {@code payloadSize} is negative or larger than 1 GiB
+ */
+ private byte[] createTestPaload(final long payloadSize)
+ {
+ // Reject negative sizes explicitly instead of letting the array allocation fail
+ // with an uninformative NegativeArraySizeException.
+ if (payloadSize < 0)
+ {
+ throw new IllegalArgumentException(String.format("Payload size (%d) must not be negative", payloadSize));
+ }
+ if (payloadSize > 1024*1024*1024)
+ {
+ throw new IllegalArgumentException(String.format("Payload size (%.2f MB) too big", payloadSize / (1024. * 1024.)));
+ }
+ // Safe narrowing: the checks above keep the value within the int range.
+ return new byte[(int) payloadSize];
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org