You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/12/05 13:44:22 UTC

[1/2] qpid-broker-j git commit: QPID-7694:[Broker-J][AMQP 0-8..0-10] Add validation for exchange declare arguments

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master c80aceab2 -> 8bd6bd420


QPID-7694:[Broker-J][AMQP 0-8..0-10] Add validation for exchange declare arguments


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/8bd6bd42
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/8bd6bd42
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/8bd6bd42

Branch: refs/heads/master
Commit: 8bd6bd4206bc0855b162e0a2bbeb6c8db2fdda22
Parents: ab33d1a
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Dec 5 13:39:21 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Dec 5 13:39:58 2018 +0000

----------------------------------------------------------------------
 .../protocol/v0_10/ServerSessionDelegate.java   |  42 +++++--
 .../qpid/server/protocol/v0_8/AMQChannel.java   |  25 ++++-
 .../protocol/v0_10/ExchangeInteraction.java     |   6 +
 .../v0_10/extensions/exchange/ExchangeTest.java | 109 +++++++++++++++++++
 .../v0_8/extension/exchange/ExchangeTest.java   | 100 +++++++++++++++++
 5 files changed, 269 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8bd6bd42/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 5dda560..d97f1c6 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10;
 
 import java.nio.charset.StandardCharsets;
 import java.security.AccessControlException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -30,7 +31,10 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import com.google.common.util.concurrent.Futures;
 import org.slf4j.Logger;
@@ -57,9 +61,12 @@ import org.apache.qpid.server.message.RejectType;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.AlternateBinding;
+import org.apache.qpid.server.model.ConfiguredObjectAttribute;
+import org.apache.qpid.server.model.ConfiguredObjectTypeRegistry;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.ExclusivityPolicy;
 import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Model;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.NoFactoryForTypeException;
 import org.apache.qpid.server.model.Queue;
@@ -870,17 +877,6 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
         String exchangeName = method.getExchange();
         NamedAddressSpace addressSpace = getAddressSpace(session);
 
-        //we must check for any unsupported arguments present and throw not-implemented
-        if(method.hasArguments())
-        {
-            Map<String,Object> args = method.getArguments();
-            //QPID-3392: currently we don't support any!
-            if(!args.isEmpty())
-            {
-                exception(session, method, ExecutionErrorCode.NOT_IMPLEMENTED, "Unsupported exchange argument(s) found " + args.keySet().toString());
-                return;
-            }
-        }
         String alternateExchangeName = method.getAlternateExchange();
         if(nameNullOrEmpty(method.getExchange()))
         {
@@ -926,7 +922,10 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                 try
                 {
                     Map<String,Object> attributes = new HashMap<String, Object>();
-
+                    if(method.hasArguments())
+                    {
+                        attributes.putAll(method.getArguments());
+                    }
                     attributes.put(org.apache.qpid.server.model.Exchange.NAME, method.getExchange());
                     attributes.put(org.apache.qpid.server.model.Exchange.TYPE, method.getType());
                     attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable());
@@ -939,6 +938,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                         attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_BINDING,
                                        Collections.singletonMap(AlternateBinding.DESTINATION, alternateExchangeName));
                     }
+                    validateExchangeDeclareArguments(attributes, session.getAMQPConnection().getModel());
                     addressSpace.createMessageDestination(Exchange.class, attributes);;
                 }
                 catch(ReservedExchangeNameException e)
@@ -997,6 +997,24 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
         }
     }
 
+    private void validateExchangeDeclareArguments(final Map<String, Object> attributes, final Model model)
+    {
+        final ConfiguredObjectTypeRegistry typeRegistry = model.getTypeRegistry();
+        final List<ConfiguredObjectAttribute<?, ?>> types = new ArrayList<>(typeRegistry.getAttributeTypes(Exchange.class).values());
+        typeRegistry.getTypeSpecialisations(Exchange.class).forEach(type -> types.addAll(typeRegistry.getTypeSpecificAttributes(type)));
+        final Set<String> unsupported = attributes.keySet()
+                                                  .stream()
+                                                  .filter(name -> types.stream().noneMatch(a -> Objects.equals(name, a.getName())
+                                                                                                && !a.isDerived()))
+                                                  .collect(Collectors.toSet());
+
+        if (!unsupported.isEmpty())
+        {
+            throw new IllegalArgumentException(String.format(
+                    "Unsupported exchange declare arguments : %s", String.join(",", unsupported)));
+        }
+    }
+
     private void exception(ServerSession session, Method method, ExecutionErrorCode errorCode, String description)
     {
         ExecutionException ex = new ExecutionException();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8bd6bd42/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index fc85de2..a70c525 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -35,10 +35,12 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import javax.security.auth.Subject;
 
@@ -77,6 +79,8 @@ import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.AlternateBinding;
 import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObjectAttribute;
+import org.apache.qpid.server.model.ConfiguredObjectTypeRegistry;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.ExclusivityPolicy;
 import org.apache.qpid.server.model.LifetimePolicy;
@@ -2624,6 +2628,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                         attributes.put(Exchange.ALTERNATE_BINDING,
                                        Collections.singletonMap(AlternateBinding.DESTINATION, alternateExchangeName));
                     }
+                    validateExchangeDeclareArguments(attributes);
                     exchange = virtualHost.createMessageDestination(Exchange.class, attributes);
 
                     if (!nowait)
@@ -2689,7 +2694,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                 }
                 catch (IllegalArgumentException | IllegalConfigurationException e)
                 {
-                    _connection.sendConnectionClose(ErrorCodes.COMMAND_INVALID, "Error creating exchange '"
+                    _connection.sendConnectionClose(ErrorCodes.INVALID_ARGUMENT, "Error creating exchange '"
                                                                                 + exchangeName
                                                                                 + "': "
                                                                                 + e.getMessage(), getChannelId());
@@ -2700,6 +2705,24 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
 
     }
 
+    private void validateExchangeDeclareArguments(final Map<String, Object> attributes)
+    {
+        final ConfiguredObjectTypeRegistry typeRegistry = getModel().getTypeRegistry();
+        final List<ConfiguredObjectAttribute<?, ?>> types = new ArrayList<>(typeRegistry.getAttributeTypes(Exchange.class).values());
+        typeRegistry.getTypeSpecialisations(Exchange.class).forEach(type -> types.addAll(typeRegistry.getTypeSpecificAttributes(type)));
+        final Set<String> unsupported = attributes.keySet()
+                                                  .stream()
+                                                  .filter(name -> types.stream().noneMatch(a -> Objects.equals(name, a.getName())
+                                                                                               && !a.isDerived()))
+                                                  .collect(Collectors.toSet());
+
+        if (!unsupported.isEmpty())
+        {
+            throw new IllegalArgumentException(String.format(
+                    "Unsupported exchange declare arguments : %s", String.join(",", unsupported)));
+        }
+    }
+
     private void validateAlternateExchangeIsNotQueue(final NamedAddressSpace addressSpace, final String alternateExchangeName)
     {
         MessageDestination alternateMessageDestination = addressSpace.getAttainedMessageDestination(alternateExchangeName, false);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8bd6bd42/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExchangeInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExchangeInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExchangeInteraction.java
index 02afaa8..d1fcd45 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExchangeInteraction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExchangeInteraction.java
@@ -235,4 +235,10 @@ public class ExchangeInteraction
     {
         return _interaction.sendPerformative(_unbind);
     }
+
+    public ExchangeInteraction declareArguments(final Map<String, Object> arguments)
+    {
+        _declare.setArguments(arguments);
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8bd6bd42/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/exchange/ExchangeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/exchange/ExchangeTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/exchange/ExchangeTest.java
new file mode 100644
index 0000000..a7a67d1
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/exchange/ExchangeTest.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10.extensions.exchange;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionErrorCode;
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionException;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
+import org.apache.qpid.tests.protocol.v0_10.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_10.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class ExchangeTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+    private static final byte[] SESSION_NAME = "test".getBytes(UTF_8);
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test
+    public void exchangeDeclareValidWireArguments() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            SessionCompleted completed = interaction.openAnonymousConnection()
+                                                    .channelId(1)
+                                                    .attachSession(SESSION_NAME)
+                                                    .exchange()
+                                                    .declareExchange("test")
+                                                    .declareArguments(Collections.singletonMap("unroutableMessageBehaviour", "REJECT"))
+                                                    .declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
+                                                    .declareId(0)
+                                                    .declare()
+                                                    .session()
+                                                    .flushCompleted()
+                                                    .flush()
+                                                    .consumeResponse()
+                                                    .getLatestResponse(SessionCompleted.class);
+
+            assertThat(completed.getCommands().includes(0), is(equalTo(true)));
+        }
+    }
+
+    @Test
+    public void exchangeDeclareInvalidWireArguments() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channelId(1)
+                       .attachSession(SESSION_NAME)
+                       .exchange()
+                       .declareExchange("test")
+                       .declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
+                       .declareId(0)
+                       .declareArguments(Collections.singletonMap("foo", "bar"))
+                       .declare()
+                       .session()
+                       .flushCompleted()
+                       .flush();
+
+            ExecutionException exception =
+                    interaction.consume(ExecutionException.class, SessionCompleted.class, SessionCommandPoint.class);
+
+            assertThat(exception.getErrorCode(), is(equalTo(ExecutionErrorCode.ILLEGAL_ARGUMENT)));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8bd6bd42/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/exchange/ExchangeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/exchange/ExchangeTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/exchange/ExchangeTest.java
new file mode 100644
index 0000000..8ad178a
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/exchange/ExchangeTest.java
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.exchange;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.ErrorCodes;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeBoundOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class ExchangeTest extends BrokerAdminUsingTestBase
+{
+    private static final String TEST_EXCHANGE = "testExchange";
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test
+    public void exchangeDeclareValidWireArguments() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange()
+                       .declareName(TEST_EXCHANGE)
+                       .declareArguments(Collections.singletonMap("unroutableMessageBehaviour", "REJECT"))
+                       .declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+
+            ExchangeBoundOkBody response = interaction.exchange()
+                                                      .boundExchangeName(TEST_EXCHANGE)
+                                                      .bound()
+                                                      .consumeResponse()
+                                                      .getLatestResponse(ExchangeBoundOkBody.class);
+            assertThat(response.getReplyCode(), is(equalTo(ExchangeBoundOkBody.NO_BINDINGS)));
+        }
+    }
+
+    @Test
+    public void exchangeDeclareInvalidWireArguments() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionCloseBody response = interaction.openAnonymousConnection()
+                                                      .channel()
+                                                      .open()
+                                                      .consumeResponse(ChannelOpenOkBody.class)
+                                                      .exchange()
+                                                      .declareName(TEST_EXCHANGE)
+                                                      .declareArguments(Collections.singletonMap("foo", "bar"))
+                                                      .declare()
+                                                      .consumeResponse(ConnectionCloseBody.class)
+                                                      .getLatestResponse(ConnectionCloseBody.class);
+
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.INVALID_ARGUMENT)));
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-broker-j git commit: QPID-7694:[Broker-J][AMQP 0-8..0-10] Add validation for queue declare arguments

Posted by or...@apache.org.
QPID-7694:[Broker-J][AMQP 0-8..0-10] Add validation for queue declare arguments


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ab33d1ad
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ab33d1ad
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ab33d1ad

Branch: refs/heads/master
Commit: ab33d1ad69d3f27f8a56f046fd959ceb0f62d05a
Parents: c80acea
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Dec 5 12:31:48 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Dec 5 13:39:58 2018 +0000

----------------------------------------------------------------------
 .../server/queue/QueueArgumentsConverter.java   | 86 +++++++++++++-------
 .../qpid/server/protocol/v0_8/AMQChannel.java   |  2 +-
 .../v0_10/extensions/queue/QueueTest.java       | 35 ++++++--
 .../v0_8/extension/queue/QueueTest.java         | 21 ++++-
 4 files changed, 103 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ab33d1ad/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index e67e9a3..0c01d52 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -20,12 +20,15 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Objects;
+import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,10 +66,10 @@ public class QueueArgumentsConverter
     private static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
 
     private static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
-    static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
+    private static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
     private static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
-    static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
-    static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
+    private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
+    private static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
     private static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
 
     private static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
@@ -142,22 +145,28 @@ public class QueueArgumentsConverter
         if(wireArguments != null)
         {
             final ConfiguredObjectTypeRegistry typeRegistry = model.getTypeRegistry();
-            final Map<String, ConfiguredObjectAttribute<?, ?>> attributeTypes =
-                    new HashMap<>(typeRegistry.getAttributeTypes(Queue.class));
+            final List<ConfiguredObjectAttribute<?, ?>> attributeTypes =
+                    new ArrayList<>(typeRegistry.getAttributeTypes(Queue.class).values());
             typeRegistry.getTypeSpecialisations(Queue.class)
-                        .forEach(type -> typeRegistry.getTypeSpecificAttributes(type)
-                                                     .forEach(t -> attributeTypes.put(t.getName(), t)));
+                        .forEach(type -> attributeTypes.addAll(typeRegistry.getTypeSpecificAttributes(type)));
+
+            final Set<String> wireArgumentNames = new HashSet<>(wireArguments.keySet());
             wireArguments.entrySet()
                          .stream()
-                         .filter(entry -> attributeTypes.containsKey(entry.getKey())
-                                          && !attributeTypes.get(entry.getKey()).isDerived())
-                         .forEach(entry -> modelArguments.put(entry.getKey(), entry.getValue()));
+                         .filter(entry -> attributeTypes.stream()
+                                                        .anyMatch(type -> Objects.equals(entry.getKey(), type.getName())
+                                                                          && !type.isDerived()))
+                         .forEach(entry -> {
+                             modelArguments.put(entry.getKey(), entry.getValue());
+                             wireArgumentNames.remove(entry.getKey());
+                         });
 
             for(Map.Entry<String,String> entry : ATTRIBUTE_MAPPINGS.entrySet())
             {
                 if(wireArguments.containsKey(entry.getKey()))
                 {
                     modelArguments.put(entry.getValue(), wireArguments.get(entry.getKey()));
+                    wireArgumentNames.remove(entry.getKey());
                 }
             }
             if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
@@ -169,10 +178,13 @@ public class QueueArgumentsConverter
                 modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.valueOf(String.valueOf(wireArguments.get(QPID_POLICY_TYPE)).toUpperCase()));
             }
 
-            if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP)
-               && SHARED_MSG_GROUP_ARG_VALUE.equals(String.valueOf(wireArguments.get(QPID_SHARED_MSG_GROUP))))
+            if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
             {
-                modelArguments.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.SHARED_GROUPS);
+                wireArgumentNames.remove(QPID_SHARED_MSG_GROUP);
+                if (SHARED_MSG_GROUP_ARG_VALUE.equals(String.valueOf(wireArguments.get(QPID_SHARED_MSG_GROUP))))
+                {
+                    modelArguments.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.SHARED_GROUPS);
+                }
             }
             else if(wireArguments.containsKey(QPID_GROUP_HEADER_KEY))
             {
@@ -189,34 +201,40 @@ public class QueueArgumentsConverter
                 modelArguments.put(Queue.NO_LOCAL, Boolean.parseBoolean(wireArguments.get(QPID_NO_LOCAL).toString()));
             }
 
-            if (wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY) != null && wireArguments.get(X_QPID_CAPACITY) != null)
+            if (wireArguments.containsKey(X_QPID_FLOW_RESUME_CAPACITY))
             {
-                double resumeCapacity = Integer.parseInt(wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY).toString());
-                double maximumCapacity = Integer.parseInt(wireArguments.get(X_QPID_CAPACITY).toString());
-                if (resumeCapacity > maximumCapacity)
-                {
-                    throw new ConnectionScopedRuntimeException(
-                            "Flow resume size can't be greater than flow control size");
-                }
-                Map<String, String> context = (Map<String, String>) modelArguments.get(Queue.CONTEXT);
-                if (context == null)
+                wireArgumentNames.remove(X_QPID_FLOW_RESUME_CAPACITY);
+                if (wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY) != null && wireArguments.get(X_QPID_CAPACITY) != null)
                 {
-                    context = new HashMap<>();
-                    modelArguments.put(Queue.CONTEXT, context);
+                    double resumeCapacity = Integer.parseInt(wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY).toString());
+                    double maximumCapacity = Integer.parseInt(wireArguments.get(X_QPID_CAPACITY).toString());
+                    if (resumeCapacity > maximumCapacity)
+                    {
+                        throw new ConnectionScopedRuntimeException(
+                                "Flow resume size can't be greater than flow control size");
+                    }
+                    Map<String, String> context = (Map<String, String>) modelArguments.get(Queue.CONTEXT);
+                    if (context == null)
+                    {
+                        context = new HashMap<>();
+                        modelArguments.put(Queue.CONTEXT, context);
+                    }
+                    double ratio = resumeCapacity / maximumCapacity;
+                    context.put(Queue.QUEUE_FLOW_RESUME_LIMIT, String.format("%.2f", ratio * 100.0));
+                    modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL);
                 }
-                double ratio = resumeCapacity / maximumCapacity;
-                context.put(Queue.QUEUE_FLOW_RESUME_LIMIT, String.format("%.2f", ratio * 100.0));
-                modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL);
             }
 
-            if (wireArguments.get(ALTERNATE_EXCHANGE) != null)
+            if (wireArguments.containsKey(ALTERNATE_EXCHANGE))
             {
+                wireArgumentNames.remove(ALTERNATE_EXCHANGE);
                 modelArguments.put(Queue.ALTERNATE_BINDING,
                                    Collections.singletonMap(AlternateBinding.DESTINATION,
                                                             wireArguments.get(ALTERNATE_EXCHANGE)));
             }
             else if (wireArguments.containsKey(X_QPID_DLQ_ENABLED))
             {
+                wireArgumentNames.remove(X_QPID_DLQ_ENABLED);
                 Object argument = wireArguments.get(X_QPID_DLQ_ENABLED);
                 if ((argument instanceof Boolean && ((Boolean) argument).booleanValue())
                     || (argument instanceof String && Boolean.parseBoolean((String)argument)))
@@ -226,6 +244,12 @@ public class QueueArgumentsConverter
                                                                 getDeadLetterQueueName(queueName)));
                 }
             }
+
+            if (!wireArgumentNames.isEmpty())
+            {
+                throw new IllegalArgumentException(String.format("Unsupported queue declare argument(s) : %s",
+                                                                 String.join(",", wireArgumentNames)));
+            }
         }
         return modelArguments;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ab33d1ad/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index e2048a0..fc85de2 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -3107,7 +3107,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
             catch (IllegalArgumentException | IllegalConfigurationException e)
             {
                 String message = String.format("Error creating queue '%s': %s", queueName, e.getMessage());
-                _connection.sendConnectionClose(ErrorCodes.COMMAND_INVALID, message, getChannelId());
+                _connection.sendConnectionClose(ErrorCodes.INVALID_ARGUMENT, message, getChannelId());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ab33d1ad/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
index f5216c2..4365812 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
@@ -24,10 +24,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
@@ -37,15 +34,14 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v0_10.transport.ExecutionResult;
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionErrorCode;
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionException;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
-import org.apache.qpid.server.protocol.v0_10.transport.QueueQueryResult;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed;
-import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v0_10.FrameTransport;
 import org.apache.qpid.tests.protocol.v0_10.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -65,7 +61,6 @@ public class QueueTest extends BrokerAdminUsingTestBase
     }
 
     @Test
-    @SpecificationTest(section = "10.queue.declare", description = "This command creates or checks a queue.")
     public void queueDeclareUsingRealQueueAttributesInWireArguments() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -149,4 +144,30 @@ public class QueueTest extends BrokerAdminUsingTestBase
             }
         }
     }
+
+
+    @Test
+    public void queueDeclareInvalidWireArguments() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channelId(1)
+                       .attachSession(SESSION_NAME)
+                       .queue()
+                       .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .declareArguments(Collections.singletonMap("foo", "bar"))
+                       .declareId(0)
+                       .declare()
+                       .session()
+                       .flushCompleted()
+                       .flush();
+
+            ExecutionException exception =
+                    interaction.consume(ExecutionException.class, SessionCompleted.class, SessionCommandPoint.class);
+
+            assertThat(exception.getErrorCode(), is(equalTo(ExecutionErrorCode.ILLEGAL_ARGUMENT)));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ab33d1ad/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
index af809e1..4bd9b5a 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
@@ -37,6 +37,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.ErrorCodes;
 import org.apache.qpid.server.protocol.v0_8.AMQShortString;
 import org.apache.qpid.server.protocol.v0_8.FieldTable;
 import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
@@ -46,10 +47,10 @@ import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
 import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody;
-import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
 import org.apache.qpid.tests.protocol.v0_8.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -68,7 +69,6 @@ public class QueueTest extends BrokerAdminUsingTestBase
     }
 
     @Test
-    @SpecificationTest(section = "1.7.2.1", description = "declare queue, create if needed")
     public void queueDeclareUsingRealQueueAttributesInWireArguments() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -158,4 +158,21 @@ public class QueueTest extends BrokerAdminUsingTestBase
             assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
         }
     }
+
+    @Test
+    public void queueDeclareInvalidWireArguments() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionCloseBody response = interaction.openAnonymousConnection()
+                                                      .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                      .queue().declareName(BrokerAdmin.TEST_QUEUE_NAME)
+                                                      .declareArguments(Collections.singletonMap("foo", "bar"))
+                                                      .declare()
+                                                      .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.INVALID_ARGUMENT)));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org