You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/09/28 13:36:01 UTC

[04/12] qpid-broker-j git commit: QPID-7837: [Java Broker, AMQP1.0] Detach link when receiving message body containing anything but sections.

QPID-7837: [Java Broker, AMQP1.0] Detach link when receiving message body containing anything but sections.

Previously if the message body contained anything but sections we closed the connection due to a ClassCastException.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/85459e55
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/85459e55
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/85459e55

Branch: refs/heads/master
Commit: 85459e5559a91ffed65f84d7a8aa03da641de57f
Parents: 16caf69
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Sep 25 10:29:22 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Thu Sep 28 14:30:17 2017 +0100

----------------------------------------------------------------------
 .../server/protocol/v1_0/MessageFormat_1_0.java | 30 +++++------
 .../v1_0/StandardReceivingLinkEndpoint.java     | 27 +++++++---
 .../v1_0/messaging/SectionDecoderImpl.java      | 17 +++++-
 .../v1_0/type/AmqpErrorRuntimeException.java    | 34 ++++++++++++
 .../protocol/v1_0/messaging/MessageFormat.java  | 57 ++++++++++++++++++--
 5 files changed, 135 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85459e55/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java
index ff692c4..a474f4c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorRuntimeException;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
@@ -94,7 +95,17 @@ public class MessageFormat_1_0 implements MessageFormat<Message_1_0>
     {
         List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
 
-        MessageMetaData_1_0 mmd = createMessageMetaData(buf, dataSections);
+        List<EncodingRetainingSection<?>> allSections;
+        try
+        {
+            allSections = getSectionDecoder().parseAll(buf);
+        }
+        catch (AmqpErrorException e)
+        {
+            throw new AmqpErrorRuntimeException(e);
+        }
+        MessageMetaData_1_0 mmd = createMessageMetaData(allSections, dataSections);
+
         MessageHandle<MessageMetaData_1_0> handle = store.addMessage(mmd);
 
         for (EncodingRetainingSection<?> dataSection : dataSections)
@@ -112,22 +123,9 @@ public class MessageFormat_1_0 implements MessageFormat<Message_1_0>
         return message;
     }
 
-    private MessageMetaData_1_0 createMessageMetaData(final List<QpidByteBuffer> fragments,
+    private MessageMetaData_1_0 createMessageMetaData(final List<EncodingRetainingSection<?>> allSections,
                                                       final List<EncodingRetainingSection<?>> dataSections)
     {
-
-        List<EncodingRetainingSection<?>> sections;
-        try
-        {
-            sections = getSectionDecoder().parseAll(fragments);
-        }
-        catch (AmqpErrorException e)
-        {
-            LOGGER.error("Decoding read section error", e);
-            // TODO - fix error handling
-            throw new IllegalArgumentException(e);
-        }
-
         long contentSize = 0L;
 
         HeaderSection headerSection = null;
@@ -137,7 +135,7 @@ public class MessageFormat_1_0 implements MessageFormat<Message_1_0>
         ApplicationPropertiesSection applicationPropertiesSection = null;
         FooterSection footerSection = null;
 
-        Iterator<EncodingRetainingSection<?>> iter = sections.iterator();
+        Iterator<EncodingRetainingSection<?>> iter = allSections.iterator();
         EncodingRetainingSection<?> s = iter.hasNext() ? iter.next() : null;
         if (s instanceof HeaderSection)
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85459e55/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 1c2b9e3..ea4b8b8 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.protocol.MessageFormatRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorRuntimeException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -151,11 +152,28 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
             ServerMessage<?> serverMessage;
             UnsignedInteger messageFormat = delivery.getMessageFormat();
             DeliveryState xfrState = delivery.getState();
-            List<QpidByteBuffer> fragments = delivery.getPayload();
             MessageFormat format = MessageFormatRegistry.getFormat(messageFormat.intValue());
             if(format != null)
             {
-                serverMessage = format.createMessage(fragments, getAddressSpace().getMessageStore(), getSession().getConnection().getReference());
+                List<QpidByteBuffer> fragments = delivery.getPayload();
+                try
+                {
+                    serverMessage = format.createMessage(fragments,
+                                                         getAddressSpace().getMessageStore(),
+                                                         getSession().getConnection().getReference());
+                }
+                catch (AmqpErrorRuntimeException e)
+                {
+                    return e.getCause().getError();
+                }
+                finally
+                {
+                    for(QpidByteBuffer fragment: fragments)
+                    {
+                        fragment.dispose();
+                    }
+                    fragments = null;
+                }
             }
             else
             {
@@ -165,11 +183,6 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                 return err;
             }
 
-            for(QpidByteBuffer fragment: fragments)
-            {
-                fragment.dispose();
-            }
-            fragments = null;
 
             MessageReference<?> reference = serverMessage.newReference();
             try

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85459e55/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
index f55c49f..982a242 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
@@ -28,6 +28,8 @@ import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
 import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 
 public class SectionDecoderImpl implements SectionDecoder
 {
@@ -47,8 +49,19 @@ public class SectionDecoderImpl implements SectionDecoder
         List<EncodingRetainingSection<?>> obj = new ArrayList<>();
         while(QpidByteBufferUtils.hasRemaining(buf))
         {
-            EncodingRetainingSection<?> section = (EncodingRetainingSection<?>) _valueHandler.parse(buf);
-            obj.add(section);
+
+            final Object parsedObject = _valueHandler.parse(buf);
+            if (parsedObject instanceof EncodingRetainingSection)
+            {
+                EncodingRetainingSection<?> section = (EncodingRetainingSection<?>) parsedObject;
+                obj.add(section);
+            }
+            else
+            {
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                             String.format("Invalid Message: Expected type \"section\" but found \"%s\"",
+                                                           parsedObject.getClass().getSimpleName()));
+            }
         }
 
         return obj;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85459e55/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorRuntimeException.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorRuntimeException.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorRuntimeException.java
new file mode 100644
index 0000000..8640cca
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorRuntimeException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.type;
+
+public class AmqpErrorRuntimeException extends RuntimeException
+{
+    public AmqpErrorRuntimeException(final AmqpErrorException underlyingException)
+    {
+        super(underlyingException);
+    }
+
+    @Override
+    public synchronized AmqpErrorException getCause()
+    {
+        return ((AmqpErrorException) super.getCause());
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85459e55/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
index a4058a5..b9148aa 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
@@ -20,21 +20,28 @@
 
 package org.apache.qpid.tests.protocol.v1_0.messaging;
 
+import static org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError.DECODE_ERROR;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.fail;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.StringWriter;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
@@ -45,12 +52,12 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.protocol.v1_0.Utils;
-import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 import org.apache.qpid.tests.protocol.v1_0.Response;
 import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
 public class MessageFormat extends BrokerAdminUsingTestBase
 {
@@ -121,4 +128,44 @@ public class MessageFormat extends BrokerAdminUsingTestBase
             assertThat(error, is(notNullValue()));
         }
     }
+
+    @Test
+    @SpecificationTest(section = "3.2",
+            description = "Altogether a message consists of the following sections: Zero or one header,"
+                          + " Zero or one delivery-annotations, [...]")
+    public void illegalMessageFormatPayload() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+
+            List<QpidByteBuffer> combinedPayload = new ArrayList<>();
+            final HeaderSection headerSection = new Header().createEncodingRetainingSection();
+            combinedPayload.addAll(headerSection.getEncodedForm());
+            headerSection.dispose();
+            final StringWriter stringWriter = new StringWriter("string in between annotation sections");
+            QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize());
+            stringWriter.writeToBuffer(encodedString);
+            encodedString.flip();
+            combinedPayload.add(encodedString);
+            final DeliveryAnnotationsSection deliveryAnnotationsSection = new DeliveryAnnotations(Collections.emptyMap()).createEncodingRetainingSection();
+            combinedPayload.addAll(deliveryAnnotationsSection.getEncodedForm());
+            deliveryAnnotationsSection.dispose();
+
+            final Detach detachResponse = transport.newInteraction()
+                                                   .negotiateProtocol().consumeResponse()
+                                                   .open().consumeResponse(Open.class)
+                                                   .begin().consumeResponse(Begin.class)
+                                                   .attachRole(Role.SENDER)
+                                                   .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                   .attach().consumeResponse(Attach.class)
+                                                   .consumeResponse(Flow.class)
+                                                   .transferMessageFormat(UnsignedInteger.ZERO)
+                                                   .transferPayload(combinedPayload)
+                                                   .transfer()
+                                                   .consumeResponse()
+                                                   .getLatestResponse(Detach.class);
+            assertThat(detachResponse.getError(), is(notNullValue()));
+            assertThat(detachResponse.getError().getCondition(), is(equalTo(DECODE_ERROR)));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org