You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/09/28 13:36:01 UTC
[04/12] qpid-broker-j git commit: QPID-7837: [Java Broker, AMQP1.0] Detach link when receiving message body containing anything but sections.
QPID-7837: [Java Broker, AMQP1.0] Detach link when receiving message body containing anything but sections.
Previously, if the message body contained anything other than sections, the broker closed the connection because of a ClassCastException.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/85459e55
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/85459e55
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/85459e55
Branch: refs/heads/master
Commit: 85459e5559a91ffed65f84d7a8aa03da641de57f
Parents: 16caf69
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Sep 25 10:29:22 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Thu Sep 28 14:30:17 2017 +0100
----------------------------------------------------------------------
.../server/protocol/v1_0/MessageFormat_1_0.java | 30 +++++------
.../v1_0/StandardReceivingLinkEndpoint.java | 27 +++++++---
.../v1_0/messaging/SectionDecoderImpl.java | 17 +++++-
.../v1_0/type/AmqpErrorRuntimeException.java | 34 ++++++++++++
.../protocol/v1_0/messaging/MessageFormat.java | 57 ++++++++++++++++++--
5 files changed, 135 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85459e55/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java
index ff692c4..a474f4c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorRuntimeException;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
@@ -94,7 +95,17 @@ public class MessageFormat_1_0 implements MessageFormat<Message_1_0>
{
List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
- MessageMetaData_1_0 mmd = createMessageMetaData(buf, dataSections);
+ List<EncodingRetainingSection<?>> allSections;
+ try
+ {
+ allSections = getSectionDecoder().parseAll(buf);
+ }
+ catch (AmqpErrorException e)
+ {
+ throw new AmqpErrorRuntimeException(e);
+ }
+ MessageMetaData_1_0 mmd = createMessageMetaData(allSections, dataSections);
+
MessageHandle<MessageMetaData_1_0> handle = store.addMessage(mmd);
for (EncodingRetainingSection<?> dataSection : dataSections)
@@ -112,22 +123,9 @@ public class MessageFormat_1_0 implements MessageFormat<Message_1_0>
return message;
}
- private MessageMetaData_1_0 createMessageMetaData(final List<QpidByteBuffer> fragments,
+ private MessageMetaData_1_0 createMessageMetaData(final List<EncodingRetainingSection<?>> allSections,
final List<EncodingRetainingSection<?>> dataSections)
{
-
- List<EncodingRetainingSection<?>> sections;
- try
- {
- sections = getSectionDecoder().parseAll(fragments);
- }
- catch (AmqpErrorException e)
- {
- LOGGER.error("Decoding read section error", e);
- // TODO - fix error handling
- throw new IllegalArgumentException(e);
- }
-
long contentSize = 0L;
HeaderSection headerSection = null;
@@ -137,7 +135,7 @@ public class MessageFormat_1_0 implements MessageFormat<Message_1_0>
ApplicationPropertiesSection applicationPropertiesSection = null;
FooterSection footerSection = null;
- Iterator<EncodingRetainingSection<?>> iter = sections.iterator();
+ Iterator<EncodingRetainingSection<?>> iter = allSections.iterator();
EncodingRetainingSection<?> s = iter.hasNext() ? iter.next() : null;
if (s instanceof HeaderSection)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85459e55/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 1c2b9e3..ea4b8b8 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.plugin.MessageFormat;
import org.apache.qpid.server.protocol.MessageFormatRegistry;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorRuntimeException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -151,11 +152,28 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
ServerMessage<?> serverMessage;
UnsignedInteger messageFormat = delivery.getMessageFormat();
DeliveryState xfrState = delivery.getState();
- List<QpidByteBuffer> fragments = delivery.getPayload();
MessageFormat format = MessageFormatRegistry.getFormat(messageFormat.intValue());
if(format != null)
{
- serverMessage = format.createMessage(fragments, getAddressSpace().getMessageStore(), getSession().getConnection().getReference());
+ List<QpidByteBuffer> fragments = delivery.getPayload();
+ try
+ {
+ serverMessage = format.createMessage(fragments,
+ getAddressSpace().getMessageStore(),
+ getSession().getConnection().getReference());
+ }
+ catch (AmqpErrorRuntimeException e)
+ {
+ return e.getCause().getError();
+ }
+ finally
+ {
+ for(QpidByteBuffer fragment: fragments)
+ {
+ fragment.dispose();
+ }
+ fragments = null;
+ }
}
else
{
@@ -165,11 +183,6 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
return err;
}
- for(QpidByteBuffer fragment: fragments)
- {
- fragment.dispose();
- }
- fragments = null;
MessageReference<?> reference = serverMessage.newReference();
try
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85459e55/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
index f55c49f..982a242 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
@@ -28,6 +28,8 @@ import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
public class SectionDecoderImpl implements SectionDecoder
{
@@ -47,8 +49,19 @@ public class SectionDecoderImpl implements SectionDecoder
List<EncodingRetainingSection<?>> obj = new ArrayList<>();
while(QpidByteBufferUtils.hasRemaining(buf))
{
- EncodingRetainingSection<?> section = (EncodingRetainingSection<?>) _valueHandler.parse(buf);
- obj.add(section);
+
+ final Object parsedObject = _valueHandler.parse(buf);
+ if (parsedObject instanceof EncodingRetainingSection)
+ {
+ EncodingRetainingSection<?> section = (EncodingRetainingSection<?>) parsedObject;
+ obj.add(section);
+ }
+ else
+ {
+ throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+ String.format("Invalid Message: Expected type \"section\" but found \"%s\"",
+ parsedObject.getClass().getSimpleName()));
+ }
}
return obj;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85459e55/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorRuntimeException.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorRuntimeException.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorRuntimeException.java
new file mode 100644
index 0000000..8640cca
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorRuntimeException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.type;
+
+public class AmqpErrorRuntimeException extends RuntimeException
+{
+ public AmqpErrorRuntimeException(final AmqpErrorException underlyingException)
+ {
+ super(underlyingException);
+ }
+
+ @Override
+ public synchronized AmqpErrorException getCause()
+ {
+ return ((AmqpErrorException) super.getCause());
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85459e55/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
index a4058a5..b9148aa 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
@@ -20,21 +20,28 @@
package org.apache.qpid.tests.protocol.v1_0.messaging;
+import static org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError.DECODE_ERROR;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.StringWriter;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
@@ -45,12 +52,12 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.protocol.v1_0.Utils;
-import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
import org.apache.qpid.tests.protocol.v1_0.Response;
import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class MessageFormat extends BrokerAdminUsingTestBase
{
@@ -121,4 +128,44 @@ public class MessageFormat extends BrokerAdminUsingTestBase
assertThat(error, is(notNullValue()));
}
}
+
+ @Test
+ @SpecificationTest(section = "3.2",
+ description = "Altogether a message consists of the following sections: Zero or one header,"
+ + " Zero or one delivery-annotations, [...]")
+ public void illegalMessageFormatPayload() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+
+ List<QpidByteBuffer> combinedPayload = new ArrayList<>();
+ final HeaderSection headerSection = new Header().createEncodingRetainingSection();
+ combinedPayload.addAll(headerSection.getEncodedForm());
+ headerSection.dispose();
+ final StringWriter stringWriter = new StringWriter("string in between annotation sections");
+ QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize());
+ stringWriter.writeToBuffer(encodedString);
+ encodedString.flip();
+ combinedPayload.add(encodedString);
+ final DeliveryAnnotationsSection deliveryAnnotationsSection = new DeliveryAnnotations(Collections.emptyMap()).createEncodingRetainingSection();
+ combinedPayload.addAll(deliveryAnnotationsSection.getEncodedForm());
+ deliveryAnnotationsSection.dispose();
+
+ final Detach detachResponse = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+ .transferMessageFormat(UnsignedInteger.ZERO)
+ .transferPayload(combinedPayload)
+ .transfer()
+ .consumeResponse()
+ .getLatestResponse(Detach.class);
+ assertThat(detachResponse.getError(), is(notNullValue()));
+ assertThat(detachResponse.getError().getCondition(), is(equalTo(DECODE_ERROR)));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org