You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/06/05 23:01:34 UTC
[1/2] qpid-broker-j git commit: QPID-8203: [Broker-J][AMQP 0-9] Fix
maximum message size check
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 560d4a3b3 -> c101fbcd2
QPID-8203: [Broker-J][AMQP 0-9] Fix maximum message size check
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/025b48f3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/025b48f3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/025b48f3
Branch: refs/heads/master
Commit: 025b48f3193e2b10b1c41d2bc3bcfc9cfc238a27
Parents: 560d4a3
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Jun 5 20:01:13 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Jun 5 23:59:19 2018 +0100
----------------------------------------------------------------------
.../qpid/server/protocol/v0_8/AMQChannel.java | 5 +-
.../extension/maxsize/MaximumMessageSize.java | 83 ++++++++++++++++++++
2 files changed, 87 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/025b48f3/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index e7bafd0..ac9ea95 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -2217,7 +2217,10 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
closeChannel(ErrorCodes.MESSAGE_TOO_LARGE,
"Message size of " + bodySize + " greater than allowed maximum of " + _connection.getMaxMessageSize());
}
- publishContentHeader(new ContentHeaderBody(properties, bodySize));
+ else
+ {
+ publishContentHeader(new ContentHeaderBody(properties, bodySize));
+ }
}
else
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/025b48f3/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/maxsize/MaximumMessageSize.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/maxsize/MaximumMessageSize.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/maxsize/MaximumMessageSize.java
new file mode 100644
index 0000000..837b6e5
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/maxsize/MaximumMessageSize.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.maxsize;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseOkBody;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "qpid.max_message_size", value = "1000")
+public class MaximumMessageSize extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ }
+
+ @Test
+ public void limitExceeded() throws Exception
+ {
+ String content = Stream.generate(() -> String.valueOf('.')).limit(1001).collect(Collectors.joining());
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .basic().contentHeaderPropertiesContentType("text/plain")
+ .contentHeaderPropertiesDeliveryMode((byte)1)
+ .contentHeaderPropertiesPriority((byte)1)
+ .publishExchange("")
+ .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+ .content(content)
+ .publishMessage()
+ .consumeResponse(ChannelCloseBody.class)
+ .channel().closeOk()
+ .connection().close()
+ .consumeResponse(ConnectionCloseOkBody.class, ChannelClosedResponse.class);
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-broker-j git commit: QPID-8202: [Broker-J][AMQP 0-9] Make
sure that message content is loaded from disk only once before sending it to
the client
Posted by or...@apache.org.
QPID-8202: [Broker-J][AMQP 0-9] Make sure that message content is loaded from disk only once before sending it to the client
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c101fbcd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c101fbcd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c101fbcd
Branch: refs/heads/master
Commit: c101fbcd26f68b249bc8ae47b7afec7e1875f1f9
Parents: 025b48f
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Jun 5 23:38:32 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Jun 5 23:59:53 2018 +0100
----------------------------------------------------------------------
.../v0_8/ProtocolOutputConverterImpl.java | 31 +++++++++++---------
1 file changed, 17 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c101fbcd/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
index d9465b3..bcc515b 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
@@ -183,20 +183,23 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
int writtenSize = capacity;
- AMQBody firstContentBody = new MessageContentSourceBody(content, 0, capacity);
+ try (QpidByteBuffer contentByteBuffer = content.getContent())
+ {
+ AMQBody firstContentBody = new MessageContentSourceBody(contentByteBuffer, 0, capacity);
- CompositeAMQBodyBlock
- compositeBlock =
- new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
- writeFrame(compositeBlock);
+ CompositeAMQBodyBlock
+ compositeBlock =
+ new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+ writeFrame(compositeBlock);
- while (writtenSize < bodySize)
- {
- capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
- AMQBody body = new MessageContentSourceBody(content, writtenSize, capacity);
- writtenSize += capacity;
+ while (writtenSize < bodySize)
+ {
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ AMQBody body = new MessageContentSourceBody(contentByteBuffer, writtenSize, capacity);
+ writtenSize += capacity;
- writeFrame(new AMQFrame(channelId, body));
+ writeFrame(new AMQFrame(channelId, body));
+ }
}
}
}
@@ -210,10 +213,10 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
public static final byte TYPE = 3;
private final int _length;
- private final MessageContentSource _content;
+ private final QpidByteBuffer _content;
private final int _offset;
- public MessageContentSourceBody(MessageContentSource content, int offset, int length)
+ public MessageContentSourceBody(QpidByteBuffer content, int offset, int length)
{
_content = content;
_offset = offset;
@@ -236,7 +239,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public long writePayload(final ByteBufferSender sender)
{
long size;
- try (final QpidByteBuffer content = _content.getContent(_offset, _length))
+ try (final QpidByteBuffer content = _content.view(_offset, _length))
{
size = content.remaining();
sender.send(content);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org