You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/11/24 00:20:28 UTC

[qpid-broker-j] 03/04: QPID-8377 : Allow configuration of behaviour with unknown exchange declare arguments

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit a7d5b793bf35d1b3638c694753aa3ed76aa0838c
Author: Robert Godfrey <rg...@apache.org>
AuthorDate: Sun Nov 17 23:12:30 2019 +0100

    QPID-8377 : Allow configuration of behaviour with unknown exchange declare arguments
    
    (cherry picked from commit 03b751e1467f7482c825f6c5f09a89bb0157b057)
---
 .../org/apache/qpid/server/model/Exchange.java     | 11 ++++++++++
 .../protocol/v0_10/ServerSessionDelegate.java      | 24 +++++++++++++++++-----
 .../qpid/server/protocol/v0_8/AMQChannel.java      | 22 ++++++++++++++++----
 3 files changed, 48 insertions(+), 9 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
index eb54c75..800f57c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
@@ -52,6 +52,17 @@ public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, Me
         REJECT, DISCARD
     }
 
+    enum BehaviourOnUnknownDeclareArgument
+    {
+        IGNORE, LOG, FAIL
+    }
+
+    String UNKNOWN_EXCHANGE_DECLARE_ARGUMENT_BEHAVIOUR_NAME = "exchange.behaviourOnUnknownDeclareArgument";
+    @ManagedContextDefault(name= UNKNOWN_EXCHANGE_DECLARE_ARGUMENT_BEHAVIOUR_NAME)
+    BehaviourOnUnknownDeclareArgument
+            ON_UNKNOWN_EXCHANGE_DECLARE_OPTION = BehaviourOnUnknownDeclareArgument.FAIL;
+
+
     // Attributes
 
     @ManagedAttribute(description = "Provides an alternate destination that, depending on behaviour requested by the "
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index d97f1c6..06c1690 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -938,7 +938,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                         attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_BINDING,
                                        Collections.singletonMap(AlternateBinding.DESTINATION, alternateExchangeName));
                     }
-                    validateExchangeDeclareArguments(attributes, session.getAMQPConnection().getModel());
+                    validateAndSanitizeExchangeDeclareArguments(attributes, session.getAMQPConnection());
                     addressSpace.createMessageDestination(Exchange.class, attributes);;
                 }
                 catch(ReservedExchangeNameException e)
@@ -997,8 +997,9 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
         }
     }
 
-    private void validateExchangeDeclareArguments(final Map<String, Object> attributes, final Model model)
+    private void validateAndSanitizeExchangeDeclareArguments(final Map<String, Object> attributes, final AMQPConnection_0_10 connection)
     {
+        final Model model = connection.getModel();
         final ConfiguredObjectTypeRegistry typeRegistry = model.getTypeRegistry();
         final List<ConfiguredObjectAttribute<?, ?>> types = new ArrayList<>(typeRegistry.getAttributeTypes(Exchange.class).values());
         typeRegistry.getTypeSpecialisations(Exchange.class).forEach(type -> types.addAll(typeRegistry.getTypeSpecificAttributes(type)));
@@ -1007,11 +1008,24 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                                                   .filter(name -> types.stream().noneMatch(a -> Objects.equals(name, a.getName())
                                                                                                 && !a.isDerived()))
                                                   .collect(Collectors.toSet());
-
         if (!unsupported.isEmpty())
         {
-            throw new IllegalArgumentException(String.format(
-                    "Unsupported exchange declare arguments : %s", String.join(",", unsupported)));
+            Exchange.BehaviourOnUnknownDeclareArgument unknownArgumentBehaviour =
+                    connection.getContextValue(Exchange.BehaviourOnUnknownDeclareArgument.class,
+                                               Exchange.UNKNOWN_EXCHANGE_DECLARE_ARGUMENT_BEHAVIOUR_NAME);
+            switch(unknownArgumentBehaviour)
+            {
+                case LOG:
+                    LOGGER.warn("Unsupported exchange declare arguments : {}", String.join(",", unsupported));
+                    // fall through
+                case IGNORE:
+                    attributes.keySet().removeAll(unsupported);
+                    break;
+                case FAIL:
+                default:
+                    throw new IllegalArgumentException(String.format(
+                            "Unsupported exchange declare arguments : %s", String.join(",", unsupported)));
+            }
         }
     }
 
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 1a1789e..c783687 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -2641,7 +2641,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                         attributes.put(Exchange.ALTERNATE_BINDING,
                                        Collections.singletonMap(AlternateBinding.DESTINATION, alternateExchangeName));
                     }
-                    validateExchangeDeclareArguments(attributes);
+                    validateAndSanitizeExchangeDeclareArguments(attributes);
                     exchange = virtualHost.createMessageDestination(Exchange.class, attributes);
 
                     if (!nowait)
@@ -2718,7 +2718,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
 
     }
 
-    private void validateExchangeDeclareArguments(final Map<String, Object> attributes)
+    private void validateAndSanitizeExchangeDeclareArguments(final Map<String, Object> attributes)
     {
         final ConfiguredObjectTypeRegistry typeRegistry = getModel().getTypeRegistry();
         final List<ConfiguredObjectAttribute<?, ?>> types = new ArrayList<>(typeRegistry.getAttributeTypes(Exchange.class).values());
@@ -2731,8 +2731,22 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
 
         if (!unsupported.isEmpty())
         {
-            throw new IllegalArgumentException(String.format(
-                    "Unsupported exchange declare arguments : %s", String.join(",", unsupported)));
+            Exchange.BehaviourOnUnknownDeclareArgument unknownArgumentBehaviour =
+                    getConnection().getContextValue(Exchange.BehaviourOnUnknownDeclareArgument.class,
+                                                    Exchange.UNKNOWN_EXCHANGE_DECLARE_ARGUMENT_BEHAVIOUR_NAME);
+            switch(unknownArgumentBehaviour)
+            {
+                case LOG:
+                    LOGGER.warn("Unsupported exchange declare arguments : {}", String.join(",", unsupported));
+                    // fall through
+                case IGNORE:
+                    attributes.keySet().removeAll(unsupported);
+                    break;
+                case FAIL:
+                default:
+                    throw new IllegalArgumentException(String.format(
+                        "Unsupported exchange declare arguments : %s", String.join(",", unsupported)));
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org