You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/11/28 12:42:16 UTC

qpid-broker-j git commit: QPID-7642: [Broker-J] Let the caller handle the bind operation exception for an invalid selector

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 082f9bf0d -> e1d8e2fd8


QPID-7642: [Broker-J] Let the caller handle the bind operation exception for an invalid selector


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e1d8e2fd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e1d8e2fd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e1d8e2fd

Branch: refs/heads/master
Commit: e1d8e2fd84123cc518b82a4f7c4cd5fd46244d96
Parents: 082f9bf
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Nov 28 11:40:10 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Nov 28 12:42:06 2018 +0000

----------------------------------------------------------------------
 .../model/ConfiguredObjectFactoryGenerator.java | 25 ++++++-
 .../qpid/server/exchange/AbstractExchange.java  | 74 ++++++++++++++------
 .../server/exchange/DirectExchangeImpl.java     | 55 ++++++---------
 .../server/exchange/FanoutExchangeImpl.java     | 46 +++++-------
 .../qpid/server/exchange/HeadersBinding.java    | 16 +----
 .../server/exchange/HeadersExchangeImpl.java    | 14 +++-
 .../qpid/server/exchange/TopicExchangeImpl.java | 25 ++-----
 .../org/apache/qpid/server/model/Exchange.java  |  5 +-
 .../server/virtualhost/AbstractVirtualHost.java | 14 +++-
 .../server/exchange/DirectExchangeTest.java     | 52 ++++++++++++++
 .../server/exchange/FanoutExchangeTest.java     | 54 ++++++++++++++
 .../server/exchange/HeadersBindingTest.java     | 28 ++++----
 .../server/exchange/HeadersExchangeTest.java    | 64 +++++++++++++++++
 .../qpid/server/exchange/TopicExchangeTest.java | 55 +++++++++++++++
 .../server/queue/AbstractQueueTestBase.java     |  2 +-
 .../protocol/v0_10/ServerSessionDelegate.java   | 11 +++
 .../qpid/server/protocol/v0_8/AMQChannel.java   | 20 ++++--
 17 files changed, 418 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-codegen/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryGenerator.java
----------------------------------------------------------------------
diff --git a/broker-codegen/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryGenerator.java b/broker-codegen/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryGenerator.java
index a63f12f..23cb371 100644
--- a/broker-codegen/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryGenerator.java
+++ b/broker-codegen/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryGenerator.java
@@ -26,8 +26,10 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import javax.annotation.processing.AbstractProcessor;
 import javax.annotation.processing.Filer;
@@ -242,7 +244,7 @@ public class ConfiguredObjectFactoryGenerator extends AbstractProcessor
     }
     private void processDoOnConfigMethod(final PrintWriter pw, final String className, final ExecutableElement methodElement, final AnnotationMirror annotationMirror)
     {
-
+        pw.println("    @Override");
         pw.print("    public " + methodElement.getReturnType() + " " + methodElement.getSimpleName().toString() + "(");
         boolean first = true;
         for(VariableElement param : methodElement.getParameters())
@@ -262,6 +264,11 @@ public class ConfiguredObjectFactoryGenerator extends AbstractProcessor
             pw.print(getParamName(param));
         }
         pw.println(")");
+        final List<? extends TypeMirror> thrownTypes = methodElement.getThrownTypes();
+        if (!thrownTypes.isEmpty())
+        {
+            pw.println(thrownTypes.stream().map(TypeMirror::toString).collect(Collectors.joining(" , ", "    throws ", "")));
+        }
         pw.println("    {");
 
 
@@ -412,6 +419,12 @@ public class ConfiguredObjectFactoryGenerator extends AbstractProcessor
                    + boxedReturnTypeName
                    + "> execute()");
         pw.println("                {");
+        final List<? extends TypeMirror> thrownTypes = methodElement.getThrownTypes();
+        if (!thrownTypes.isEmpty())
+        {
+            pw.println("                    try");
+            pw.println("                    {");
+        }
         if (methodElement.getReturnType().getKind() != TypeKind.VOID)
         {
             pw.println("                    return Futures.<"
@@ -429,6 +442,16 @@ public class ConfiguredObjectFactoryGenerator extends AbstractProcessor
                        + boxedReturnTypeName
                        + ">immediateFuture(null);");
         }
+        if (!thrownTypes.isEmpty())
+        {
+            pw.println("                    }");
+            pw.println(thrownTypes.stream()
+                                  .map(TypeMirror::toString)
+                                  .collect(Collectors.joining(" | ", "                    catch (", " e)")));
+            pw.println("                    {");
+            pw.println("                        return Futures.immediateFailedFuture(e);");
+            pw.println("                    }");
+        }
         pw.println("                }");
 
         pw.println("                @Override");

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 0351512..c6a7a95 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.binding.BindingImpl;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.BindingMessages;
@@ -216,7 +217,14 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
                 if (messageDestination != null)
                 {
                     Map<String, Object> arguments = b.getArguments() == null ? Collections.emptyMap() : b.getArguments();
-                    onBind(new BindingIdentifier(b.getBindingKey(), messageDestination), arguments);
+                    try
+                    {
+                        onBind(new BindingIdentifier(b.getBindingKey(), messageDestination), arguments);
+                    }
+                    catch (AMQInvalidArgumentException e)
+                    {
+                        throw new IllegalConfigurationException("Unexpected bind argument : " + e.getMessage(), e);
+                    }
                     messageDestination.linkAdded(this, b);
                 }
             }
@@ -567,9 +575,10 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
     }
 
     protected abstract void onBindingUpdated(final BindingIdentifier binding,
-                                             final Map<String, Object> newArguments);
+                                             final Map<String, Object> newArguments) throws AMQInvalidArgumentException;
 
-    protected abstract void onBind(final BindingIdentifier binding, final Map<String, Object> arguments);
+    protected abstract void onBind(final BindingIdentifier binding, final Map<String, Object> arguments)
+            throws AMQInvalidArgumentException;
 
     protected abstract void onUnbind(final BindingIdentifier binding);
 
@@ -666,6 +675,21 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
                         Map<String, Object> arguments,
                         boolean replaceExistingArguments)
     {
+        try
+        {
+            return bindInternal(destination, bindingKey, arguments, replaceExistingArguments);
+        }
+        catch (AMQInvalidArgumentException e)
+        {
+            throw new IllegalArgumentException("Unexpected bind argument : " + e.getMessage(), e);
+        }
+    }
+
+    private boolean bindInternal(final String destination,
+                                 final String bindingKey,
+                                 Map<String, Object> arguments,
+                                 final boolean replaceExistingArguments) throws AMQInvalidArgumentException
+    {
         MessageDestination messageDestination = getAttainedMessageDestination(destination);
         if (messageDestination == null)
         {
@@ -679,32 +703,24 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
 
         Binding newBinding = new BindingImpl(bindingKey, destination, arguments);
 
-        boolean modified = false;
+        Binding previousBinding = null;
         for(Binding b : _bindings)
         {
-
             if (b.getBindingKey().equals(bindingKey) && b.getDestination().equals(messageDestination.getName()))
             {
-                if (replaceExistingArguments)
-                {
-                    _bindings.remove(b);
-                    modified = true;
-                    break;
-                }
-                else
-                {
-                    return false;
-                }
+                previousBinding = b;
+                break;
             }
         }
-        _bindings.add(newBinding);
-        if(isDurable() && messageDestination.isDurable())
+
+        if (previousBinding != null && !replaceExistingArguments)
         {
-            final Collection<Binding> durableBindings = getDurableBindings();
-            attributeSet(DURABLE_BINDINGS, durableBindings, durableBindings);
+            return false;
         }
+
+
         final BindingIdentifier bindingIdentifier = new BindingIdentifier(bindingKey, messageDestination);
-        if(modified)
+        if(previousBinding != null)
         {
             onBindingUpdated(bindingIdentifier, arguments);
         }
@@ -717,6 +733,17 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
             onBind(bindingIdentifier, arguments);
             messageDestination.linkAdded(this, newBinding);
         }
+
+        if (previousBinding != null)
+        {
+            _bindings.remove(previousBinding);
+        }
+        _bindings.add(newBinding);
+        if(isDurable() && messageDestination.isDurable())
+        {
+            final Collection<Binding> durableBindings = getDurableBindings();
+            attributeSet(DURABLE_BINDINGS, durableBindings, durableBindings);
+        }
         return true;
     }
 
@@ -857,16 +884,17 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
 
     @Override
     public boolean addBinding(String bindingKey, final Queue<?> queue, Map<String, Object> arguments)
+            throws AMQInvalidArgumentException
     {
-        return bind(queue.getName(), bindingKey, arguments, false);
+        return bindInternal(queue.getName(), bindingKey, arguments, false);
     }
 
     @Override
     public void replaceBinding(String bindingKey,
                                final Queue<?> queue,
-                               Map<String, Object> arguments)
+                               Map<String, Object> arguments) throws AMQInvalidArgumentException
     {
-        bind(queue.getName(), bindingKey, arguments, true);
+        bindInternal(queue.getName(), bindingKey, arguments, true);
     }
 
     private boolean autoDeleteIfNecessary()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
index 4683042..d0171b1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
@@ -87,6 +87,7 @@ public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl> imp
         }
 
         BindingSet putBinding(MessageDestination destination, Map<String, Object> arguments, boolean force)
+                throws AMQInvalidArgumentException
         {
             if (!force && (_unfilteredDestinations.containsKey(destination) || _filteredDestinations.containsKey(
                     destination)))
@@ -95,44 +96,30 @@ public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl> imp
             }
             else if(FilterSupport.argumentsContainFilter(arguments))
             {
-                try
+                FilterManager messageFilter = FilterSupport.createMessageFilter(arguments, destination);
+                Map<MessageDestination, String> unfilteredDestinations;
+                Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> filteredDestinations;
+                if (_unfilteredDestinations.containsKey(destination))
                 {
-                    Map<MessageDestination, String> unfilteredDestinations;
-                    Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> filteredDestinations;
-                    if (_unfilteredDestinations.containsKey(destination))
-                    {
-                        unfilteredDestinations = new HashMap<>(_unfilteredDestinations);
-                        unfilteredDestinations.remove(destination);
-                    }
-                    else
-                    {
-                        unfilteredDestinations = _unfilteredDestinations;
-                    }
-
-                    filteredDestinations = new HashMap<>(_filteredDestinations);
-                    FilterManager messageFilter = FilterSupport.createMessageFilter(arguments, destination);
-                    String replacementRoutingKey = arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY) != null
-                            ? String.valueOf(arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY))
-                            : null;
-                    filteredDestinations.put(destination,
-                                       new FilterManagerReplacementRoutingKeyTuple(messageFilter,
-                                                                                   replacementRoutingKey));
-
-                    return new BindingSet(Collections.unmodifiableMap(unfilteredDestinations),
-                                          Collections.unmodifiableMap(filteredDestinations));
-
+                    unfilteredDestinations = new HashMap<>(_unfilteredDestinations);
+                    unfilteredDestinations.remove(destination);
                 }
-                catch (AMQInvalidArgumentException e)
+                else
                 {
-                    LOGGER.warn(
-                            "Binding ignored: cannot parse filter on binding of destination '{}' to exchange '{}' with arguments: {}",
-                            destination.getName(),
-                            DirectExchangeImpl.this.getName(),
-                            arguments,
-                            e);
-                    return this;
+                    unfilteredDestinations = _unfilteredDestinations;
                 }
 
+                filteredDestinations = new HashMap<>(_filteredDestinations);
+
+                String replacementRoutingKey = arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY) != null
+                        ? String.valueOf(arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY))
+                        : null;
+                filteredDestinations.put(destination,
+                                   new FilterManagerReplacementRoutingKeyTuple(messageFilter,
+                                                                               replacementRoutingKey));
+
+                return new BindingSet(Collections.unmodifiableMap(unfilteredDestinations),
+                                      Collections.unmodifiableMap(filteredDestinations));
             }
             else
             {
@@ -232,6 +219,7 @@ public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl> imp
 
     @Override
     protected void onBindingUpdated(final BindingIdentifier binding, final Map<String, Object> newArguments)
+            throws AMQInvalidArgumentException
     {
         String bindingKey = binding.getBindingKey();
 
@@ -241,6 +229,7 @@ public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl> imp
 
     @Override
     protected void onBind(final BindingIdentifier binding, final Map<String, Object> arguments)
+            throws AMQInvalidArgumentException
     {
         String bindingKey = binding.getBindingKey();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
index 4c64b53..afcbb8c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
@@ -67,41 +67,29 @@ class FanoutExchangeImpl extends AbstractExchange<FanoutExchangeImpl> implements
         }
 
         BindingSet addBinding(final BindingIdentifier binding, final Map<String, Object> arguments)
+                throws AMQInvalidArgumentException
         {
             MessageDestination destination = binding.getDestination();
             if (FilterSupport.argumentsContainFilter(arguments))
             {
-                try
-                {
-                    Map<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>>
-                            filteredDestinations = new HashMap<>(_filteredDestinations);
+                Map<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>>
+                        filteredDestinations = new HashMap<>(_filteredDestinations);
 
-                    filteredDestinations.computeIfAbsent(destination, messageDestination -> new HashMap<>());
+                filteredDestinations.computeIfAbsent(destination, messageDestination -> new HashMap<>());
 
-                    Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple> bindingsForDestination =
-                            new HashMap<>(filteredDestinations.get(destination));
+                Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple> bindingsForDestination =
+                        new HashMap<>(filteredDestinations.get(destination));
 
-                    FilterManager filterManager = FilterSupport.createMessageFilter(arguments, destination);
-                    String replacementRoutingKey = arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY) != null
-                            ? String.valueOf(arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY))
-                            : null;
+                FilterManager filterManager = FilterSupport.createMessageFilter(arguments, destination);
+                String replacementRoutingKey = arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY) != null
+                        ? String.valueOf(arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY))
+                        : null;
 
-                    bindingsForDestination.put(binding,
-                                               new FilterManagerReplacementRoutingKeyTuple(filterManager,
-                                                                                           replacementRoutingKey));
-                    filteredDestinations.put(destination, Collections.unmodifiableMap(bindingsForDestination));
-                    return new BindingSet(_unfilteredDestinations, Collections.unmodifiableMap(filteredDestinations));
-                }
-                catch (AMQInvalidArgumentException e)
-                {
-                    LOGGER.warn(
-                            "Binding ignored: cannot parse filter on binding of destination '{}' to exchange '{}' with arguments: {}",
-                            destination.getName(),
-                            FanoutExchangeImpl.this.getName(),
-                            arguments,
-                            e);
-                    return this;
-                }
+                bindingsForDestination.put(binding,
+                                           new FilterManagerReplacementRoutingKeyTuple(filterManager,
+                                                                                       replacementRoutingKey));
+                filteredDestinations.put(destination, Collections.unmodifiableMap(bindingsForDestination));
+                return new BindingSet(_unfilteredDestinations, Collections.unmodifiableMap(filteredDestinations));
             }
             else
             {
@@ -126,6 +114,7 @@ class FanoutExchangeImpl extends AbstractExchange<FanoutExchangeImpl> implements
         }
 
         BindingSet updateBinding(final BindingIdentifier binding, final Map<String, Object> newArguments)
+                throws AMQInvalidArgumentException
         {
             return removeBinding(binding).addBinding(binding, newArguments);
         }
@@ -232,13 +221,14 @@ class FanoutExchangeImpl extends AbstractExchange<FanoutExchangeImpl> implements
 
     @Override
     protected void onBindingUpdated(final BindingIdentifier binding,
-                                    final Map<String, Object> newArguments)
+                                    final Map<String, Object> newArguments) throws AMQInvalidArgumentException
     {
         _bindingSet = _bindingSet.updateBinding(binding, newArguments);
     }
 
     @Override
     protected void onBind(final BindingIdentifier binding, final Map<String, Object> arguments)
+            throws AMQInvalidArgumentException
     {
         _bindingSet = _bindingSet.addBinding(binding, arguments);
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index 5fb5329..b18d91a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -61,6 +61,7 @@ class HeadersBinding
      * @param binding the binding to create a header binding using
      */
     public HeadersBinding(AbstractExchange.BindingIdentifier binding, Map<String,Object> arguments)
+            throws AMQInvalidArgumentException
     {
         _binding = binding;
         arguments = arguments == null ? Collections.emptyMap() : arguments;
@@ -77,22 +78,11 @@ class HeadersBinding
         _replacementRoutingKey = key == null ? null : String.valueOf(key);
     }
 
-    private void initMappings()
+    private void initMappings() throws AMQInvalidArgumentException
     {
         if(FilterSupport.argumentsContainFilter(_mappings))
         {
-            try
-            {
-                _filter = FilterSupport.createMessageFilter(_mappings, _binding.getDestination());
-            }
-            catch (AMQInvalidArgumentException e)
-            {
-                LOGGER.warn("Invalid filter in binding queue '"+_binding.getDestination().getName()
-                             +"' with arguments: " + _mappings);
-                _filter = new FilterManager();
-
-                _filter.add("x-exclude-all", new ExcludeAllFilter());
-            }
+            _filter = FilterSupport.createMessageFilter(_mappings, _binding.getDestination());
         }
         for(Map.Entry<String, Object> entry : _mappings.entrySet())
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
index 4bf7a75..a454cfb 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
@@ -106,13 +107,13 @@ public class HeadersExchangeImpl extends AbstractExchange<HeadersExchangeImpl> i
 
 
     @Override
-    protected void onBind(final BindingIdentifier binding, Map<String,Object> arguments)
+    protected void onBind(final BindingIdentifier binding, Map<String,Object> arguments) throws AMQInvalidArgumentException
     {
         _bindingHeaderMatchers.add(new HeadersBinding(binding, arguments));
     }
 
     @Override
-    protected void onBindingUpdated(final BindingIdentifier binding, final Map<String, Object> arguments)
+    protected void onBindingUpdated(final BindingIdentifier binding, final Map<String, Object> arguments)  throws AMQInvalidArgumentException
     {
         _bindingHeaderMatchers.add(new HeadersBinding(binding, arguments));
     }
@@ -120,7 +121,14 @@ public class HeadersExchangeImpl extends AbstractExchange<HeadersExchangeImpl> i
     @Override
     protected void onUnbind(final BindingIdentifier binding)
     {
-        _bindingHeaderMatchers.remove(new HeadersBinding(binding, Collections.emptyMap()));
+        try
+        {
+            _bindingHeaderMatchers.remove(new HeadersBinding(binding, Collections.emptyMap()));
+        }
+        catch (AMQInvalidArgumentException e)
+        {
+            // ignore
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
index c7f6bc7..f0fc3a8 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
@@ -65,6 +65,7 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
 
     @Override
     protected synchronized void onBindingUpdated(final BindingIdentifier binding, final Map<String, Object> newArguments)
+            throws AMQInvalidArgumentException
     {
         final String bindingKey = binding.getBindingKey();
         final MessageDestination destination = binding.getDestination();
@@ -73,17 +74,10 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
 
         String routingKey = TopicNormalizer.normalize(bindingKey);
 
-        try
-        {
-            if (_bindings.containsKey(binding))
-            {
-                TopicExchangeResult result = _topicExchangeResults.get(routingKey);
-                updateTopicExchangeResult(result, binding, newArguments);
-            }
-        }
-        catch (AMQInvalidArgumentException e)
+        if (_bindings.containsKey(binding))
         {
-            throw new ConnectionScopedRuntimeException(e);
+            TopicExchangeResult result = _topicExchangeResults.get(routingKey);
+            updateTopicExchangeResult(result, binding, newArguments);
         }
     }
 
@@ -217,16 +211,9 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
 
     @Override
     protected void onBind(final BindingIdentifier binding, Map<String, Object> arguments)
+            throws AMQInvalidArgumentException
     {
-        try
-        {
-            bind(binding, arguments);
-        }
-        catch (AMQInvalidArgumentException e)
-        {
-            // TODO - this seems incorrect, handling of invalid bindings should be propagated more cleanly
-            throw new ConnectionScopedRuntimeException(e);
-        }
+        bind(binding, arguments);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
index c2c044e..eb54c75 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Map;
 
 import org.apache.qpid.server.exchange.DestinationReferrer;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageSender;
@@ -122,7 +123,7 @@ public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, Me
     @DoOnConfigThread
     boolean addBinding(@Param(name = "bindingKey") String bindingKey,
                        @Param(name = "queue") Queue<?> queue,
-                       @Param(name = "arguments") Map<String, Object> arguments);
+                       @Param(name = "arguments") Map<String, Object> arguments) throws AMQInvalidArgumentException;
 
     @DoOnConfigThread
     boolean deleteBinding(@Param(name = "bindingKey") String bindingKey,
@@ -135,7 +136,7 @@ public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, Me
     @DoOnConfigThread
     void replaceBinding(@Param(name = "bindingKey") String bindingKey,
                         @Param(name = "queue") Queue<?> queue,
-                        @Param(name = "arguments") Map<String, Object> arguments);
+                        @Param(name = "arguments") Map<String, Object> arguments) throws AMQInvalidArgumentException;
 
     QueueManagingVirtualHost<?> getVirtualHost();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index f038ecf..e54ca5b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -83,6 +83,7 @@ import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
 import org.apache.qpid.server.exchange.DefaultDestination;
 import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.MessageStoreMessages;
 import org.apache.qpid.server.logging.messages.VirtualHostMessages;
@@ -2876,7 +2877,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
 
                         for (String binding : bindings.keySet())
                         {
-                            exchange.addBinding(binding, queue, bindings.get(binding));
+                            try
+                            {
+                                exchange.addBinding(binding, queue, bindings.get(binding));
+                            }
+                            catch (AMQInvalidArgumentException ia)
+                            {
+                                throw new IllegalArgumentException("Unexpected bind argument : " + ia.getMessage(), ia);
+                            }
                         }
                     }
                     else
@@ -2894,6 +2902,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
                 throw new IllegalStateException("subscription already in use");
             }
         }
+        catch (AMQInvalidArgumentException e)
+        {
+            throw new IllegalArgumentException("Unexpected bind argument : " + e.getMessage(), e);
+        }
         return queue;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
index 153d85b..02cdd47 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
@@ -466,4 +466,56 @@ public class DirectExchangeTest extends UnitTestBase
         assertFalse("Message unexpectedly routed to queue", result.hasRoutes());
     }
 
+    @Test
+    public void testBindWithInvalidSelector()
+    {
+        final String queueName = getTestName() + "_queue";
+        _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+        final Map<String, Object> bindArguments = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in (");
+
+        try
+        {
+            _exchange.bind(queueName, queueName, bindArguments, false);
+            fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+        }
+        catch (IllegalArgumentException e)
+        {
+            // pass
+        }
+
+        final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+        final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+
+        assertFalse("Message is unexpectedly routed to queue", result.hasRoutes());
+    }
+
+    @Test
+    public void testBindWithInvalidSelectorWhenBindingExists()
+    {
+        final String queueName = getTestName() + "_queue";
+        _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+        final Map<String, Object> bindArguments = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in ('bar')");
+        final boolean isBound = _exchange.bind(queueName, queueName, bindArguments, false);
+        assertTrue("Could not bind queue", isBound);
+
+        final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+        final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+        assertTrue("Message should be routed to queue", result.hasRoutes());
+
+        final Map<String, Object> bindArguments2 = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in (");
+        try
+        {
+            _exchange.bind(queueName, queueName, bindArguments2, true);
+            fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+        }
+        catch (IllegalArgumentException e)
+        {
+            // pass
+        }
+
+        final RoutingResult<ServerMessage<?>> result2 = _exchange.route(testMessage, queueName, _instanceProperties);
+        assertTrue("Message should be possible to route using old binding", result2.hasRoutes());
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index 26d94f8..ae4067a 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -24,6 +24,7 @@ import static org.apache.qpid.server.filter.AMQPFilterTypes.JMS_SELECTOR;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -398,6 +399,59 @@ public class FanoutExchangeTest extends UnitTestBase
         assertFalse(_exchange.isBound(queue));
     }
 
+    @Test
+    public void testBindWithInvalidSelector()
+    {
+        final String queueName = getTestName() + "_queue";
+        _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+        final Map<String, Object> bindArguments = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in (");
+
+        try
+        {
+            _exchange.bind(queueName, "", bindArguments, false);
+            fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+        }
+        catch (IllegalArgumentException e)
+        {
+            // pass
+        }
+
+        final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+        final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+
+        assertFalse("Message is unexpectedly routed to queue", result.hasRoutes());
+    }
+
+    @Test
+    public void testBindWithInvalidSelectorWhenBindingExists()
+    {
+        final String queueName = getTestName() + "_queue";
+        _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+        final Map<String, Object> bindArguments = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in ('bar')");
+        final boolean isBound = _exchange.bind(queueName, "", bindArguments, false);
+        assertTrue("Could not bind queue", isBound);
+
+        final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+        final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+        assertTrue("Message should be routed to queue", result.hasRoutes());
+
+        final Map<String, Object> bindArguments2 = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in (");
+        try
+        {
+            _exchange.bind(queueName, "", bindArguments2, true);
+            fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+        }
+        catch (IllegalArgumentException e)
+        {
+            // pass
+        }
+
+        final RoutingResult<ServerMessage<?>> result2 = _exchange.route(testMessage, "", _instanceProperties);
+        assertTrue("Message should be possible to route using old binding", result2.hasRoutes());
+    }
+
     private ServerMessage<?> createTestMessage(Map<String, Object> headerValues)
     {
         AMQMessageHeader header = mock(AMQMessageHeader.class);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 126c07d..7947b09 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -207,7 +207,7 @@ public class HeadersBindingTest extends UnitTestBase
     }
 
     @Test
-    public void testDefault_1()
+    public void testDefault_1() throws Exception
     {
         bindHeaders.put("A", "Value of A");
 
@@ -219,7 +219,7 @@ public class HeadersBindingTest extends UnitTestBase
     }
 
     @Test
-    public void testDefault_2()
+    public void testDefault_2() throws Exception
     {
         bindHeaders.put("A", "Value of A");
 
@@ -232,7 +232,7 @@ public class HeadersBindingTest extends UnitTestBase
     }
 
     @Test
-    public void testDefault_3()
+    public void testDefault_3() throws Exception
     {
         bindHeaders.put("A", "Value of A");
 
@@ -244,7 +244,7 @@ public class HeadersBindingTest extends UnitTestBase
     }
 
     @Test
-    public void testAll_1()
+    public void testAll_1() throws Exception
     {
         bindHeaders.put("X-match", "all");
         bindHeaders.put("A", "Value of A");
@@ -257,7 +257,7 @@ public class HeadersBindingTest extends UnitTestBase
     }
 
     @Test
-    public void testAll_2()
+    public void testAll_2() throws Exception
     {
         bindHeaders.put("X-match", "all");
         bindHeaders.put("A", "Value of A");
@@ -271,7 +271,7 @@ public class HeadersBindingTest extends UnitTestBase
     }
 
     @Test
-    public void testAll_3()
+    public void testAll_3() throws Exception
     {
         bindHeaders.put("X-match", "all");
         bindHeaders.put("A", "Value of A");
@@ -286,7 +286,7 @@ public class HeadersBindingTest extends UnitTestBase
     }
 
     @Test
-    public void testAll_4()
+    public void testAll_4() throws Exception
     {
         bindHeaders.put("X-match", "all");
         bindHeaders.put("A", "Value of A");
@@ -302,7 +302,7 @@ public class HeadersBindingTest extends UnitTestBase
     }
 
     @Test
-    public void testAll_5()
+    public void testAll_5() throws Exception
     {
         bindHeaders.put("X-match", "all");
         bindHeaders.put("A", "Value of A");
@@ -318,7 +318,7 @@ public class HeadersBindingTest extends UnitTestBase
     }
 
     @Test
-    public void testAny_1()
+    public void testAny_1() throws Exception
     {
         bindHeaders.put("X-match", "any");
         bindHeaders.put("A", "Value of A");
@@ -331,7 +331,7 @@ public class HeadersBindingTest extends UnitTestBase
     }
 
     @Test
-    public void testAny_2()
+    public void testAny_2() throws Exception
     {
         bindHeaders.put("X-match", "any");
         bindHeaders.put("A", "Value of A");
@@ -345,7 +345,7 @@ public class HeadersBindingTest extends UnitTestBase
     }
 
     @Test
-    public void testAny_3()
+    public void testAny_3() throws Exception
     {
         bindHeaders.put("X-match", "any");
         bindHeaders.put("A", "Value of A");
@@ -360,7 +360,7 @@ public class HeadersBindingTest extends UnitTestBase
     }
 
     @Test
-    public void testAny_4()
+    public void testAny_4() throws Exception
     {
         bindHeaders.put("X-match", "any");
         bindHeaders.put("A", "Value of A");
@@ -376,7 +376,7 @@ public class HeadersBindingTest extends UnitTestBase
     }
 
     @Test
-    public void testAny_5()
+    public void testAny_5() throws Exception
     {
         bindHeaders.put("X-match", "any");
         bindHeaders.put("A", "Value of A");
@@ -392,7 +392,7 @@ public class HeadersBindingTest extends UnitTestBase
     }
 
     @Test
-    public void testAny_6()
+    public void testAny_6() throws Exception
     {
         bindHeaders.put("X-match", "any");
         bindHeaders.put("A", "Value of A");

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 18678da..5d86fe9 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange;
 import static org.apache.qpid.server.filter.AMQPFilterTypes.JMS_SELECTOR;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -317,6 +318,69 @@ public class HeadersExchangeTest extends UnitTestBase
         assertFalse("Message unexpectedly routed to queue", result.hasRoutes());
     }
 
+    @Test
+    public void testBindWithInvalidSelector()
+    {
+        final String queueName = getTestName() + "_queue";
+        _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+        final Map<String, Object> bindArguments = new HashMap<>();
+        bindArguments.put(JMS_SELECTOR.toString(), "foo in (");
+        bindArguments.put("X-match", "any");
+        bindArguments.put("foo", null);
+        bindArguments.put("bar", null);
+
+        try
+        {
+            _exchange.bind(queueName, queueName, bindArguments, false);
+            fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+        }
+        catch (IllegalArgumentException e)
+        {
+            // pass
+        }
+
+        final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+        final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+
+        assertFalse("Message is unexpectedly routed to queue", result.hasRoutes());
+    }
+
+    @Test
+    public void testBindWithInvalidSelectorWhenBindingExists()
+    {
+        final String queueName = getTestName() + "_queue";
+        _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+        final Map<String, Object> bindArguments = new HashMap<>();
+        bindArguments.put(JMS_SELECTOR.toString(), "foo in ('bar')");
+        bindArguments.put("X-match", "any");
+        bindArguments.put("foo", null);
+        bindArguments.put("bar", null);
+
+        final boolean isBound = _exchange.bind(queueName, queueName, bindArguments, false);
+        assertTrue("Could not bind queue", isBound);
+
+        final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+        final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+        assertTrue("Message should be routed to queue", result.hasRoutes());
+
+        final Map<String, Object> bindArguments2 = new HashMap<>(bindArguments);
+        bindArguments2.put(JMS_SELECTOR.toString(), "foo in (");
+        try
+        {
+            _exchange.bind(queueName, queueName, bindArguments2, true);
+            fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+        }
+        catch (IllegalArgumentException e)
+        {
+            // pass
+        }
+
+        final RoutingResult<ServerMessage<?>> result2 = _exchange.route(testMessage, queueName, _instanceProperties);
+        assertTrue("Message should be possible to route using old binding", result2.hasRoutes());
+    }
+
     private ServerMessage<?> createTestMessage(Map<String, Object> headerValues)
     {
         AMQMessageHeader header = mock(AMQMessageHeader.class);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index d2b7f48..9bbb4da 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -24,6 +24,7 @@ import static org.apache.qpid.server.filter.AMQPFilterTypes.JMS_SELECTOR;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -602,6 +603,60 @@ public class TopicExchangeTest extends UnitTestBase
         assertTrue("Message was not routed to queue2", result.getRoutes().contains(queue2));
     }
 
+
+    @Test
+    public void testBindWithInvalidSelector()
+    {
+        final String queueName = getTestName() + "_queue";
+        _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+        final Map<String, Object> bindArguments = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in (");
+
+        try
+        {
+            _exchange.bind(queueName, "#", bindArguments, false);
+            fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+        }
+        catch (IllegalArgumentException e)
+        {
+            // pass
+        }
+
+        final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+        final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+
+        assertFalse("Message is unexpectedly routed to queue", result.hasRoutes());
+    }
+
+    @Test
+    public void testBindWithInvalidSelectorWhenBindingExists()
+    {
+        final String queueName = getTestName() + "_queue";
+        _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+        final Map<String, Object> bindArguments = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in ('bar')");
+        final boolean isBound = _exchange.bind(queueName, "#", bindArguments, false);
+        assertTrue("Could not bind queue", isBound);
+
+        final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+        final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+        assertTrue("Message should be routed to queue", result.hasRoutes());
+
+        final Map<String, Object> bindArguments2 = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in (");
+        try
+        {
+            _exchange.bind(queueName, "#", bindArguments2, true);
+            fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+        }
+        catch (IllegalArgumentException e)
+        {
+            // pass
+        }
+
+        final RoutingResult<ServerMessage<?>> result2 = _exchange.route(testMessage, queueName, _instanceProperties);
+        assertTrue("Message should be possible to route using old binding", result2.hasRoutes());
+    }
+
     private ServerMessage<?> createTestMessage(Map<String, Object> headerValues)
     {
         AMQMessageHeader header = mock(AMQMessageHeader.class);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index ac6e984..f811e90 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -161,7 +161,7 @@ abstract class AbstractQueueTestBase extends UnitTestBase
     }
 
     @Test
-    public void testBinding()
+    public void testBinding() throws Exception
     {
         _exchange.addBinding(_routingKey, _queue, Collections.EMPTY_MAP);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 4e6e97f..505ea89 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -1237,6 +1237,17 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                         {
                             exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
                         }
+                        catch (AMQInvalidArgumentException e)
+                        {
+                            exception(session,
+                                      method,
+                                      ExecutionErrorCode.INVALID_ARGUMENT,
+                                      String.format(
+                                              "Cannot bind queue '%s' to exchange '%s' due to invalid argument : %s",
+                                              queue.getName(),
+                                              exchangeName,
+                                              e.getMessage()));
+                        }
                     }
                     else
                     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 41b10ca..4b4f9c7 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -2846,11 +2846,23 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                     if (!exch.isBound(bindingKeyStr, arguments, queue))
                     {
 
-                        if (!exch.addBinding(bindingKeyStr, queue, arguments)
-                            && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(
-                                exch.getType()))
+                        try
                         {
-                            exch.replaceBinding(bindingKeyStr, queue, arguments);
+                            if (!exch.addBinding(bindingKeyStr, queue, arguments)
+                                && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exch.getType()))
+                            {
+                                exch.replaceBinding(bindingKeyStr, queue, arguments);
+                            }
+                        }
+                        catch (AMQInvalidArgumentException e)
+                        {
+                            _connection.sendConnectionClose(ErrorCodes.ARGUMENT_INVALID,
+                                                            String.format(
+                                                                    "Cannot bind queue '%s' to exchange '%s' due to invalid argument : %s",
+                                                                    queueName,
+                                                                    exch.getName(),
+                                                                    e.getMessage()),
+                                                            getChannelId());
                         }
                     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org