You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/24 02:04:26 UTC
svn commit: r1571124 - in /qpid/trunk/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/binding/
broker-core/src/main/java/org/apache/qpid/server/exchange/
broker-core/src/main/java/org/apache/qpid/server/model/adapter/
broker-core/src/main/j...
Author: rgodfrey
Date: Mon Feb 24 01:04:25 2014
New Revision: 1571124
URL: http://svn.apache.org/r1571124
Log:
QPID-5582 : [Java Broker] only allow one binding per binding-key and queue at an exchange
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java Mon Feb 24 01:04:25 2014
@@ -21,11 +21,18 @@
package org.apache.qpid.server.binding;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.BindingMessages;
+import org.apache.qpid.server.logging.subjects.BindingLogSubject;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.util.StateChangeListener;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
public class Binding
@@ -36,6 +43,13 @@ public class Binding
private final Map<String, Object> _arguments;
private final UUID _id;
private final AtomicLong _matches = new AtomicLong();
+ private final BindingLogSubject _logSubject;
+ //TODO : persist creation time
+ private long _createTime = System.currentTimeMillis();
+ final AtomicBoolean _deleted = new AtomicBoolean();
+ final CopyOnWriteArrayList<StateChangeListener<Binding,State>> _stateChangeListeners =
+ new CopyOnWriteArrayList<StateChangeListener<Binding, State>>();
+
public Binding(UUID id,
final String bindingKey,
@@ -51,6 +65,11 @@ public class Binding
//Perform ACLs
queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this);
+ _logSubject = new BindingLogSubject(bindingKey,exchange,queue);
+ CurrentActor.get().message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()),
+ getArguments() != null
+ && !getArguments().isEmpty()));
+
}
@@ -89,11 +108,16 @@ public class Binding
return _matches.get();
}
- boolean isDurable()
+ public boolean isDurable()
{
return _queue.isDurable() && _exchange.isDurable();
}
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
@Override
public boolean equals(final Object o)
{
@@ -128,4 +152,30 @@ public class Binding
return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+", id= " + _id + " }";
}
+ public void delete()
+ {
+ if(_deleted.compareAndSet(false,true))
+ {
+ for(StateChangeListener<Binding,State> listener : _stateChangeListeners)
+ {
+ listener.stateChanged(this, State.ACTIVE, State.DELETED);
+ }
+ CurrentActor.get().message(_logSubject, BindingMessages.DELETED());
+ }
+ }
+
+ public State getState()
+ {
+ return _deleted.get() ? State.DELETED : State.ACTIVE;
+ }
+
+ public void addStateChangeListener(StateChangeListener<Binding,State> listener)
+ {
+ _stateChangeListeners.add(listener);
+ }
+
+ public void removeStateChangeListener(StateChangeListener<Binding,State> listener)
+ {
+ _stateChangeListeners.remove(listener);
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Mon Feb 24 01:04:25 2014
@@ -26,15 +26,15 @@ import org.apache.qpid.server.binding.Bi
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.BindingMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
-import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
@@ -44,6 +44,7 @@ import org.apache.qpid.server.store.Stor
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -82,7 +83,7 @@ public abstract class AbstractExchange<T
private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>();
private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
- private UUID _id;
+ private final UUID _id;
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
private final AtomicLong _receivedMessageCount = new AtomicLong();
private final AtomicLong _receivedMessageSize = new AtomicLong();
@@ -93,8 +94,12 @@ public abstract class AbstractExchange<T
private final CopyOnWriteArrayList<Exchange.BindingListener> _listeners = new CopyOnWriteArrayList<Exchange.BindingListener>();
+ private final ConcurrentHashMap<BindingIdentifier, Binding> _bindingsMap = new ConcurrentHashMap<BindingIdentifier, Binding>();
+
+
//TODO : persist creation time
private long _createTime = System.currentTimeMillis();
+ private StateChangeListener<Binding, State> _bindingListener;
public AbstractExchange(VirtualHost vhost, Map<String, Object> attributes) throws UnknownExchangeException
{
@@ -142,7 +147,17 @@ public abstract class AbstractExchange<T
}
}
-
+ _bindingListener = new StateChangeListener<Binding, State>()
+ {
+ @Override
+ public void stateChanged(final Binding binding, final State oldState, final State newState)
+ {
+ if(newState == State.DELETED)
+ {
+ removeBinding(binding);
+ }
+ }
+ };
// Log Exchange creation
CurrentActor.get().message(ExchangeMessages.CREATED(getExchangeType().getType(), _name, _durable));
}
@@ -173,7 +188,8 @@ public abstract class AbstractExchange<T
List<Binding> bindings = new ArrayList<Binding>(_bindings);
for(Binding binding : bindings)
{
- removeBinding(binding);
+ binding.removeStateChangeListener(_bindingListener);
+ binding.delete();
}
if(_alternateExchange != null)
@@ -571,30 +587,22 @@ public abstract class AbstractExchange<T
makeBinding(id, bindingKey,queue, argumentMap,true, false);
}
- @Override
- public void removeBinding(final Binding b)
+ private void removeBinding(final Binding binding)
{
- removeBinding(b.getBindingKey(), b.getQueue(), b.getArguments());
- }
+ String bindingKey = binding.getBindingKey();
+ AMQQueue queue = binding.getQueue();
- @Override
- public Binding removeBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
- {
assert queue != null;
if (bindingKey == null)
{
bindingKey = "";
}
- if (arguments == null)
- {
- arguments = Collections.emptyMap();
- }
// Check access
_virtualHost.getSecurityManager().authoriseUnbind(this, bindingKey, queue);
- BindingImpl b = _bindingsMap.remove(new BindingImpl(null, bindingKey,queue,arguments));
+ Binding b = _bindingsMap.remove(new BindingIdentifier(bindingKey,queue));
if (b != null)
{
@@ -605,15 +613,14 @@ public abstract class AbstractExchange<T
{
DurableConfigurationStoreHelper.removeBinding(_virtualHost.getDurableConfigurationStore(), b);
}
- b.logDestruction();
+ b.delete();
}
- return b;
}
@Override
- public Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+ public Binding getBinding(String bindingKey, AMQQueue queue)
{
assert queue != null;
@@ -622,17 +629,9 @@ public abstract class AbstractExchange<T
bindingKey = "";
}
- if(arguments == null)
- {
- arguments = Collections.emptyMap();
- }
-
- BindingImpl b = new BindingImpl(null, bindingKey,queue,arguments);
- return _bindingsMap.get(b);
+ return _bindingsMap.get(new BindingIdentifier(bindingKey,queue));
}
- private final ConcurrentHashMap<BindingImpl, BindingImpl> _bindingsMap = new ConcurrentHashMap<BindingImpl, BindingImpl>();
-
private boolean makeBinding(UUID id,
String bindingKey,
AMQQueue queue,
@@ -658,13 +657,14 @@ public abstract class AbstractExchange<T
bindingKey,
_virtualHost.getName());
}
- BindingImpl b = new BindingImpl(id, bindingKey, queue, arguments);
- BindingImpl existingMapping = _bindingsMap.putIfAbsent(b, b);
+ Binding b = new Binding(id, bindingKey, queue, this, arguments);
+ Binding existingMapping = _bindingsMap.putIfAbsent(new BindingIdentifier(bindingKey,queue), b);
if (existingMapping == null || force)
{
+ b.addStateChangeListener(_bindingListener);
if (existingMapping != null)
{
- removeBinding(existingMapping);
+ existingMapping.delete();
}
if (b.isDurable() && !restore)
@@ -674,7 +674,6 @@ public abstract class AbstractExchange<T
queue.addBinding(b);
doAddBinding(b);
- b.logCreation();
return true;
}
@@ -684,56 +683,61 @@ public abstract class AbstractExchange<T
}
}
- private final class BindingImpl extends Binding
+
+ private static final class BindingIdentifier
{
- private final BindingLogSubject _logSubject;
- //TODO : persist creation time
- private long _createTime = System.currentTimeMillis();
+ private final String _bindingKey;
+ private final AMQQueue _destination;
- private BindingImpl(UUID id,
- String bindingKey,
- final AMQQueue queue,
- final Map<String, Object> arguments)
+ private BindingIdentifier(final String bindingKey, final AMQQueue destination)
{
- super(id, bindingKey, queue, AbstractExchange.this, arguments);
- _logSubject = new BindingLogSubject(bindingKey,AbstractExchange.this,queue);
-
+ _bindingKey = bindingKey;
+ _destination = destination;
}
- public void onClose(final Exchange exchange)
+ public String getBindingKey()
{
- removeBinding(this);
+ return _bindingKey;
}
- void logCreation()
+ public AMQQueue getDestination()
{
- CurrentActor.get().message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()),
- getArguments() != null
- && !getArguments().isEmpty()));
+ return _destination;
}
- void logDestruction()
+ @Override
+ public boolean equals(final Object o)
{
- CurrentActor.get().message(_logSubject, BindingMessages.DELETED());
- }
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
- public String getOrigin()
- {
- return (String) getArguments().get("qpid.fed.origin");
- }
+ final BindingIdentifier that = (BindingIdentifier) o;
- public long getCreateTime()
- {
- return _createTime;
+ if (!_bindingKey.equals(that._bindingKey))
+ {
+ return false;
+ }
+ if (!_destination.equals(that._destination))
+ {
+ return false;
+ }
+
+ return true;
}
- public boolean isDurable()
+ @Override
+ public int hashCode()
{
- return getQueue().isDurable() && getExchange().isDurable();
+ int result = _bindingKey.hashCode();
+ result = 31 * result + _destination.hashCode();
+ return result;
}
-
}
-
-
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Mon Feb 24 01:04:25 2014
@@ -35,6 +35,7 @@ import org.apache.qpid.server.message.In
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
@@ -42,6 +43,7 @@ import org.apache.qpid.server.queue.Queu
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class DefaultExchange implements Exchange<DirectExchange>
@@ -123,21 +125,9 @@ public class DefaultExchange implements
}
@Override
- public void removeBinding(Binding b)
+ public Binding getBinding(String bindingKey, AMQQueue queue)
{
- throw new AccessControlException("Cannot remove bindings to the default exchange");
- }
-
- @Override
- public Binding removeBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
- {
- throw new AccessControlException("Cannot remove bindings to the default exchange");
- }
-
- @Override
- public Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
- {
- if(_virtualHost.getQueue(bindingKey) == queue && (arguments == null || arguments.isEmpty()))
+ if(_virtualHost.getQueue(bindingKey) == queue)
{
return convertToBinding(queue);
}
@@ -157,7 +147,9 @@ public class DefaultExchange implements
queueName,
_virtualHost.getName());
- return new Binding(exchangeId, queueName, queue, this, Collections.EMPTY_MAP);
+ final Binding binding = new Binding(exchangeId, queueName, queue, this, Collections.EMPTY_MAP);
+ binding.addStateChangeListener(STATE_CHANGE_LISTENER);
+ return binding;
}
@Override
@@ -346,4 +338,16 @@ public class DefaultExchange implements
}
}
+ private static final StateChangeListener<Binding, State> STATE_CHANGE_LISTENER =
+ new StateChangeListener<Binding, State>()
+ {
+ @Override
+ public void stateChanged(final Binding object, final State oldState, final State newState)
+ {
+ if(newState == State.DELETED)
+ {
+ throw new AccessControlException("Cannot remove bindings to the default exchange");
+ }
+ }
+ };
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java Mon Feb 24 01:04:25 2014
@@ -72,11 +72,7 @@ public interface Exchange<T extends Exch
void restoreBinding(UUID id, String bindingKey, AMQQueue queue,
Map<String, Object> argumentMap);
- void removeBinding(Binding b);
-
- Binding removeBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments);
-
- Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments);
+ Binding getBinding(String bindingKey, AMQQueue queue);
void close();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java Mon Feb 24 01:04:25 2014
@@ -71,12 +71,12 @@ final class BindingAdapter extends Abstr
public State getState()
{
- return null; //TODO
+ return _binding.getState();
}
public boolean isDurable()
{
- return _binding.getQueue().isDurable() && _binding.getExchange().isDurable();
+ return _binding.isDurable();
}
public void setDurable(final boolean durable)
@@ -131,7 +131,7 @@ final class BindingAdapter extends Abstr
public void delete()
{
- _exchange.getExchange().removeBinding(_binding);
+ _binding.delete();
}
@Override
@@ -147,11 +147,11 @@ final class BindingAdapter extends Abstr
}
else if(STATE.equals(name))
{
-
+ return getState();
}
else if(DURABLE.equals(name))
{
- return _queue.isDurable() && _exchange.isDurable();
+ return isDurable();
}
else if(LIFETIME_POLICY.equals(name))
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java Mon Feb 24 01:04:25 2014
@@ -122,7 +122,7 @@ final class ExchangeAdapter extends Abst
if(!_exchange.addBinding(bindingKey, amqQueue, bindingArguments))
{
- Binding oldBinding = _exchange.getBinding(bindingKey, amqQueue, bindingArguments);
+ Binding oldBinding = _exchange.getBinding(bindingKey, amqQueue);
Map<String, Object> oldArgs = oldBinding.getArguments();
if((oldArgs == null && !bindingArguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(bindingArguments)))
@@ -130,7 +130,7 @@ final class ExchangeAdapter extends Abst
_exchange.replaceBinding(oldBinding.getId(), bindingKey, amqQueue, bindingArguments);
}
}
- Binding binding = _exchange.getBinding(bindingKey, amqQueue, bindingArguments);
+ Binding binding = _exchange.getBinding(bindingKey, amqQueue);
synchronized (_bindingAdapters)
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Mon Feb 24 01:04:25 2014
@@ -1535,7 +1535,7 @@ abstract class AbstractQueue<E extends Q
for (Binding b : bindingCopy)
{
- b.getExchange().removeBinding(b);
+ b.delete();
}
QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java Mon Feb 24 01:04:25 2014
@@ -106,14 +106,14 @@ public class BindingRecoverer extends Ab
@Override
public Binding resolve()
{
- if(_exchange.getBinding(_bindingName, _queue, _bindingArgumentsMap) == null)
+ if(_exchange.getBinding(_bindingName, _queue) == null)
{
_logger.info("Restoring binding: (Exchange: " + _exchange.getName() + ", Queue: " + _queue.getName()
+ ", Routing Key: " + _bindingName + ", Arguments: " + _bindingArgumentsMap + ")");
_exchange.restoreBinding(_bindingId, _bindingName, _queue, _bindingArgumentsMap);
}
- return _exchange.getBinding(_bindingName, _queue, _bindingArgumentsMap);
+ return _exchange.getBinding(_bindingName, _queue);
}
private class QueueDependency implements UnresolvedDependency<AMQQueue>
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java Mon Feb 24 01:04:25 2014
@@ -38,7 +38,6 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.*;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -143,7 +142,7 @@ public class FanoutExchangeTest extends
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
assertTrue("Expected queue2 to be routed to", result.contains(queue2));
- _exchange.removeBinding("key",queue2,null);
+ _exchange.getBinding("key",queue2).delete();
result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Mon Feb 24 01:04:25 2014
@@ -24,7 +24,10 @@ import java.util.Collection;
import junit.framework.TestCase;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -137,6 +140,7 @@ public class HeadersBindingTest extends
private MockHeader matchHeaders = new MockHeader();
private int _count = 0;
private AMQQueue _queue;
+ private Exchange _exchange;
protected void setUp()
{
@@ -145,6 +149,9 @@ public class HeadersBindingTest extends
VirtualHost vhost = mock(VirtualHost.class);
when(_queue.getVirtualHost()).thenReturn(vhost);
when(vhost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
+ CurrentActor.set(mock(LogActor.class));
+ _exchange = mock(Exchange.class);
+ when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
}
protected String getQueueName()
@@ -158,7 +165,7 @@ public class HeadersBindingTest extends
matchHeaders.setString("A", "Value of A");
- Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders);
assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
@@ -169,7 +176,7 @@ public class HeadersBindingTest extends
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Value of B");
- Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders);
assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
@@ -179,7 +186,7 @@ public class HeadersBindingTest extends
matchHeaders.setString("A", "Altered value of A");
- Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders);
assertFalse(new HeadersBinding(b).matches(matchHeaders));
}
@@ -190,7 +197,7 @@ public class HeadersBindingTest extends
matchHeaders.setString("A", "Value of A");
- Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders);
assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
@@ -202,7 +209,7 @@ public class HeadersBindingTest extends
matchHeaders.setString("A", "Value of A");
- Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders);
assertFalse(new HeadersBinding(b).matches(matchHeaders));
}
@@ -215,7 +222,7 @@ public class HeadersBindingTest extends
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Value of B");
- Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders);
assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
@@ -229,7 +236,7 @@ public class HeadersBindingTest extends
matchHeaders.setString("B", "Value of B");
matchHeaders.setString("C", "Value of C");
- Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders);
assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
@@ -243,7 +250,7 @@ public class HeadersBindingTest extends
matchHeaders.setString("B", "Altered value of B");
matchHeaders.setString("C", "Value of C");
- Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders);
assertFalse(new HeadersBinding(b).matches(matchHeaders));
}
@@ -254,7 +261,7 @@ public class HeadersBindingTest extends
matchHeaders.setString("A", "Value of A");
- Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders);
assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
@@ -266,7 +273,7 @@ public class HeadersBindingTest extends
matchHeaders.setString("A", "Value of A");
- Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders);
assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
@@ -279,7 +286,7 @@ public class HeadersBindingTest extends
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Value of B");
- Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders);
assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
@@ -293,7 +300,7 @@ public class HeadersBindingTest extends
matchHeaders.setString("B", "Value of B");
matchHeaders.setString("C", "Value of C");
- Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders);
assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
@@ -307,7 +314,7 @@ public class HeadersBindingTest extends
matchHeaders.setString("B", "Altered value of B");
matchHeaders.setString("C", "Value of C");
- Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders);
assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
@@ -321,7 +328,7 @@ public class HeadersBindingTest extends
matchHeaders.setString("B", "Altered value of B");
matchHeaders.setString("C", "Value of C");
- Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders);
+ Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders);
assertFalse(new HeadersBinding(b).matches(matchHeaders));
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Mon Feb 24 01:04:25 2014
@@ -35,7 +35,6 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.*;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -179,7 +178,7 @@ public class HeadersExchangeTest extends
routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
routeAndTest(mockMessage(getArgsMapFromStrings("F0001")), q3);
- _exchange.removeBinding("Q1",q1,getArgsMapFromStrings("F0000"));
+ _exchange.getBinding("Q1",q1).delete();
routeAndTest(mockMessage(getArgsMapFromStrings("F0000")));
routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q2);
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Mon Feb 24 01:04:25 2014
@@ -143,7 +143,7 @@ abstract class AbstractQueueTestBase<E e
assertEquals("Wrong exchange bound", _exchange,
_queue.getBindings().get(0).getExchange());
- _exchange.removeBinding(_routingKey, _queue, Collections.EMPTY_MAP);
+ _exchange.getBinding(_routingKey, _queue).delete();
assertFalse("Routing key was still bound",
_exchange.isBound(_routingKey));
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java Mon Feb 24 01:04:25 2014
@@ -40,12 +40,15 @@ import org.apache.commons.configuration.
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
@@ -92,6 +95,7 @@ public abstract class AbstractDurableCon
_queueId = UUIDGenerator.generateRandomUUID();
_exchangeId = UUIDGenerator.generateRandomUUID();
+ CurrentActor.set(mock(LogActor.class));
_storeName = getName();
_storePath = TMP_FOLDER + File.separator + _storeName;
FileUtils.delete(new File(_storePath), true);
@@ -112,6 +116,7 @@ public abstract class AbstractDurableCon
when(_exchange.getName()).thenReturn(EXCHANGE_NAME);
when(_exchange.getId()).thenReturn(_exchangeId);
+ when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
when(_configuration.getString(eq(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY), anyString())).thenReturn(
_storePath);
when(_virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storePath);
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Mon Feb 24 01:04:25 2014
@@ -27,6 +27,8 @@ import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.binding.*;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.store.StoreException;
@@ -989,7 +991,11 @@ public class ServerSessionDelegate exten
{
try
{
- exchange.removeBinding(method.getBindingKey(), queue, null);
+ Binding binding = exchange.getBinding(method.getBindingKey(), queue);
+ if(binding != null)
+ {
+ binding.delete();
+ }
}
catch (AccessControlException e)
{
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java Mon Feb 24 01:04:25 2014
@@ -121,7 +121,7 @@ public class QueueBindHandler implements
if(!exch.addBinding(bindingKey, queue, arguments) && TopicExchange.TYPE.equals(exch.getExchangeType()))
{
- Binding oldBinding = exch.getBinding(bindingKey, queue, arguments);
+ Binding oldBinding = exch.getBinding(bindingKey, queue);
Map<String, Object> oldArgs = oldBinding.getArguments();
if((oldArgs == null && !arguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(arguments)))
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java Mon Feb 24 01:04:25 2014
@@ -25,12 +25,12 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.QueueUnbindBody;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -100,7 +100,7 @@ public class QueueUnbindHandler implemen
throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
}
- if(exch.getBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(body.getArguments())) == null)
+ if(exch.getBinding(String.valueOf(routingKey), queue) == null)
{
throw body.getChannelException(AMQConstant.NOT_FOUND,"No such binding");
}
@@ -108,7 +108,11 @@ public class QueueUnbindHandler implemen
{
try
{
- exch.removeBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(body.getArguments()));
+ Binding binding = exch.getBinding(String.valueOf(routingKey), queue);
+ if(binding != null)
+ {
+ binding.delete();
+ }
}
catch (AccessControlException e)
{
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Mon Feb 24 01:04:25 2014
@@ -232,7 +232,7 @@ public class SendingLink_1_0 implements
}
for(Binding existingBinding : bindingsToRemove)
{
- existingBinding.getExchange().removeBinding(existingBinding);
+ existingBinding.delete();
}
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1571124&r1=1571123&r2=1571124&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java Mon Feb 24 01:04:25 2014
@@ -56,7 +56,6 @@ import org.apache.qpid.server.queue.Stan
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -795,7 +794,8 @@ public class MessageStoreTest extends Qp
try
{
- exchange.removeBinding(routingKey, queue, bindArguments);
+ Binding b = exchange.getBinding(routingKey, queue);
+ b.delete();
}
catch (Exception e)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org