You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/12/05 13:44:23 UTC
[2/2] qpid-broker-j git commit: QPID-7694:[Broker-J][AMQP 0-8..0-10] Add validation for queue declare arguments
QPID-7694:[Broker-J][AMQP 0-8..0-10] Add validation for queue declare arguments
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ab33d1ad
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ab33d1ad
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ab33d1ad
Branch: refs/heads/master
Commit: ab33d1ad69d3f27f8a56f046fd959ceb0f62d05a
Parents: c80acea
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Dec 5 12:31:48 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Dec 5 13:39:58 2018 +0000
----------------------------------------------------------------------
.../server/queue/QueueArgumentsConverter.java | 86 +++++++++++++-------
.../qpid/server/protocol/v0_8/AMQChannel.java | 2 +-
.../v0_10/extensions/queue/QueueTest.java | 35 ++++++--
.../v0_8/extension/queue/QueueTest.java | 21 ++++-
4 files changed, 103 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ab33d1ad/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index e67e9a3..0c01d52 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.server.queue;
-import java.util.Collection;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Objects;
+import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,10 +66,10 @@ public class QueueArgumentsConverter
private static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
private static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
- static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
+ private static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
private static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
- static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
- static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
+ private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
+ private static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
private static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
private static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
@@ -142,22 +145,28 @@ public class QueueArgumentsConverter
if(wireArguments != null)
{
final ConfiguredObjectTypeRegistry typeRegistry = model.getTypeRegistry();
- final Map<String, ConfiguredObjectAttribute<?, ?>> attributeTypes =
- new HashMap<>(typeRegistry.getAttributeTypes(Queue.class));
+ final List<ConfiguredObjectAttribute<?, ?>> attributeTypes =
+ new ArrayList<>(typeRegistry.getAttributeTypes(Queue.class).values());
typeRegistry.getTypeSpecialisations(Queue.class)
- .forEach(type -> typeRegistry.getTypeSpecificAttributes(type)
- .forEach(t -> attributeTypes.put(t.getName(), t)));
+ .forEach(type -> attributeTypes.addAll(typeRegistry.getTypeSpecificAttributes(type)));
+
+ final Set<String> wireArgumentNames = new HashSet<>(wireArguments.keySet());
wireArguments.entrySet()
.stream()
- .filter(entry -> attributeTypes.containsKey(entry.getKey())
- && !attributeTypes.get(entry.getKey()).isDerived())
- .forEach(entry -> modelArguments.put(entry.getKey(), entry.getValue()));
+ .filter(entry -> attributeTypes.stream()
+ .anyMatch(type -> Objects.equals(entry.getKey(), type.getName())
+ && !type.isDerived()))
+ .forEach(entry -> {
+ modelArguments.put(entry.getKey(), entry.getValue());
+ wireArgumentNames.remove(entry.getKey());
+ });
for(Map.Entry<String,String> entry : ATTRIBUTE_MAPPINGS.entrySet())
{
if(wireArguments.containsKey(entry.getKey()))
{
modelArguments.put(entry.getValue(), wireArguments.get(entry.getKey()));
+ wireArgumentNames.remove(entry.getKey());
}
}
if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
@@ -169,10 +178,13 @@ public class QueueArgumentsConverter
modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.valueOf(String.valueOf(wireArguments.get(QPID_POLICY_TYPE)).toUpperCase()));
}
- if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP)
- && SHARED_MSG_GROUP_ARG_VALUE.equals(String.valueOf(wireArguments.get(QPID_SHARED_MSG_GROUP))))
+ if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
{
- modelArguments.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.SHARED_GROUPS);
+ wireArgumentNames.remove(QPID_SHARED_MSG_GROUP);
+ if (SHARED_MSG_GROUP_ARG_VALUE.equals(String.valueOf(wireArguments.get(QPID_SHARED_MSG_GROUP))))
+ {
+ modelArguments.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.SHARED_GROUPS);
+ }
}
else if(wireArguments.containsKey(QPID_GROUP_HEADER_KEY))
{
@@ -189,34 +201,40 @@ public class QueueArgumentsConverter
modelArguments.put(Queue.NO_LOCAL, Boolean.parseBoolean(wireArguments.get(QPID_NO_LOCAL).toString()));
}
- if (wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY) != null && wireArguments.get(X_QPID_CAPACITY) != null)
+ if (wireArguments.containsKey(X_QPID_FLOW_RESUME_CAPACITY))
{
- double resumeCapacity = Integer.parseInt(wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY).toString());
- double maximumCapacity = Integer.parseInt(wireArguments.get(X_QPID_CAPACITY).toString());
- if (resumeCapacity > maximumCapacity)
- {
- throw new ConnectionScopedRuntimeException(
- "Flow resume size can't be greater than flow control size");
- }
- Map<String, String> context = (Map<String, String>) modelArguments.get(Queue.CONTEXT);
- if (context == null)
+ wireArgumentNames.remove(X_QPID_FLOW_RESUME_CAPACITY);
+ if (wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY) != null && wireArguments.get(X_QPID_CAPACITY) != null)
{
- context = new HashMap<>();
- modelArguments.put(Queue.CONTEXT, context);
+ double resumeCapacity = Integer.parseInt(wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY).toString());
+ double maximumCapacity = Integer.parseInt(wireArguments.get(X_QPID_CAPACITY).toString());
+ if (resumeCapacity > maximumCapacity)
+ {
+ throw new ConnectionScopedRuntimeException(
+ "Flow resume size can't be greater than flow control size");
+ }
+ Map<String, String> context = (Map<String, String>) modelArguments.get(Queue.CONTEXT);
+ if (context == null)
+ {
+ context = new HashMap<>();
+ modelArguments.put(Queue.CONTEXT, context);
+ }
+ double ratio = resumeCapacity / maximumCapacity;
+ context.put(Queue.QUEUE_FLOW_RESUME_LIMIT, String.format("%.2f", ratio * 100.0));
+ modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL);
}
- double ratio = resumeCapacity / maximumCapacity;
- context.put(Queue.QUEUE_FLOW_RESUME_LIMIT, String.format("%.2f", ratio * 100.0));
- modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL);
}
- if (wireArguments.get(ALTERNATE_EXCHANGE) != null)
+ if (wireArguments.containsKey(ALTERNATE_EXCHANGE))
{
+ wireArgumentNames.remove(ALTERNATE_EXCHANGE);
modelArguments.put(Queue.ALTERNATE_BINDING,
Collections.singletonMap(AlternateBinding.DESTINATION,
wireArguments.get(ALTERNATE_EXCHANGE)));
}
else if (wireArguments.containsKey(X_QPID_DLQ_ENABLED))
{
+ wireArgumentNames.remove(X_QPID_DLQ_ENABLED);
Object argument = wireArguments.get(X_QPID_DLQ_ENABLED);
if ((argument instanceof Boolean && ((Boolean) argument).booleanValue())
|| (argument instanceof String && Boolean.parseBoolean((String)argument)))
@@ -226,6 +244,12 @@ public class QueueArgumentsConverter
getDeadLetterQueueName(queueName)));
}
}
+
+ if (!wireArgumentNames.isEmpty())
+ {
+ throw new IllegalArgumentException(String.format("Unsupported queue declare argument(s) : %s",
+ String.join(",", wireArgumentNames)));
+ }
}
return modelArguments;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ab33d1ad/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index e2048a0..fc85de2 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -3107,7 +3107,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
catch (IllegalArgumentException | IllegalConfigurationException e)
{
String message = String.format("Error creating queue '%s': %s", queueName, e.getMessage());
- _connection.sendConnectionClose(ErrorCodes.COMMAND_INVALID, message, getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.INVALID_ARGUMENT, message, getChannelId());
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ab33d1ad/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
index f5216c2..4365812 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
@@ -24,10 +24,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
@@ -37,15 +34,14 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v0_10.transport.ExecutionResult;
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionErrorCode;
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionException;
import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
-import org.apache.qpid.server.protocol.v0_10.transport.QueueQueryResult;
import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed;
-import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v0_10.FrameTransport;
import org.apache.qpid.tests.protocol.v0_10.Interaction;
import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -65,7 +61,6 @@ public class QueueTest extends BrokerAdminUsingTestBase
}
@Test
- @SpecificationTest(section = "10.queue.declare", description = "This command creates or checks a queue.")
public void queueDeclareUsingRealQueueAttributesInWireArguments() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -149,4 +144,30 @@ public class QueueTest extends BrokerAdminUsingTestBase
}
}
}
+
+
+ @Test
+ public void queueDeclareInvalidWireArguments() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareArguments(Collections.singletonMap("foo", "bar"))
+ .declareId(0)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush();
+
+ ExecutionException exception =
+ interaction.consume(ExecutionException.class, SessionCompleted.class, SessionCommandPoint.class);
+
+ assertThat(exception.getErrorCode(), is(equalTo(ExecutionErrorCode.ILLEGAL_ARGUMENT)));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ab33d1ad/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
index af809e1..4bd9b5a 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
@@ -37,6 +37,7 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.ErrorCodes;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
import org.apache.qpid.server.protocol.v0_8.FieldTable;
import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
@@ -46,10 +47,10 @@ import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody;
-import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
import org.apache.qpid.tests.protocol.v0_8.Interaction;
import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -68,7 +69,6 @@ public class QueueTest extends BrokerAdminUsingTestBase
}
@Test
- @SpecificationTest(section = "1.7.2.1", description = "declare queue, create if needed")
public void queueDeclareUsingRealQueueAttributesInWireArguments() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -158,4 +158,21 @@ public class QueueTest extends BrokerAdminUsingTestBase
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
}
}
+
+ @Test
+ public void queueDeclareInvalidWireArguments() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ConnectionCloseBody response = interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .queue().declareName(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareArguments(Collections.singletonMap("foo", "bar"))
+ .declare()
+ .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+
+ assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.INVALID_ARGUMENT)));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org