You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/06/05 23:01:34 UTC

[1/2] qpid-broker-j git commit: QPID-8203: [Broker-J][AMQP 0-9] Fix maximum message size check

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 560d4a3b3 -> c101fbcd2


QPID-8203: [Broker-J][AMQP 0-9] Fix maximum message size check


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/025b48f3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/025b48f3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/025b48f3

Branch: refs/heads/master
Commit: 025b48f3193e2b10b1c41d2bc3bcfc9cfc238a27
Parents: 560d4a3
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Jun 5 20:01:13 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Jun 5 23:59:19 2018 +0100

----------------------------------------------------------------------
 .../qpid/server/protocol/v0_8/AMQChannel.java   |  5 +-
 .../extension/maxsize/MaximumMessageSize.java   | 83 ++++++++++++++++++++
 2 files changed, 87 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/025b48f3/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index e7bafd0..ac9ea95 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -2217,7 +2217,10 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                 closeChannel(ErrorCodes.MESSAGE_TOO_LARGE,
                              "Message size of " + bodySize + " greater than allowed maximum of " + _connection.getMaxMessageSize());
             }
-            publishContentHeader(new ContentHeaderBody(properties, bodySize));
+            else
+            {
+                publishContentHeader(new ContentHeaderBody(properties, bodySize));
+            }
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/025b48f3/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/maxsize/MaximumMessageSize.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/maxsize/MaximumMessageSize.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/maxsize/MaximumMessageSize.java
new file mode 100644
index 0000000..837b6e5
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/maxsize/MaximumMessageSize.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.maxsize;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseOkBody;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "qpid.max_message_size", value = "1000")
+public class MaximumMessageSize extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    @Test
+    public void limitExceeded() throws Exception
+    {
+        String content = Stream.generate(() -> String.valueOf('.')).limit(1001).collect(Collectors.joining());
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .basic().contentHeaderPropertiesContentType("text/plain")
+                       .contentHeaderPropertiesDeliveryMode((byte)1)
+                       .contentHeaderPropertiesPriority((byte)1)
+                       .publishExchange("")
+                       .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                       .content(content)
+                       .publishMessage()
+                       .consumeResponse(ChannelCloseBody.class)
+                       .channel().closeOk()
+                       .connection().close()
+                       .consumeResponse(ConnectionCloseOkBody.class, ChannelClosedResponse.class);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-broker-j git commit: QPID-8202: [Broker-J][AMQP 0-9] Make sure that message content is loaded from disk only once before sending it to the client

Posted by or...@apache.org.
QPID-8202: [Broker-J][AMQP 0-9] Make sure that message content is loaded from disk only once before sending it to the client


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c101fbcd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c101fbcd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c101fbcd

Branch: refs/heads/master
Commit: c101fbcd26f68b249bc8ae47b7afec7e1875f1f9
Parents: 025b48f
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Jun 5 23:38:32 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Jun 5 23:59:53 2018 +0100

----------------------------------------------------------------------
 .../v0_8/ProtocolOutputConverterImpl.java       | 31 +++++++++++---------
 1 file changed, 17 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c101fbcd/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
index d9465b3..bcc515b 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
@@ -183,20 +183,23 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
 
             int writtenSize = capacity;
 
-            AMQBody firstContentBody = new MessageContentSourceBody(content, 0, capacity);
+            try (QpidByteBuffer contentByteBuffer = content.getContent())
+            {
+                AMQBody firstContentBody = new MessageContentSourceBody(contentByteBuffer, 0, capacity);
 
-            CompositeAMQBodyBlock
-                    compositeBlock =
-                    new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
-            writeFrame(compositeBlock);
+                CompositeAMQBodyBlock
+                        compositeBlock =
+                        new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+                writeFrame(compositeBlock);
 
-            while (writtenSize < bodySize)
-            {
-                capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
-                AMQBody body = new MessageContentSourceBody(content, writtenSize, capacity);
-                writtenSize += capacity;
+                while (writtenSize < bodySize)
+                {
+                    capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+                    AMQBody body = new MessageContentSourceBody(contentByteBuffer, writtenSize, capacity);
+                    writtenSize += capacity;
 
-                writeFrame(new AMQFrame(channelId, body));
+                    writeFrame(new AMQFrame(channelId, body));
+                }
             }
         }
     }
@@ -210,10 +213,10 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
     {
         public static final byte TYPE = 3;
         private final int _length;
-        private final MessageContentSource _content;
+        private final QpidByteBuffer _content;
         private final int _offset;
 
-        public MessageContentSourceBody(MessageContentSource content, int offset, int length)
+        public MessageContentSourceBody(QpidByteBuffer content, int offset, int length)
         {
             _content = content;
             _offset = offset;
@@ -236,7 +239,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         public long writePayload(final ByteBufferSender sender)
         {
             long size;
-            try (final QpidByteBuffer content = _content.getContent(_offset, _length))
+            try (final QpidByteBuffer content = _content.view(_offset, _length))
             {
                 size = content.remaining();
                 sender.send(content);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org