You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/11/28 12:42:16 UTC
qpid-broker-j git commit: QPID-7642: [Broker-J] Let caller to handle
bind operation exception for invalid selector
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 082f9bf0d -> e1d8e2fd8
QPID-7642: [Broker-J] Let caller to handle bind operation exception for invalid selector
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e1d8e2fd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e1d8e2fd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e1d8e2fd
Branch: refs/heads/master
Commit: e1d8e2fd84123cc518b82a4f7c4cd5fd46244d96
Parents: 082f9bf
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Nov 28 11:40:10 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Nov 28 12:42:06 2018 +0000
----------------------------------------------------------------------
.../model/ConfiguredObjectFactoryGenerator.java | 25 ++++++-
.../qpid/server/exchange/AbstractExchange.java | 74 ++++++++++++++------
.../server/exchange/DirectExchangeImpl.java | 55 ++++++---------
.../server/exchange/FanoutExchangeImpl.java | 46 +++++-------
.../qpid/server/exchange/HeadersBinding.java | 16 +----
.../server/exchange/HeadersExchangeImpl.java | 14 +++-
.../qpid/server/exchange/TopicExchangeImpl.java | 25 ++-----
.../org/apache/qpid/server/model/Exchange.java | 5 +-
.../server/virtualhost/AbstractVirtualHost.java | 14 +++-
.../server/exchange/DirectExchangeTest.java | 52 ++++++++++++++
.../server/exchange/FanoutExchangeTest.java | 54 ++++++++++++++
.../server/exchange/HeadersBindingTest.java | 28 ++++----
.../server/exchange/HeadersExchangeTest.java | 64 +++++++++++++++++
.../qpid/server/exchange/TopicExchangeTest.java | 55 +++++++++++++++
.../server/queue/AbstractQueueTestBase.java | 2 +-
.../protocol/v0_10/ServerSessionDelegate.java | 11 +++
.../qpid/server/protocol/v0_8/AMQChannel.java | 20 ++++--
17 files changed, 418 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-codegen/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryGenerator.java
----------------------------------------------------------------------
diff --git a/broker-codegen/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryGenerator.java b/broker-codegen/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryGenerator.java
index a63f12f..23cb371 100644
--- a/broker-codegen/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryGenerator.java
+++ b/broker-codegen/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryGenerator.java
@@ -26,8 +26,10 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.annotation.processing.AbstractProcessor;
import javax.annotation.processing.Filer;
@@ -242,7 +244,7 @@ public class ConfiguredObjectFactoryGenerator extends AbstractProcessor
}
private void processDoOnConfigMethod(final PrintWriter pw, final String className, final ExecutableElement methodElement, final AnnotationMirror annotationMirror)
{
-
+ pw.println(" @Override");
pw.print(" public " + methodElement.getReturnType() + " " + methodElement.getSimpleName().toString() + "(");
boolean first = true;
for(VariableElement param : methodElement.getParameters())
@@ -262,6 +264,11 @@ public class ConfiguredObjectFactoryGenerator extends AbstractProcessor
pw.print(getParamName(param));
}
pw.println(")");
+ final List<? extends TypeMirror> thrownTypes = methodElement.getThrownTypes();
+ if (!thrownTypes.isEmpty())
+ {
+ pw.println(thrownTypes.stream().map(TypeMirror::toString).collect(Collectors.joining(" , ", " throws ", "")));
+ }
pw.println(" {");
@@ -412,6 +419,12 @@ public class ConfiguredObjectFactoryGenerator extends AbstractProcessor
+ boxedReturnTypeName
+ "> execute()");
pw.println(" {");
+ final List<? extends TypeMirror> thrownTypes = methodElement.getThrownTypes();
+ if (!thrownTypes.isEmpty())
+ {
+ pw.println(" try");
+ pw.println(" {");
+ }
if (methodElement.getReturnType().getKind() != TypeKind.VOID)
{
pw.println(" return Futures.<"
@@ -429,6 +442,16 @@ public class ConfiguredObjectFactoryGenerator extends AbstractProcessor
+ boxedReturnTypeName
+ ">immediateFuture(null);");
}
+ if (!thrownTypes.isEmpty())
+ {
+ pw.println(" }");
+ pw.println(thrownTypes.stream()
+ .map(TypeMirror::toString)
+ .collect(Collectors.joining(" | ", " catch (", " e)")));
+ pw.println(" {");
+ pw.println(" return Futures.immediateFailedFuture(e);");
+ pw.println(" }");
+ }
pw.println(" }");
pw.println(" @Override");
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 0351512..c6a7a95 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.BindingMessages;
@@ -216,7 +217,14 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
if (messageDestination != null)
{
Map<String, Object> arguments = b.getArguments() == null ? Collections.emptyMap() : b.getArguments();
- onBind(new BindingIdentifier(b.getBindingKey(), messageDestination), arguments);
+ try
+ {
+ onBind(new BindingIdentifier(b.getBindingKey(), messageDestination), arguments);
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ throw new IllegalConfigurationException("Unexpected bind argument : " + e.getMessage(), e);
+ }
messageDestination.linkAdded(this, b);
}
}
@@ -567,9 +575,10 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
protected abstract void onBindingUpdated(final BindingIdentifier binding,
- final Map<String, Object> newArguments);
+ final Map<String, Object> newArguments) throws AMQInvalidArgumentException;
- protected abstract void onBind(final BindingIdentifier binding, final Map<String, Object> arguments);
+ protected abstract void onBind(final BindingIdentifier binding, final Map<String, Object> arguments)
+ throws AMQInvalidArgumentException;
protected abstract void onUnbind(final BindingIdentifier binding);
@@ -666,6 +675,21 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
Map<String, Object> arguments,
boolean replaceExistingArguments)
{
+ try
+ {
+ return bindInternal(destination, bindingKey, arguments, replaceExistingArguments);
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ throw new IllegalArgumentException("Unexpected bind argument : " + e.getMessage(), e);
+ }
+ }
+
+ private boolean bindInternal(final String destination,
+ final String bindingKey,
+ Map<String, Object> arguments,
+ final boolean replaceExistingArguments) throws AMQInvalidArgumentException
+ {
MessageDestination messageDestination = getAttainedMessageDestination(destination);
if (messageDestination == null)
{
@@ -679,32 +703,24 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
Binding newBinding = new BindingImpl(bindingKey, destination, arguments);
- boolean modified = false;
+ Binding previousBinding = null;
for(Binding b : _bindings)
{
-
if (b.getBindingKey().equals(bindingKey) && b.getDestination().equals(messageDestination.getName()))
{
- if (replaceExistingArguments)
- {
- _bindings.remove(b);
- modified = true;
- break;
- }
- else
- {
- return false;
- }
+ previousBinding = b;
+ break;
}
}
- _bindings.add(newBinding);
- if(isDurable() && messageDestination.isDurable())
+
+ if (previousBinding != null && !replaceExistingArguments)
{
- final Collection<Binding> durableBindings = getDurableBindings();
- attributeSet(DURABLE_BINDINGS, durableBindings, durableBindings);
+ return false;
}
+
+
final BindingIdentifier bindingIdentifier = new BindingIdentifier(bindingKey, messageDestination);
- if(modified)
+ if(previousBinding != null)
{
onBindingUpdated(bindingIdentifier, arguments);
}
@@ -717,6 +733,17 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
onBind(bindingIdentifier, arguments);
messageDestination.linkAdded(this, newBinding);
}
+
+ if (previousBinding != null)
+ {
+ _bindings.remove(previousBinding);
+ }
+ _bindings.add(newBinding);
+ if(isDurable() && messageDestination.isDurable())
+ {
+ final Collection<Binding> durableBindings = getDurableBindings();
+ attributeSet(DURABLE_BINDINGS, durableBindings, durableBindings);
+ }
return true;
}
@@ -857,16 +884,17 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
@Override
public boolean addBinding(String bindingKey, final Queue<?> queue, Map<String, Object> arguments)
+ throws AMQInvalidArgumentException
{
- return bind(queue.getName(), bindingKey, arguments, false);
+ return bindInternal(queue.getName(), bindingKey, arguments, false);
}
@Override
public void replaceBinding(String bindingKey,
final Queue<?> queue,
- Map<String, Object> arguments)
+ Map<String, Object> arguments) throws AMQInvalidArgumentException
{
- bind(queue.getName(), bindingKey, arguments, true);
+ bindInternal(queue.getName(), bindingKey, arguments, true);
}
private boolean autoDeleteIfNecessary()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
index 4683042..d0171b1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
@@ -87,6 +87,7 @@ public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl> imp
}
BindingSet putBinding(MessageDestination destination, Map<String, Object> arguments, boolean force)
+ throws AMQInvalidArgumentException
{
if (!force && (_unfilteredDestinations.containsKey(destination) || _filteredDestinations.containsKey(
destination)))
@@ -95,44 +96,30 @@ public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl> imp
}
else if(FilterSupport.argumentsContainFilter(arguments))
{
- try
+ FilterManager messageFilter = FilterSupport.createMessageFilter(arguments, destination);
+ Map<MessageDestination, String> unfilteredDestinations;
+ Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> filteredDestinations;
+ if (_unfilteredDestinations.containsKey(destination))
{
- Map<MessageDestination, String> unfilteredDestinations;
- Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> filteredDestinations;
- if (_unfilteredDestinations.containsKey(destination))
- {
- unfilteredDestinations = new HashMap<>(_unfilteredDestinations);
- unfilteredDestinations.remove(destination);
- }
- else
- {
- unfilteredDestinations = _unfilteredDestinations;
- }
-
- filteredDestinations = new HashMap<>(_filteredDestinations);
- FilterManager messageFilter = FilterSupport.createMessageFilter(arguments, destination);
- String replacementRoutingKey = arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY) != null
- ? String.valueOf(arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY))
- : null;
- filteredDestinations.put(destination,
- new FilterManagerReplacementRoutingKeyTuple(messageFilter,
- replacementRoutingKey));
-
- return new BindingSet(Collections.unmodifiableMap(unfilteredDestinations),
- Collections.unmodifiableMap(filteredDestinations));
-
+ unfilteredDestinations = new HashMap<>(_unfilteredDestinations);
+ unfilteredDestinations.remove(destination);
}
- catch (AMQInvalidArgumentException e)
+ else
{
- LOGGER.warn(
- "Binding ignored: cannot parse filter on binding of destination '{}' to exchange '{}' with arguments: {}",
- destination.getName(),
- DirectExchangeImpl.this.getName(),
- arguments,
- e);
- return this;
+ unfilteredDestinations = _unfilteredDestinations;
}
+ filteredDestinations = new HashMap<>(_filteredDestinations);
+
+ String replacementRoutingKey = arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY) != null
+ ? String.valueOf(arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY))
+ : null;
+ filteredDestinations.put(destination,
+ new FilterManagerReplacementRoutingKeyTuple(messageFilter,
+ replacementRoutingKey));
+
+ return new BindingSet(Collections.unmodifiableMap(unfilteredDestinations),
+ Collections.unmodifiableMap(filteredDestinations));
}
else
{
@@ -232,6 +219,7 @@ public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl> imp
@Override
protected void onBindingUpdated(final BindingIdentifier binding, final Map<String, Object> newArguments)
+ throws AMQInvalidArgumentException
{
String bindingKey = binding.getBindingKey();
@@ -241,6 +229,7 @@ public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl> imp
@Override
protected void onBind(final BindingIdentifier binding, final Map<String, Object> arguments)
+ throws AMQInvalidArgumentException
{
String bindingKey = binding.getBindingKey();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
index 4c64b53..afcbb8c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
@@ -67,41 +67,29 @@ class FanoutExchangeImpl extends AbstractExchange<FanoutExchangeImpl> implements
}
BindingSet addBinding(final BindingIdentifier binding, final Map<String, Object> arguments)
+ throws AMQInvalidArgumentException
{
MessageDestination destination = binding.getDestination();
if (FilterSupport.argumentsContainFilter(arguments))
{
- try
- {
- Map<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>>
- filteredDestinations = new HashMap<>(_filteredDestinations);
+ Map<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>>
+ filteredDestinations = new HashMap<>(_filteredDestinations);
- filteredDestinations.computeIfAbsent(destination, messageDestination -> new HashMap<>());
+ filteredDestinations.computeIfAbsent(destination, messageDestination -> new HashMap<>());
- Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple> bindingsForDestination =
- new HashMap<>(filteredDestinations.get(destination));
+ Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple> bindingsForDestination =
+ new HashMap<>(filteredDestinations.get(destination));
- FilterManager filterManager = FilterSupport.createMessageFilter(arguments, destination);
- String replacementRoutingKey = arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY) != null
- ? String.valueOf(arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY))
- : null;
+ FilterManager filterManager = FilterSupport.createMessageFilter(arguments, destination);
+ String replacementRoutingKey = arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY) != null
+ ? String.valueOf(arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY))
+ : null;
- bindingsForDestination.put(binding,
- new FilterManagerReplacementRoutingKeyTuple(filterManager,
- replacementRoutingKey));
- filteredDestinations.put(destination, Collections.unmodifiableMap(bindingsForDestination));
- return new BindingSet(_unfilteredDestinations, Collections.unmodifiableMap(filteredDestinations));
- }
- catch (AMQInvalidArgumentException e)
- {
- LOGGER.warn(
- "Binding ignored: cannot parse filter on binding of destination '{}' to exchange '{}' with arguments: {}",
- destination.getName(),
- FanoutExchangeImpl.this.getName(),
- arguments,
- e);
- return this;
- }
+ bindingsForDestination.put(binding,
+ new FilterManagerReplacementRoutingKeyTuple(filterManager,
+ replacementRoutingKey));
+ filteredDestinations.put(destination, Collections.unmodifiableMap(bindingsForDestination));
+ return new BindingSet(_unfilteredDestinations, Collections.unmodifiableMap(filteredDestinations));
}
else
{
@@ -126,6 +114,7 @@ class FanoutExchangeImpl extends AbstractExchange<FanoutExchangeImpl> implements
}
BindingSet updateBinding(final BindingIdentifier binding, final Map<String, Object> newArguments)
+ throws AMQInvalidArgumentException
{
return removeBinding(binding).addBinding(binding, newArguments);
}
@@ -232,13 +221,14 @@ class FanoutExchangeImpl extends AbstractExchange<FanoutExchangeImpl> implements
@Override
protected void onBindingUpdated(final BindingIdentifier binding,
- final Map<String, Object> newArguments)
+ final Map<String, Object> newArguments) throws AMQInvalidArgumentException
{
_bindingSet = _bindingSet.updateBinding(binding, newArguments);
}
@Override
protected void onBind(final BindingIdentifier binding, final Map<String, Object> arguments)
+ throws AMQInvalidArgumentException
{
_bindingSet = _bindingSet.addBinding(binding, arguments);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index 5fb5329..b18d91a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -61,6 +61,7 @@ class HeadersBinding
* @param binding the binding to create a header binding using
*/
public HeadersBinding(AbstractExchange.BindingIdentifier binding, Map<String,Object> arguments)
+ throws AMQInvalidArgumentException
{
_binding = binding;
arguments = arguments == null ? Collections.emptyMap() : arguments;
@@ -77,22 +78,11 @@ class HeadersBinding
_replacementRoutingKey = key == null ? null : String.valueOf(key);
}
- private void initMappings()
+ private void initMappings() throws AMQInvalidArgumentException
{
if(FilterSupport.argumentsContainFilter(_mappings))
{
- try
- {
- _filter = FilterSupport.createMessageFilter(_mappings, _binding.getDestination());
- }
- catch (AMQInvalidArgumentException e)
- {
- LOGGER.warn("Invalid filter in binding queue '"+_binding.getDestination().getName()
- +"' with arguments: " + _mappings);
- _filter = new FilterManager();
-
- _filter.add("x-exclude-all", new ExcludeAllFilter());
- }
+ _filter = FilterSupport.createMessageFilter(_mappings, _binding.getDestination());
}
for(Map.Entry<String, Object> entry : _mappings.entrySet())
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
index 4bf7a75..a454cfb 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
@@ -106,13 +107,13 @@ public class HeadersExchangeImpl extends AbstractExchange<HeadersExchangeImpl> i
@Override
- protected void onBind(final BindingIdentifier binding, Map<String,Object> arguments)
+ protected void onBind(final BindingIdentifier binding, Map<String,Object> arguments) throws AMQInvalidArgumentException
{
_bindingHeaderMatchers.add(new HeadersBinding(binding, arguments));
}
@Override
- protected void onBindingUpdated(final BindingIdentifier binding, final Map<String, Object> arguments)
+ protected void onBindingUpdated(final BindingIdentifier binding, final Map<String, Object> arguments) throws AMQInvalidArgumentException
{
_bindingHeaderMatchers.add(new HeadersBinding(binding, arguments));
}
@@ -120,7 +121,14 @@ public class HeadersExchangeImpl extends AbstractExchange<HeadersExchangeImpl> i
@Override
protected void onUnbind(final BindingIdentifier binding)
{
- _bindingHeaderMatchers.remove(new HeadersBinding(binding, Collections.emptyMap()));
+ try
+ {
+ _bindingHeaderMatchers.remove(new HeadersBinding(binding, Collections.emptyMap()));
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ // ignore
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
index c7f6bc7..f0fc3a8 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
@@ -65,6 +65,7 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
@Override
protected synchronized void onBindingUpdated(final BindingIdentifier binding, final Map<String, Object> newArguments)
+ throws AMQInvalidArgumentException
{
final String bindingKey = binding.getBindingKey();
final MessageDestination destination = binding.getDestination();
@@ -73,17 +74,10 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
String routingKey = TopicNormalizer.normalize(bindingKey);
- try
- {
- if (_bindings.containsKey(binding))
- {
- TopicExchangeResult result = _topicExchangeResults.get(routingKey);
- updateTopicExchangeResult(result, binding, newArguments);
- }
- }
- catch (AMQInvalidArgumentException e)
+ if (_bindings.containsKey(binding))
{
- throw new ConnectionScopedRuntimeException(e);
+ TopicExchangeResult result = _topicExchangeResults.get(routingKey);
+ updateTopicExchangeResult(result, binding, newArguments);
}
}
@@ -217,16 +211,9 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
@Override
protected void onBind(final BindingIdentifier binding, Map<String, Object> arguments)
+ throws AMQInvalidArgumentException
{
- try
- {
- bind(binding, arguments);
- }
- catch (AMQInvalidArgumentException e)
- {
- // TODO - this seems incorrect, handling of invalid bindings should be propagated more cleanly
- throw new ConnectionScopedRuntimeException(e);
- }
+ bind(binding, arguments);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
index c2c044e..eb54c75 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.Map;
import org.apache.qpid.server.exchange.DestinationReferrer;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSender;
@@ -122,7 +123,7 @@ public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, Me
@DoOnConfigThread
boolean addBinding(@Param(name = "bindingKey") String bindingKey,
@Param(name = "queue") Queue<?> queue,
- @Param(name = "arguments") Map<String, Object> arguments);
+ @Param(name = "arguments") Map<String, Object> arguments) throws AMQInvalidArgumentException;
@DoOnConfigThread
boolean deleteBinding(@Param(name = "bindingKey") String bindingKey,
@@ -135,7 +136,7 @@ public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, Me
@DoOnConfigThread
void replaceBinding(@Param(name = "bindingKey") String bindingKey,
@Param(name = "queue") Queue<?> queue,
- @Param(name = "arguments") Map<String, Object> arguments);
+ @Param(name = "arguments") Map<String, Object> arguments) throws AMQInvalidArgumentException;
QueueManagingVirtualHost<?> getVirtualHost();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index f038ecf..e54ca5b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -83,6 +83,7 @@ import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
import org.apache.qpid.server.exchange.DefaultDestination;
import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
@@ -2876,7 +2877,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
for (String binding : bindings.keySet())
{
- exchange.addBinding(binding, queue, bindings.get(binding));
+ try
+ {
+ exchange.addBinding(binding, queue, bindings.get(binding));
+ }
+ catch (AMQInvalidArgumentException ia)
+ {
+ throw new IllegalArgumentException("Unexpected bind argument : " + ia.getMessage(), ia);
+ }
}
}
else
@@ -2894,6 +2902,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
throw new IllegalStateException("subscription already in use");
}
}
+ catch (AMQInvalidArgumentException e)
+ {
+ throw new IllegalArgumentException("Unexpected bind argument : " + e.getMessage(), e);
+ }
return queue;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
index 153d85b..02cdd47 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
@@ -466,4 +466,56 @@ public class DirectExchangeTest extends UnitTestBase
assertFalse("Message unexpectedly routed to queue", result.hasRoutes());
}
+ @Test
+ public void testBindWithInvalidSelector()
+ {
+ final String queueName = getTestName() + "_queue";
+ _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+ final Map<String, Object> bindArguments = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in (");
+
+ try
+ {
+ _exchange.bind(queueName, queueName, bindArguments, false);
+ fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // pass
+ }
+
+ final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+ final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+
+ assertFalse("Message is unexpectedly routed to queue", result.hasRoutes());
+ }
+
+ @Test
+ public void testBindWithInvalidSelectorWhenBindingExists()
+ {
+ final String queueName = getTestName() + "_queue";
+ _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+ final Map<String, Object> bindArguments = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in ('bar')");
+ final boolean isBound = _exchange.bind(queueName, queueName, bindArguments, false);
+ assertTrue("Could not bind queue", isBound);
+
+ final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+ final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+ assertTrue("Message should be routed to queue", result.hasRoutes());
+
+ final Map<String, Object> bindArguments2 = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in (");
+ try
+ {
+ _exchange.bind(queueName, queueName, bindArguments2, true);
+ fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // pass
+ }
+
+ final RoutingResult<ServerMessage<?>> result2 = _exchange.route(testMessage, queueName, _instanceProperties);
+ assertTrue("Message should be be possible to route using old binding", result2.hasRoutes());
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index 26d94f8..ae4067a 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -24,6 +24,7 @@ import static org.apache.qpid.server.filter.AMQPFilterTypes.JMS_SELECTOR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -398,6 +399,59 @@ public class FanoutExchangeTest extends UnitTestBase
assertFalse(_exchange.isBound(queue));
}
+ @Test
+ public void testBindWithInvalidSelector()
+ {
+ final String queueName = getTestName() + "_queue";
+ _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+ final Map<String, Object> bindArguments = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in (");
+
+ try
+ {
+ _exchange.bind(queueName, "", bindArguments, false);
+ fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // pass
+ }
+
+ final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+ final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+
+ assertFalse("Message is unexpectedly routed to queue", result.hasRoutes());
+ }
+
+ @Test
+ public void testBindWithInvalidSelectorWhenBindingExists()
+ {
+ final String queueName = getTestName() + "_queue";
+ _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+ final Map<String, Object> bindArguments = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in ('bar')");
+ final boolean isBound = _exchange.bind(queueName, "", bindArguments, false);
+ assertTrue("Could not bind queue", isBound);
+
+ final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+ final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+ assertTrue("Message should be routed to queue", result.hasRoutes());
+
+ final Map<String, Object> bindArguments2 = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in (");
+ try
+ {
+ _exchange.bind(queueName, "", bindArguments2, true);
+ fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // pass
+ }
+
+ final RoutingResult<ServerMessage<?>> result2 = _exchange.route(testMessage, "", _instanceProperties);
+ assertTrue("Message should be be possible to route using old binding", result2.hasRoutes());
+ }
+
private ServerMessage<?> createTestMessage(Map<String, Object> headerValues)
{
AMQMessageHeader header = mock(AMQMessageHeader.class);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 126c07d..7947b09 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -207,7 +207,7 @@ public class HeadersBindingTest extends UnitTestBase
}
@Test
- public void testDefault_1()
+ public void testDefault_1() throws Exception
{
bindHeaders.put("A", "Value of A");
@@ -219,7 +219,7 @@ public class HeadersBindingTest extends UnitTestBase
}
@Test
- public void testDefault_2()
+ public void testDefault_2() throws Exception
{
bindHeaders.put("A", "Value of A");
@@ -232,7 +232,7 @@ public class HeadersBindingTest extends UnitTestBase
}
@Test
- public void testDefault_3()
+ public void testDefault_3() throws Exception
{
bindHeaders.put("A", "Value of A");
@@ -244,7 +244,7 @@ public class HeadersBindingTest extends UnitTestBase
}
@Test
- public void testAll_1()
+ public void testAll_1() throws Exception
{
bindHeaders.put("X-match", "all");
bindHeaders.put("A", "Value of A");
@@ -257,7 +257,7 @@ public class HeadersBindingTest extends UnitTestBase
}
@Test
- public void testAll_2()
+ public void testAll_2() throws Exception
{
bindHeaders.put("X-match", "all");
bindHeaders.put("A", "Value of A");
@@ -271,7 +271,7 @@ public class HeadersBindingTest extends UnitTestBase
}
@Test
- public void testAll_3()
+ public void testAll_3() throws Exception
{
bindHeaders.put("X-match", "all");
bindHeaders.put("A", "Value of A");
@@ -286,7 +286,7 @@ public class HeadersBindingTest extends UnitTestBase
}
@Test
- public void testAll_4()
+ public void testAll_4() throws Exception
{
bindHeaders.put("X-match", "all");
bindHeaders.put("A", "Value of A");
@@ -302,7 +302,7 @@ public class HeadersBindingTest extends UnitTestBase
}
@Test
- public void testAll_5()
+ public void testAll_5() throws Exception
{
bindHeaders.put("X-match", "all");
bindHeaders.put("A", "Value of A");
@@ -318,7 +318,7 @@ public class HeadersBindingTest extends UnitTestBase
}
@Test
- public void testAny_1()
+ public void testAny_1() throws Exception
{
bindHeaders.put("X-match", "any");
bindHeaders.put("A", "Value of A");
@@ -331,7 +331,7 @@ public class HeadersBindingTest extends UnitTestBase
}
@Test
- public void testAny_2()
+ public void testAny_2() throws Exception
{
bindHeaders.put("X-match", "any");
bindHeaders.put("A", "Value of A");
@@ -345,7 +345,7 @@ public class HeadersBindingTest extends UnitTestBase
}
@Test
- public void testAny_3()
+ public void testAny_3() throws Exception
{
bindHeaders.put("X-match", "any");
bindHeaders.put("A", "Value of A");
@@ -360,7 +360,7 @@ public class HeadersBindingTest extends UnitTestBase
}
@Test
- public void testAny_4()
+ public void testAny_4() throws Exception
{
bindHeaders.put("X-match", "any");
bindHeaders.put("A", "Value of A");
@@ -376,7 +376,7 @@ public class HeadersBindingTest extends UnitTestBase
}
@Test
- public void testAny_5()
+ public void testAny_5() throws Exception
{
bindHeaders.put("X-match", "any");
bindHeaders.put("A", "Value of A");
@@ -392,7 +392,7 @@ public class HeadersBindingTest extends UnitTestBase
}
@Test
- public void testAny_6()
+ public void testAny_6() throws Exception
{
bindHeaders.put("X-match", "any");
bindHeaders.put("A", "Value of A");
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 18678da..5d86fe9 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange;
import static org.apache.qpid.server.filter.AMQPFilterTypes.JMS_SELECTOR;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -317,6 +318,69 @@ public class HeadersExchangeTest extends UnitTestBase
assertFalse("Message unexpectedly routed to queue", result.hasRoutes());
}
+ @Test
+ public void testBindWithInvalidSelector()
+ {
+ final String queueName = getTestName() + "_queue";
+ _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+ final Map<String, Object> bindArguments = new HashMap<>();
+ bindArguments.put(JMS_SELECTOR.toString(), "foo in (");
+ bindArguments.put("X-match", "any");
+ bindArguments.put("foo", null);
+ bindArguments.put("bar", null);
+
+ try
+ {
+ _exchange.bind(queueName, queueName, bindArguments, false);
+ fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // pass
+ }
+
+ final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+ final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+
+ assertFalse("Message is unexpectedly routed to queue", result.hasRoutes());
+ }
+
+ @Test
+ public void testBindWithInvalidSelectorWhenBindingExists()
+ {
+ final String queueName = getTestName() + "_queue";
+ _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+ final Map<String, Object> bindArguments = new HashMap<>();
+ bindArguments.put(JMS_SELECTOR.toString(), "foo in ('bar')");
+ bindArguments.put("X-match", "any");
+ bindArguments.put("foo", null);
+ bindArguments.put("bar", null);
+
+ final boolean isBound = _exchange.bind(queueName, queueName, bindArguments, false);
+ assertTrue("Could not bind queue", isBound);
+
+ final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+ final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+ assertTrue("Message should be routed to queue", result.hasRoutes());
+
+ final Map<String, Object> bindArguments2 = new HashMap<>(bindArguments);
+ bindArguments2.put(JMS_SELECTOR.toString(), "foo in (");
+ try
+ {
+ _exchange.bind(queueName, queueName, bindArguments2, true);
+ fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // pass
+ }
+
+ final RoutingResult<ServerMessage<?>> result2 = _exchange.route(testMessage, queueName, _instanceProperties);
+ assertTrue("Message should be be possible to route using old binding", result2.hasRoutes());
+ }
+
private ServerMessage<?> createTestMessage(Map<String, Object> headerValues)
{
AMQMessageHeader header = mock(AMQMessageHeader.class);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index d2b7f48..9bbb4da 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -24,6 +24,7 @@ import static org.apache.qpid.server.filter.AMQPFilterTypes.JMS_SELECTOR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -602,6 +603,60 @@ public class TopicExchangeTest extends UnitTestBase
assertTrue("Message was not routed to queue2", result.getRoutes().contains(queue2));
}
+
+ @Test
+ public void testBindWithInvalidSelector()
+ {
+ final String queueName = getTestName() + "_queue";
+ _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+ final Map<String, Object> bindArguments = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in (");
+
+ try
+ {
+ _exchange.bind(queueName, "#", bindArguments, false);
+ fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // pass
+ }
+
+ final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+ final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+
+ assertFalse("Message is unexpectedly routed to queue", result.hasRoutes());
+ }
+
+ @Test
+ public void testBindWithInvalidSelectorWhenBindingExists()
+ {
+ final String queueName = getTestName() + "_queue";
+ _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
+
+ final Map<String, Object> bindArguments = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in ('bar')");
+ final boolean isBound = _exchange.bind(queueName, "#", bindArguments, false);
+ assertTrue("Could not bind queue", isBound);
+
+ final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
+ final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
+ assertTrue("Message should be routed to queue", result.hasRoutes());
+
+ final Map<String, Object> bindArguments2 = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in (");
+ try
+ {
+ _exchange.bind(queueName, "#", bindArguments2, true);
+ fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // pass
+ }
+
+ final RoutingResult<ServerMessage<?>> result2 = _exchange.route(testMessage, queueName, _instanceProperties);
+ assertTrue("Message should be be possible to route using old binding", result2.hasRoutes());
+ }
+
private ServerMessage<?> createTestMessage(Map<String, Object> headerValues)
{
AMQMessageHeader header = mock(AMQMessageHeader.class);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index ac6e984..f811e90 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -161,7 +161,7 @@ abstract class AbstractQueueTestBase extends UnitTestBase
}
@Test
- public void testBinding()
+ public void testBinding() throws Exception
{
_exchange.addBinding(_routingKey, _queue, Collections.EMPTY_MAP);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 4e6e97f..505ea89 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -1237,6 +1237,17 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
}
+ catch (AMQInvalidArgumentException e)
+ {
+ exception(session,
+ method,
+ ExecutionErrorCode.INVALID_ARGUMENT,
+ String.format(
+ "Cannot bind queue '%s' to exchange '%s' due to invalid argument : %s",
+ queue.getName(),
+ exchangeName,
+ e.getMessage()));
+ }
}
else
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1d8e2fd/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 41b10ca..4b4f9c7 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -2846,11 +2846,23 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
if (!exch.isBound(bindingKeyStr, arguments, queue))
{
- if (!exch.addBinding(bindingKeyStr, queue, arguments)
- && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(
- exch.getType()))
+ try
{
- exch.replaceBinding(bindingKeyStr, queue, arguments);
+ if (!exch.addBinding(bindingKeyStr, queue, arguments)
+ && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exch.getType()))
+ {
+ exch.replaceBinding(bindingKeyStr, queue, arguments);
+ }
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ _connection.sendConnectionClose(ErrorCodes.ARGUMENT_INVALID,
+ String.format(
+ "Cannot bind queue '%s' to exchange '%s' due to invalid argument : %s",
+ queueName,
+ exch.getName(),
+ e.getMessage()),
+ getChannelId());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org