You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/12/22 17:59:47 UTC
svn commit: r1775689 [2/4] - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/
bdbstore/systests/src/test/java/org/apache/qpid/server/sto...
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java Thu Dec 22 17:59:46 2016
@@ -23,9 +23,8 @@ package org.apache.qpid.server.exchange;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,8 +34,8 @@ import org.apache.qpid.server.filter.Fil
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -48,248 +47,242 @@ class FanoutExchangeImpl extends Abstrac
private static final Integer ONE = Integer.valueOf(1);
- /**
- * Maps from queue name to queue instances
- */
- private final Map<Queue<?>,Integer> _queues = new HashMap<>();
- private final CopyOnWriteArrayList<Queue<?>> _unfilteredQueues = new CopyOnWriteArrayList<>();
- private final CopyOnWriteArrayList<Queue<?>> _filteredQueues = new CopyOnWriteArrayList<>();
-
- private final AtomicReference<Map<Queue<?>,Map<Binding<?>, FilterManager>>> _filteredBindings =
- new AtomicReference<>();
+ private final class BindingSet
{
- Map<Queue<?>,Map<Binding<?>, FilterManager>> emptyMap = Collections.emptyMap();
- _filteredBindings.set(emptyMap);
- }
+ private final Map<MessageDestination,Integer> _queues;
- @ManagedObjectFactoryConstructor
- public FanoutExchangeImpl(final Map<String, Object> attributes, final QueueManagingVirtualHost<?> vhost)
- {
- super(attributes, vhost);
- }
+ private final List<Queue<?>> _unfilteredQueues;
+ private final List<Queue<?>> _filteredQueues;
- @Override
- public ArrayList<BaseQueue> doRoute(ServerMessage payload,
- final String routingKey,
- final InstanceProperties instanceProperties)
- {
+ private final Map<Queue<?>,Map<BindingIdentifier, FilterManager>> _filteredBindings;
- for(Binding<?> b : getBindings())
+ public BindingSet(final Map<MessageDestination, Integer> queues,
+ final List<Queue<?>> unfilteredQueues,
+ final List<Queue<?>> filteredQueues,
+ final Map<Queue<?>, Map<BindingIdentifier, FilterManager>> filteredBindings)
{
- b.incrementMatches();
+ _queues = queues;
+ _unfilteredQueues = unfilteredQueues;
+ _filteredQueues = filteredQueues;
+ _filteredBindings = filteredBindings;
}
- final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(_unfilteredQueues);
-
+ public BindingSet()
+ {
+ _queues = Collections.emptyMap();
+ _unfilteredQueues = Collections.emptyList();
+ _filteredQueues = Collections.emptyList();
+ _filteredBindings = Collections.emptyMap();
+ }
- final Map<Queue<?>, Map<Binding<?>, FilterManager>> filteredBindings = _filteredBindings.get();
- if(!_filteredQueues.isEmpty())
+ public BindingSet addBinding(final BindingIdentifier binding, final Map<String, Object> arguments)
{
- for(Queue<?> q : _filteredQueues)
- {
- final Map<Binding<?>, FilterManager> bindingMessageFilterMap = filteredBindings.get(q);
- if(!(bindingMessageFilterMap == null || result.contains(q)))
+ if(FilterSupport.argumentsContainFilter(arguments))
{
- for(FilterManager filter : bindingMessageFilterMap.values())
+ try
{
- if(filter.allAllow(Filterable.Factory.newInstance(payload, instanceProperties)))
+ List<Queue<?>> filteredQueues;
+ if (!(_filteredQueues.contains(binding.getDestination())
+ || _unfilteredQueues.contains(binding.getDestination())))
{
- result.add(q);
- break;
+ filteredQueues = new ArrayList<>(_filteredQueues);
+ filteredQueues.add((Queue<?>) binding.getDestination());
+ filteredQueues = Collections.unmodifiableList(filteredQueues);
}
+ else
+ {
+ filteredQueues = _filteredQueues;
+ }
+ Map<Queue<?>, Map<BindingIdentifier, FilterManager>> filteredBindings =
+ new HashMap<>(_filteredBindings);
+ Map<BindingIdentifier, FilterManager> bindingsForQueue =
+ filteredBindings.get(binding.getDestination());
+ if (bindingsForQueue == null)
+ {
+ bindingsForQueue = new HashMap<>();
+ }
+ else
+ {
+ bindingsForQueue = new HashMap<>(bindingsForQueue);
+ }
+ bindingsForQueue.put(binding,
+ FilterSupport.createMessageFilter(arguments,
+ (Queue<?>) binding.getDestination()));
+ filteredBindings.put((Queue<?>) binding.getDestination(), bindingsForQueue);
+ return new BindingSet(_queues, _unfilteredQueues, filteredQueues, Collections.unmodifiableMap(filteredBindings));
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ _logger.warn("Binding ignored: cannot parse filter on binding of queue '" + binding.getDestination().getName()
+ + "' to exchange '" + FanoutExchangeImpl.this.getName()
+ + "' with arguments: " + arguments, e);
+ return this;
}
}
- }
-
+ else
+ {
+ Map<MessageDestination, Integer> queues = new HashMap<>(_queues);
+ List<Queue<?>> unfilteredQueues;
+ List<Queue<?>> filteredQueues;
+ if (queues.containsKey(binding.getDestination()))
+ {
+ queues.put(binding.getDestination(), queues.get(binding.getDestination()) + 1);
+ unfilteredQueues = _unfilteredQueues;
+ filteredQueues = _filteredQueues;
+ }
+ else
+ {
+ queues.put(binding.getDestination(), ONE);
+ unfilteredQueues = new ArrayList<>(_unfilteredQueues);
+ unfilteredQueues.add((Queue<?>)binding.getDestination());
+ unfilteredQueues = Collections.unmodifiableList(unfilteredQueues);
+ if(_filteredQueues.contains(binding.getDestination()))
+ {
+ filteredQueues = new ArrayList<>(_filteredQueues);
+ filteredQueues.remove(binding.getDestination());
+ filteredQueues = Collections.unmodifiableList(filteredQueues);
+ }
+ else
+ {
+ filteredQueues = _filteredQueues;
+ }
+ }
+ return new BindingSet(queues, unfilteredQueues, filteredQueues, _filteredBindings);
+ }
}
+ public BindingSet updateBinding(final BindingIdentifier binding, final Map<String, Object> newArguments)
+ {
- _logger.debug("Publishing message to queue {}", result);
-
- return result;
-
- }
-
- @Override
- protected synchronized void onBindingUpdated(final Binding<?> binding, final Map<String, Object> oldArguments)
- {
- Queue<?> queue = binding.getQueue();
+ return removeBinding(binding).addBinding(binding, newArguments);
+ }
- if (binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(
- binding.getArguments()))
+ public BindingSet removeBinding(final BindingIdentifier binding)
{
- if(oldArguments != null && !oldArguments.isEmpty() && FilterSupport.argumentsContainFilter(oldArguments))
+ Queue<?> queue = (Queue<?>) binding.getDestination();
+ if(_filteredBindings.get(queue).containsKey(binding))
{
- _unfilteredQueues.add(queue);
- if(_queues.containsKey(queue))
+ final Map<Queue<?>, Map<BindingIdentifier, FilterManager>> filteredBindings = new HashMap<>(_filteredBindings);
+ final Map<BindingIdentifier, FilterManager> bindingsForQueue = new HashMap<>(filteredBindings.remove(queue));
+ bindingsForQueue.remove(binding);
+ List<Queue<?>> filteredQueues;
+ if(bindingsForQueue.isEmpty())
{
- _queues.put(queue,_queues.get(queue)+1);
+ filteredQueues = new ArrayList<>(_filteredQueues);
+ filteredQueues.remove(queue);
+ filteredQueues = Collections.unmodifiableList(filteredQueues);
}
else
{
- _queues.put(queue, ONE);
+ filteredBindings.put(queue, bindingsForQueue);
+ filteredQueues = _filteredQueues;
}
-
- // No longer any reason to check filters for this queue
- _filteredQueues.remove(queue);
+ return new BindingSet(_queues, _unfilteredQueues, filteredQueues, Collections.unmodifiableMap(filteredBindings));
}
- // else - nothing has changed, remains unfiltered
- }
- else
- {
- HashMap<Queue<?>,Map<Binding<?>, FilterManager>> filteredBindings =
- new HashMap<>(_filteredBindings.get());
-
- Map<Binding<?>,FilterManager> bindingsForQueue;
-
- final FilterManager messageFilter;
-
- try
- {
- messageFilter = FilterSupport.createMessageFilter(binding.getArguments(), binding.getQueue());
- }
- catch (AMQInvalidArgumentException e)
- {
- _logger.warn("Cannot bind queue " + queue + " to exchange this " + this + " because selector cannot be parsed.", e);
- return;
- }
-
-
- if (oldArguments != null && !oldArguments.isEmpty() && FilterSupport.argumentsContainFilter(oldArguments))
+ else if(_unfilteredQueues.contains(queue))
{
- bindingsForQueue = new HashMap<>(filteredBindings.remove(binding.getQueue()));
- }
- else // previously unfiltered
- {
- bindingsForQueue = new HashMap<>();
-
- Integer oldValue = _queues.remove(queue);
- if (ONE.equals(oldValue))
+ Map<MessageDestination, Integer> queues = new HashMap<>(_queues);
+ int count = queues.remove(queue);
+ List<Queue<?>> unfilteredQueues;
+ List<Queue<?>> filteredQueues;
+ if(count > 1)
{
- // should start checking filters for this queue
- _filteredQueues.add(queue);
- _unfilteredQueues.remove(queue);
+ queues.put(queue, --count);
+ unfilteredQueues = _unfilteredQueues;
+ filteredQueues = _filteredQueues;
}
else
{
- _queues.put(queue, oldValue - 1);
+ unfilteredQueues = new ArrayList<>(_unfilteredQueues);
+ unfilteredQueues.remove(queue);
+ unfilteredQueues = Collections.unmodifiableList(unfilteredQueues);
+ if(_filteredBindings.containsKey(queue))
+ {
+ filteredQueues = new ArrayList<>(_filteredQueues);
+ filteredQueues.add(queue);
+ filteredQueues = Collections.unmodifiableList(filteredQueues);
+ }
+ else
+ {
+ filteredQueues = _filteredQueues;
+ }
}
-
+ return new BindingSet(Collections.unmodifiableMap(queues), unfilteredQueues, filteredQueues, _filteredBindings);
}
- bindingsForQueue.put(binding, messageFilter);
- filteredBindings.put(binding.getQueue(),bindingsForQueue);
+ else
+ {
+ return this;
+ }
+ }
+ }
- _filteredBindings.set(filteredBindings);
- }
+ private volatile BindingSet _bindingSet = new BindingSet();
+ /**
+ * Maps from queue name to queue instances
+ */
+
+ @ManagedObjectFactoryConstructor
+ public FanoutExchangeImpl(final Map<String, Object> attributes, final QueueManagingVirtualHost<?> vhost)
+ {
+ super(attributes, vhost);
}
@Override
- protected synchronized void onBind(final Binding<?> binding)
+ public ArrayList<BaseQueue> doRoute(ServerMessage payload,
+ final String routingKey,
+ final InstanceProperties instanceProperties)
{
- Queue<?> queue = binding.getQueue();
- assert queue != null;
- if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments()))
- {
- Integer oldVal;
- if(_queues.containsKey(queue))
- {
- _queues.put(queue,_queues.get(queue)+1);
- }
- else
- {
- _queues.put(queue, ONE);
- _unfilteredQueues.add(queue);
- // No longer any reason to check filters for this queue
- _filteredQueues.remove(queue);
- }
+ BindingSet bindingSet = _bindingSet;
+ final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(bindingSet._unfilteredQueues);
- }
- else
+
+ final Map<Queue<?>, Map<BindingIdentifier, FilterManager>> filteredBindings = bindingSet._filteredBindings;
+ if(!bindingSet._filteredQueues.isEmpty())
{
- try
+ for(Queue<?> q : bindingSet._filteredQueues)
{
-
- HashMap<Queue<?>,Map<Binding<?>, FilterManager>> filteredBindings =
- new HashMap<>(_filteredBindings.get());
-
- Map<Binding<?>, FilterManager> bindingsForQueue = filteredBindings.remove(binding.getQueue());
- final FilterManager messageFilter =
- FilterSupport.createMessageFilter(binding.getArguments(), binding.getQueue());
-
- if(bindingsForQueue != null)
- {
- bindingsForQueue = new HashMap<>(bindingsForQueue);
- bindingsForQueue.put(binding, messageFilter);
- }
- else
+ final Map<BindingIdentifier, FilterManager> bindingMessageFilterMap = filteredBindings.get(q);
+ if(!(bindingMessageFilterMap == null || result.contains(q)))
{
- bindingsForQueue = Collections.<Binding<?>, FilterManager>singletonMap(binding, messageFilter);
- if(!_unfilteredQueues.contains(queue))
+ for(FilterManager filter : bindingMessageFilterMap.values())
{
- _filteredQueues.add(queue);
+ if(filter.allAllow(Filterable.Factory.newInstance(payload, instanceProperties)))
+ {
+ result.add(q);
+ break;
+ }
}
}
+ }
- filteredBindings.put(binding.getQueue(), bindingsForQueue);
+ }
- _filteredBindings.set(filteredBindings);
- }
- catch (AMQInvalidArgumentException e)
- {
- _logger.warn("Cannot bind queue " + queue + " to exchange this " + this + " because selector cannot be parsed.", e);
- return;
- }
- }
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Binding queue " + queue
- + " with routing key " + binding.getBindingKey() + " to exchange " + this);
- }
+ _logger.debug("Publishing message to queue {}", result);
+
+ return result;
+
}
@Override
- protected synchronized void onUnbind(final Binding<?> binding)
+ protected void onBindingUpdated(final BindingIdentifier binding,
+ final Map<String, Object> newArguments)
{
- Queue<?> queue = binding.getQueue();
- if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments()))
- {
- Integer oldValue = _queues.remove(queue);
- if(ONE.equals(oldValue))
- {
- // should start checking filters for this queue
- if(_filteredBindings.get().containsKey(queue))
- {
- _filteredQueues.add(queue);
- }
- _unfilteredQueues.remove(queue);
- }
- else
- {
- _queues.put(queue,oldValue-1);
- }
- }
- else // we are removing a binding with filters
- {
- HashMap<Queue<?>,Map<Binding<?>, FilterManager>> filteredBindings =
- new HashMap<>(_filteredBindings.get());
+ _bindingSet = _bindingSet.updateBinding(binding, newArguments);
+ }
- Map<Binding<?>,FilterManager> bindingsForQueue = filteredBindings.remove(binding.getQueue());
- if(bindingsForQueue.size()>1)
- {
- bindingsForQueue = new HashMap<>(bindingsForQueue);
- bindingsForQueue.remove(binding);
- filteredBindings.put(binding.getQueue(),bindingsForQueue);
- }
- else
- {
- _filteredQueues.remove(queue);
- }
- _filteredBindings.set(filteredBindings);
+ @Override
+ protected void onBind(final BindingIdentifier binding, final Map<String, Object> arguments)
+ {
+ _bindingSet = _bindingSet.addBinding(binding, arguments);
+ }
- }
+ @Override
+ protected void onUnbind(final BindingIdentifier binding)
+ {
+ _bindingSet = _bindingSet.removeBinding(binding);
}
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Thu Dec 22 17:59:46 2016
@@ -35,7 +35,7 @@ import org.apache.qpid.server.filter.Fil
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Queue;
/**
* Defines binding and matching based on a set of headers.
@@ -45,7 +45,7 @@ class HeadersBinding
private static final Logger _logger = LoggerFactory.getLogger(HeadersBinding.class);
private final Map<String,Object> _mappings;
- private final Binding<?> _binding;
+ private final AbstractExchange.BindingIdentifier _binding;
private final Set<String> required = new HashSet<String>();
private final Map<String,Object> matches = new HashMap<String,Object>();
private boolean matchAny;
@@ -59,12 +59,11 @@ class HeadersBinding
*
* @param binding the binding to create a header binding using
*/
- public HeadersBinding(Binding<?> binding)
+ public HeadersBinding(AbstractExchange.BindingIdentifier binding, Map<String,Object> arguments)
{
_binding = binding;
if(_binding !=null)
{
- Map<String, Object> arguments = _binding.getArguments();
_mappings = arguments == null ? Collections.<String,Object>emptyMap() : arguments;
initMappings();
}
@@ -80,13 +79,12 @@ class HeadersBinding
{
try
{
- _filter = FilterSupport.createMessageFilter(_mappings,_binding.getQueue());
+ _filter = FilterSupport.createMessageFilter(_mappings, (Queue<?>) _binding.getDestination());
}
catch (AMQInvalidArgumentException e)
{
- _logger.warn("Invalid filter in binding queue '"+_binding.getQueue().getName()
- +"' to exchange '"+_binding.getExchange().getName()
- +"' with arguments: " + _binding.getArguments());
+ _logger.warn("Invalid filter in binding queue '"+_binding.getDestination().getName()
+ +"' with arguments: " + _mappings);
_filter = new FilterManager();
_filter.add("x-exclude-all", new ExcludeAllFilter());
@@ -111,7 +109,7 @@ class HeadersBinding
}
}
- public Binding<?> getBinding()
+ public AbstractExchange.BindingIdentifier getBinding()
{
return _binding;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java Thu Dec 22 17:59:46 2016
@@ -21,6 +21,7 @@
package org.apache.qpid.server.exchange;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.ListIterator;
import java.util.Map;
@@ -32,12 +33,9 @@ import java.util.concurrent.CopyOnWriteA
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -75,7 +73,7 @@ public class HeadersExchangeImpl extends
private static final Logger _logger = LoggerFactory.getLogger(HeadersExchangeImpl.class);
- private final ConcurrentMap<String, CopyOnWriteArraySet<Binding<?>>> _bindingsByKey =
+ private final ConcurrentMap<String, CopyOnWriteArraySet<BindingIdentifier>> _bindingsByKey =
new ConcurrentHashMap<>();
private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers =
@@ -100,16 +98,15 @@ public class HeadersExchangeImpl extends
{
if (hb.matches(Filterable.Factory.newInstance(payload,instanceProperties)))
{
- Binding<?> b = hb.getBinding();
+ BindingIdentifier b = hb.getBinding();
- b.incrementMatches();
if (_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getName() + ": delivering message with headers " +
- payload.getMessageHeader() + " to " + b.getQueue().getName());
+ payload.getMessageHeader() + " to " + b.getDestination().getName());
}
- queues.add(b.getQueue());
+ queues.add((BaseQueue) b.getDestination());
}
}
@@ -117,20 +114,17 @@ public class HeadersExchangeImpl extends
}
@Override
- protected void onBind(final Binding<?> binding)
+ protected void onBind(final BindingIdentifier binding, Map<String,Object> arguments)
{
String bindingKey = binding.getBindingKey();
- Queue<?> queue = binding.getQueue();
+ Queue<?> queue = (Queue<?>) binding.getDestination();
- assert queue != null;
- assert bindingKey != null;
-
- CopyOnWriteArraySet<Binding<?>> bindings = _bindingsByKey.get(bindingKey);
+ CopyOnWriteArraySet<BindingIdentifier> bindings = _bindingsByKey.get(bindingKey);
if(bindings == null)
{
bindings = new CopyOnWriteArraySet<>();
- CopyOnWriteArraySet<Binding<?>> newBindings;
+ CopyOnWriteArraySet<BindingIdentifier> newBindings;
if((newBindings = _bindingsByKey.putIfAbsent(bindingKey, bindings)) != null)
{
bindings = newBindings;
@@ -140,18 +134,18 @@ public class HeadersExchangeImpl extends
if(_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getName() + ": Binding " + queue.getName() +
- " with binding key '" +bindingKey + "' and args: " + binding.getArguments());
+ " with binding key '" +bindingKey + "' and args: " + arguments);
}
- _bindingHeaderMatchers.add(new HeadersBinding(binding));
+ _bindingHeaderMatchers.add(new HeadersBinding(binding, arguments));
bindings.add(binding);
}
@Override
- protected void onBindingUpdated(final Binding<?> binding, final Map<String, Object> oldArguments)
+ protected void onBindingUpdated(final BindingIdentifier binding, final Map<String, Object> arguments)
{
- HeadersBinding headersBinding = new HeadersBinding(binding);
+ HeadersBinding headersBinding = new HeadersBinding(binding, arguments);
ListIterator<HeadersBinding> iter = _bindingHeaderMatchers.listIterator();
while(iter.hasNext())
{
@@ -163,17 +157,17 @@ public class HeadersExchangeImpl extends
}
- protected void onUnbind(final Binding<?> binding)
+ protected void onUnbind(final BindingIdentifier binding)
{
assert binding != null;
- CopyOnWriteArraySet<Binding<?>> bindings = _bindingsByKey.get(binding.getBindingKey());
+ CopyOnWriteArraySet<BindingIdentifier> bindings = _bindingsByKey.get(binding.getBindingKey());
if(bindings != null)
{
bindings.remove(binding);
}
- boolean removedBinding = _bindingHeaderMatchers.remove(new HeadersBinding(binding));
+ boolean removedBinding = _bindingHeaderMatchers.remove(new HeadersBinding(binding, Collections.<String,Object>emptyMap()));
_logger.debug("Removing Binding: {}", removedBinding);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java Thu Dec 22 17:59:46 2016
@@ -56,7 +56,7 @@ class TopicExchangeImpl extends Abstract
private final Map<String, TopicExchangeResult> _topicExchangeResults =
new ConcurrentHashMap<String, TopicExchangeResult>();
- private final Map<Binding<?>, Map<String,Object>> _bindings = new HashMap<>();
+ private final Map<BindingIdentifier, Map<String,Object>> _bindings = new HashMap<>();
@ManagedObjectFactoryConstructor
public TopicExchangeImpl(final Map<String,Object> attributes, final QueueManagingVirtualHost<?> vhost)
@@ -65,14 +65,10 @@ class TopicExchangeImpl extends Abstract
}
@Override
- protected synchronized void onBindingUpdated(final Binding<?> binding, final Map<String, Object> oldArguments)
+ protected synchronized void onBindingUpdated(final BindingIdentifier binding, final Map<String, Object> newArguments)
{
final String bindingKey = binding.getBindingKey();
- Queue<?> queue = binding.getQueue();
- Map<String,Object> args = binding.getArguments();
-
- assert queue != null;
- assert bindingKey != null;
+ Queue<?> queue = (Queue<?>) binding.getDestination();
_logger.debug("Updating binding of queue {} with routing key {}", queue.getName(), bindingKey);
@@ -84,21 +80,20 @@ class TopicExchangeImpl extends Abstract
if (_bindings.containsKey(binding))
{
- Map<String, Object> oldArgs = _bindings.get(binding);
- _bindings.put(binding, args);
+ Map<String, Object> oldArgs = _bindings.put(binding, newArguments);
TopicExchangeResult result = _topicExchangeResults.get(routingKey);
- if (FilterSupport.argumentsContainFilter(args))
+ if (FilterSupport.argumentsContainFilter(newArguments))
{
if (FilterSupport.argumentsContainFilter(oldArgs))
{
result.replaceQueueFilter(queue,
FilterSupport.createMessageFilter(oldArgs, queue),
- FilterSupport.createMessageFilter(args, queue));
+ FilterSupport.createMessageFilter(newArguments, queue));
}
else
{
- result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
+ result.addFilteredQueue(queue, FilterSupport.createMessageFilter(newArguments, queue));
result.removeUnfilteredQueue(queue);
}
}
@@ -126,14 +121,10 @@ class TopicExchangeImpl extends Abstract
}
- protected synchronized void registerQueue(final Binding<?> binding) throws AMQInvalidArgumentException
+ protected synchronized void registerQueue(final BindingIdentifier binding, Map<String,Object> arguments) throws AMQInvalidArgumentException
{
final String bindingKey = binding.getBindingKey();
- Queue<?> queue = binding.getQueue();
- Map<String,Object> args = binding.getArguments();
-
- assert queue != null;
- assert bindingKey != null;
+ Queue<?> queue = (Queue<?>) binding.getDestination();
_logger.debug("Registering queue {} with routing key {}", queue.getName(), bindingKey);
@@ -142,20 +133,20 @@ class TopicExchangeImpl extends Abstract
if(_bindings.containsKey(binding))
{
- Map<String,Object> oldArgs = _bindings.get(binding);
+ Map<String,Object> oldArgs = _bindings.put(binding, arguments);
TopicExchangeResult result = _topicExchangeResults.get(routingKey);
- if(FilterSupport.argumentsContainFilter(args))
+ if(FilterSupport.argumentsContainFilter(arguments))
{
if(FilterSupport.argumentsContainFilter(oldArgs))
{
result.replaceQueueFilter(queue,
FilterSupport.createMessageFilter(oldArgs, queue),
- FilterSupport.createMessageFilter(args, queue));
+ FilterSupport.createMessageFilter(arguments, queue));
}
else
{
- result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
+ result.addFilteredQueue(queue, FilterSupport.createMessageFilter(arguments, queue));
result.removeUnfilteredQueue(queue);
}
}
@@ -183,9 +174,9 @@ class TopicExchangeImpl extends Abstract
if(result == null)
{
result = new TopicExchangeResult();
- if(FilterSupport.argumentsContainFilter(args))
+ if(FilterSupport.argumentsContainFilter(arguments))
{
- result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
+ result.addFilteredQueue(queue, FilterSupport.createMessageFilter(arguments, queue));
}
else
{
@@ -196,9 +187,9 @@ class TopicExchangeImpl extends Abstract
}
else
{
- if(FilterSupport.argumentsContainFilter(args))
+ if(FilterSupport.argumentsContainFilter(arguments))
{
- result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
+ result.addFilteredQueue(queue, FilterSupport.createMessageFilter(arguments, queue));
}
else
{
@@ -207,7 +198,7 @@ class TopicExchangeImpl extends Abstract
}
result.addBinding(binding);
- _bindings.put(binding, args);
+ _bindings.put(binding, arguments);
}
}
@@ -246,7 +237,7 @@ class TopicExchangeImpl extends Abstract
}
- private synchronized boolean deregisterQueue(final Binding<?> binding)
+ private synchronized boolean deregisterQueue(final BindingIdentifier binding)
{
if(_bindings.containsKey(binding))
{
@@ -263,8 +254,8 @@ class TopicExchangeImpl extends Abstract
{
try
{
- result.removeFilteredQueue(binding.getQueue(), FilterSupport.createMessageFilter(bindingArgs,
- binding.getQueue()));
+ result.removeFilteredQueue((Queue<?>) binding.getDestination(), FilterSupport.createMessageFilter(bindingArgs,
+ (Queue<?>) binding.getDestination()));
}
catch (AMQInvalidArgumentException e)
{
@@ -273,7 +264,7 @@ class TopicExchangeImpl extends Abstract
}
else
{
- result.removeUnfilteredQueue(binding.getQueue());
+ result.removeUnfilteredQueue((Queue<?>) binding.getDestination());
}
return true;
}
@@ -301,11 +292,6 @@ class TopicExchangeImpl extends Abstract
{
TopicExchangeResult res = (TopicExchangeResult)result;
- for(Binding<?> b : res.getBindings())
- {
- b.incrementMatches();
- }
-
queues = res.processMessage(message, queues);
}
return queues;
@@ -314,11 +300,12 @@ class TopicExchangeImpl extends Abstract
}
- protected void onBind(final Binding<?> binding)
+ @Override
+ protected void onBind(final BindingIdentifier binding, Map<String, Object> arguments)
{
try
{
- registerQueue(binding);
+ registerQueue(binding, arguments);
}
catch (AMQInvalidArgumentException e)
{
@@ -327,7 +314,8 @@ class TopicExchangeImpl extends Abstract
}
}
- protected void onUnbind(final Binding<?> binding)
+ @Override
+ protected void onUnbind(final BindingIdentifier binding)
{
deregisterQueue(binding);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java Thu Dec 22 17:59:46 2016
@@ -30,14 +30,14 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.qpid.server.exchange.AbstractExchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Queue;
public final class TopicExchangeResult implements TopicMatcherResult
{
- private final List<Binding<?>> _bindings = new CopyOnWriteArrayList<>();
+ private final List<AbstractExchange.BindingIdentifier> _bindings = new CopyOnWriteArrayList<>();
private final Map<Queue<?>, Integer> _unfilteredQueues = new ConcurrentHashMap<>();
private final ConcurrentMap<Queue<?>, Map<FilterManager,Integer>> _filteredQueues = new ConcurrentHashMap<>();
private volatile ArrayList<Queue<?>> _unfilteredQueueList = new ArrayList<>(0);
@@ -81,17 +81,17 @@ public final class TopicExchangeResult i
return _unfilteredQueues.keySet();
}
- public void addBinding(Binding<?> binding)
+ public void addBinding(AbstractExchange.BindingIdentifier binding)
{
_bindings.add(binding);
}
- public void removeBinding(Binding<?> binding)
+ public void removeBinding(AbstractExchange.BindingIdentifier binding)
{
_bindings.remove(binding);
}
- public List<Binding<?>> getBindings()
+ public List<AbstractExchange.BindingIdentifier> getBindings()
{
return new ArrayList<>(_bindings);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BindingMessages.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BindingMessages.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BindingMessages.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BindingMessages.java Thu Dec 22 17:59:46 2016
@@ -79,37 +79,14 @@ public class BindingMessages
/**
* Log a Binding message of the Format:
- * <pre>BND-1001 : Create[ : Arguments : {0}]</pre>
+ * <pre>BND-1001 : Create : {0}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage CREATED(String param1, boolean opt1)
+ public static LogMessage CREATED(String param1)
{
String rawMessage = _messages.getString("CREATED");
- StringBuffer msg = new StringBuffer();
-
- // Split the formatted message up on the option values so we can
- // rebuild the message based on the configured options.
- String[] parts = rawMessage.split("\\[");
- msg.append(parts[0]);
-
- int end;
- if (parts.length > 1)
- {
-
- // Add Option : : Arguments : {0}.
- end = parts[1].indexOf(']');
- if (opt1)
- {
- msg.append(parts[1].substring(0, end));
- }
-
- // Use 'end + 1' to remove the ']' from the output
- msg.append(parts[1].substring(end + 1));
- }
-
- rawMessage = msg.toString();
final Object[] messageArguments = {param1};
// Create a new MessageFormat to ensure thread safety.
@@ -160,16 +137,21 @@ public class BindingMessages
/**
* Log a Binding message of the Format:
- * <pre>BND-1002 : Deleted</pre>
+ * <pre>BND-1002 : Deleted : {0}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage DELETED()
+ public static LogMessage DELETED(String param1)
{
String rawMessage = _messages.getString("DELETED");
- final String message = rawMessage;
+ final Object[] messageArguments = {param1};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
return new LogMessage()
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Binding_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Binding_logmessages.properties?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Binding_logmessages.properties (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Binding_logmessages.properties Thu Dec 22 17:59:46 2016
@@ -18,7 +18,7 @@
#
# Default File used for all non-defined locales.
#
-CREATED = BND-1001 : Create[ : Arguments : {0}]
-DELETED = BND-1002 : Deleted
+CREATED = BND-1001 : Create : {0}
+DELETED = BND-1002 : Deleted : {0}
# 0 - operation name
OPERATION = BND-1003 : Operation : {0}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java Thu Dec 22 17:59:46 2016
@@ -54,4 +54,9 @@ public interface MessageDestination exte
InstanceProperties instanceProperties,
ServerTransaction txn,
Action<? super MessageInstance> postEnqueueAction);
+
+ boolean isDurable();
+
+ void linkAdded(MessageSender sender, String linkName);
+ void linkRemoved(MessageSender sender, String linkName);
}
Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSender.java (from r1775646, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSender.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSender.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java&r1=1775646&r2=1775689&rev=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSender.java Thu Dec 22 17:59:46 2016
@@ -18,34 +18,9 @@
* under the License.
*
*/
-package org.apache.qpid.server.model;
+package org.apache.qpid.server.message;
-import java.util.Map;
-
-@ManagedObject( amqpName = "org.apache.qpid.Binding")
-public interface Binding<X extends Binding<X>> extends ConfiguredObject<X>
+public interface MessageSender
{
-
- public String ARGUMENTS = "arguments";
- public String STATE = "state";
- public String QUEUE = "queue";
- public String EXCHANGE = "exchange";
-
- String getBindingKey();
-
- // TODO - this is a hack
- @DerivedAttribute
- Queue<?> getQueue();
-
- // TODO - this is a hack
- @DerivedAttribute
- Exchange<?> getExchange();
-
- @ManagedAttribute
- Map<String,Object> getArguments();
-
- @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT, label = "Matches")
- long getMatches();
-
- void incrementMatches();
+ void destinationRemoved(MessageDestination destination);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java Thu Dec 22 17:59:46 2016
@@ -22,30 +22,10 @@ package org.apache.qpid.server.model;
import java.util.Map;
-@ManagedObject( amqpName = "org.apache.qpid.Binding")
-public interface Binding<X extends Binding<X>> extends ConfiguredObject<X>
+@ManagedAttributeValueType
+public interface Binding extends ManagedAttributeValue
{
-
- public String ARGUMENTS = "arguments";
- public String STATE = "state";
- public String QUEUE = "queue";
- public String EXCHANGE = "exchange";
-
String getBindingKey();
-
- // TODO - this is a hack
- @DerivedAttribute
- Queue<?> getQueue();
-
- // TODO - this is a hack
- @DerivedAttribute
- Exchange<?> getExchange();
-
- @ManagedAttribute
+ String getDestination();
Map<String,Object> getArguments();
-
- @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT, label = "Matches")
- long getMatches();
-
- void incrementMatches();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java Thu Dec 22 17:59:46 2016
@@ -53,9 +53,10 @@ public final class BrokerModel extends M
* Bring model version and Qpid version into sync
* 6.1 Remove JMX
* Remove PreferencesProvider
+ * 7.0 Remove bindings, Consumer sole parent is Queue
*/
- public static final int MODEL_MAJOR_VERSION = 6;
- public static final int MODEL_MINOR_VERSION = 1;
+ public static final int MODEL_MAJOR_VERSION = 7;
+ public static final int MODEL_MINOR_VERSION = 0;
public static final String MODEL_VERSION = MODEL_MAJOR_VERSION + "." + MODEL_MINOR_VERSION;
private static final Model MODEL_INSTANCE = new BrokerModel();
private final Map<Class<? extends ConfiguredObject>, Collection<Class<? extends ConfiguredObject>>> _parents =
@@ -109,13 +110,8 @@ public final class BrokerModel extends M
addRelationship(Connection.class, Session.class);
- addRelationship(Queue.class, Binding.class);
addRelationship(Queue.class, Consumer.class);
- addRelationship(Exchange.class, Binding.class);
-
- addRelationship(Session.class, Consumer.class);
-
_objectFactory = new ConfiguredObjectFactoryImpl(this);
_typeRegistry = new ConfiguredObjectTypeRegistry((new QpidServiceLoader()).instancesOf(ConfiguredObjectRegistration.class),
(new QpidServiceLoader()).instancesOf(ConfiguredObjectAttributeInjector.class),
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedMethodAttribute.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedMethodAttribute.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedMethodAttribute.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedMethodAttribute.java Thu Dec 22 17:59:46 2016
@@ -96,4 +96,11 @@ public class ConfiguredDerivedMethodAttr
return _secureValuePattern;
}
+ public T convertValue(Object input, C configuredObject)
+ {
+ final AttributeValueConverter<T> converter =
+ AttributeValueConverter.getConverter(getType(), getGetter().getGenericReturnType());
+ return converter.convert(input, configuredObject);
+ }
+
}
Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/DoOnConfigThread.java (from r1775646, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/DoOnConfigThread.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/DoOnConfigThread.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java&r1=1775646&r2=1775689&rev=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/DoOnConfigThread.java Thu Dec 22 17:59:46 2016
@@ -20,32 +20,13 @@
*/
package org.apache.qpid.server.model;
-import java.util.Map;
-
-@ManagedObject( amqpName = "org.apache.qpid.Binding")
-public interface Binding<X extends Binding<X>> extends ConfiguredObject<X>
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.SOURCE)
+@Target(ElementType.METHOD)
+public @interface DoOnConfigThread
{
-
- public String ARGUMENTS = "arguments";
- public String STATE = "state";
- public String QUEUE = "queue";
- public String EXCHANGE = "exchange";
-
- String getBindingKey();
-
- // TODO - this is a hack
- @DerivedAttribute
- Queue<?> getQueue();
-
- // TODO - this is a hack
- @DerivedAttribute
- Exchange<?> getExchange();
-
- @ManagedAttribute
- Map<String,Object> getArguments();
-
- @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT, label = "Matches")
- long getMatches();
-
- void incrementMatches();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java Thu Dec 22 17:59:46 2016
@@ -23,17 +23,17 @@ package org.apache.qpid.server.model;
import java.util.Collection;
import java.util.Map;
-import com.google.common.util.concurrent.ListenableFuture;
-
import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@ManagedObject( description = Exchange.CLASS_DESCRIPTION,
amqpName = "org.apache.qpid.Exchange"
)
public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, MessageDestination,
- ExchangeReferrer
+ ExchangeReferrer, MessageSender
{
String CLASS_DESCRIPTION = "<p>An Exchange is a named entity within the Virtualhost which receives messages from "
+ "producers and routes them to matching Queues within the Virtualhost.</p>"
@@ -41,6 +41,7 @@ public interface Exchange<X extends Exch
+ "a different routing algorithm.</p>";
String ALTERNATE_EXCHANGE = "alternateExchange";
+ String DURABLE_BINDINGS = "durableBindings";
enum UnroutableMessageBehaviour
{
@@ -55,9 +56,13 @@ public interface Exchange<X extends Exch
@ManagedAttribute(description = "(AMQP 1.0 only) Default behaviour to apply when a message is not routed to any queues", defaultValue = "DISCARD")
UnroutableMessageBehaviour getUnroutableMessageBehaviour();
- //children
- @ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
- Collection<Binding<?>> getBindings();
+ @DerivedAttribute
+ Collection<Binding> getBindings();
+
+ Collection<Binding> getBindingsForDestination(MessageDestination destination);
+
+ @DerivedAttribute(persist = true)
+ Collection<Binding> getDurableBindings();
// Statistics
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Bindings")
@@ -77,24 +82,40 @@ public interface Exchange<X extends Exch
@ManagedOperation(changesConfiguredObjectState = true)
- void bind(@Param(name="queue", mandatory = true) Queue<?> queue,
- @Param(name="bindingKey") String bindingKey,
- @Param(name="arguments", defaultValue = "{}") Map<String,Object> arguments);
+ boolean bind(@Param(name = "destination", mandatory = true) String destination,
+ @Param(name = "bindingKey") String bindingKey,
+ @Param(name = "arguments", defaultValue = "{}") Map<String, Object> arguments,
+ @Param(name = "replaceExistingArguments", defaultValue = "false") boolean replaceExistingArguments);
+
+ @ManagedOperation(changesConfiguredObjectState = true)
+ boolean unbind(@Param(name="destination", mandatory = true) String destination,
+ @Param(name="bindingKey") String bindingKey);
+
/**
* @return true if the exchange will be deleted after all queues have been detached
*/
boolean isAutoDelete();
- boolean addBinding(String bindingKey, Queue<?> queue, Map<String, Object> arguments);
+ @DoOnConfigThread
+ boolean addBinding(@Param(name = "bindingKey") String bindingKey,
+ @Param(name = "queue") Queue<?> queue,
+ @Param(name = "arguments") Map<String, Object> arguments);
+
+ @DoOnConfigThread
+ boolean deleteBinding(@Param(name = "bindingKey") String bindingKey,
+ @Param(name = "queue") Queue<?> queue);
+
+ @DoOnConfigThread
+ boolean hasBinding(@Param(name = "bindingKey") String bindingKey,
+ @Param(name = "queue") Queue<?> queue);
+
+ @DoOnConfigThread
+ void replaceBinding(@Param(name = "bindingKey") String bindingKey,
+ @Param(name = "queue") Queue<?> queue,
+ @Param(name = "arguments") Map<String, Object> arguments);
- boolean deleteBinding(String bindingKey, Queue<?> queue);
-
- boolean hasBinding(String bindingKey, Queue<?> queue);
-
- boolean replaceBinding(String bindingKey,
- Queue<?> queue,
- Map<String, Object> arguments);
+ QueueManagingVirtualHost<?> getVirtualHost();
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
@@ -103,8 +124,10 @@ public interface Exchange<X extends Exch
* @param queue
* @return
*/
-
- boolean isBound(String bindingKey, Map<String, Object> arguments, Queue<?> queue);
+ @DoOnConfigThread
+ boolean isBound(@Param(name = "bindingKey") String bindingKey,
+ @Param(name = "arguments") Map<String, Object> arguments,
+ @Param(name = "queue") Queue<?> queue);
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key
@@ -112,29 +135,38 @@ public interface Exchange<X extends Exch
* @param queue
* @return
*/
-
- boolean isBound(String bindingKey, Queue<?> queue);
+ @DoOnConfigThread
+ boolean isBound(@Param(name = "bindingKey") String bindingKey,
+ @Param(name = "queue") Queue<?> queue);
/**
* Determines whether a message is routing to any queue using a specific _routing key
* @param bindingKey
* @return
*/
- boolean isBound(String bindingKey);
+ @DoOnConfigThread
+ boolean isBound(@Param(name = "bindingKey") String bindingKey);
/**
* Returns true if this exchange has at least one binding associated with it.
* @return
*/
+ @DoOnConfigThread
boolean hasBindings();
- boolean isBound(Queue<?> queue);
-
- boolean isBound(Map<String, Object> arguments);
+ @DoOnConfigThread
+ boolean isBound(@Param(name = "queue") Queue<?> queue);
- boolean isBound(String bindingKey, Map<String, Object> arguments);
+ @DoOnConfigThread
+ boolean isBound(@Param(name = "arguments") Map<String, Object> arguments);
- boolean isBound(Map<String, Object> arguments, Queue<?> queue);
+ @DoOnConfigThread
+ boolean isBound(@Param(name = "bindingKey") String bindingKey,
+ @Param(name = "arguments") Map<String, Object> arguments);
+
+ @DoOnConfigThread
+ boolean isBound(@Param(name = "arguments") Map<String, Object> arguments,
+ @Param(name = "queue") Queue<?> queue);
void removeReference(ExchangeReferrer exchange);
@@ -142,9 +174,6 @@ public interface Exchange<X extends Exch
boolean hasReferrers();
- ListenableFuture<Void> removeBindingAsync(Binding<?> binding);
-
EventLogger getEventLogger();
- void addBinding(Binding<?> binding);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Thu Dec 22 17:59:46 2016
@@ -260,7 +260,7 @@ public interface Queue<X extends Queue<X
//children
@ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
- Collection<Binding<?>> getBindings();
+ Collection<Binding> getBindings();
@ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
@@ -420,10 +420,6 @@ public interface Queue<X extends Queue<X
boolean isExclusive();
- void addBinding(Binding<?> binding);
-
- void removeBinding(Binding<?> binding);
-
LogSubject getLogSubject();
VirtualHost<?> getVirtualHost();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Thu Dec 22 17:59:46 2016
@@ -22,7 +22,6 @@ package org.apache.qpid.server.model.ada
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -36,7 +35,6 @@ import com.google.common.util.concurrent
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.model.AbstractConfiguredObject;
-import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.NamedAddressSpace;
@@ -132,19 +130,6 @@ public final class SessionAdapter extend
}
@Override
- public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
- {
- if (clazz == org.apache.qpid.server.model.Consumer.class)
- {
- return (Collection<C>) getConsumers();
- }
- else
- {
- return Collections.emptySet();
- }
- }
-
- @Override
public long getConsumerCount()
{
return _session.getConsumerCount();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Dec 22 17:59:46 2016
@@ -60,7 +60,6 @@ import javax.security.auth.Subject;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -91,6 +90,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageInfoImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSender;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.*;
@@ -224,7 +224,7 @@ public abstract class AbstractQueue<X ex
private final AtomicBoolean _overfull = new AtomicBoolean(false);
private final FlowToDiskChecker _flowToDiskChecker = new FlowToDiskChecker();
- private final CopyOnWriteArrayList<Binding<?>> _bindings = new CopyOnWriteArrayList<>();
+ private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<>();
private Map<String, Object> _arguments;
/** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
@@ -233,6 +233,9 @@ public abstract class AbstractQueue<X ex
private MessageGroupManager _messageGroupManager;
+ private final ConcurrentMap<MessageSender, Integer> _linkedSenders = new ConcurrentHashMap<>();
+
+
private QueueNotificationListener _notificationListener = NULL_NOTIFICATION_LISTENER;
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
@@ -1030,26 +1033,15 @@ public abstract class AbstractQueue<X ex
}
- public void addBinding(final Binding<?> binding)
- {
- _bindings.add(binding);
- childAdded(binding);
- }
-
- public void removeBinding(final Binding<?> binding)
+ public Collection<Binding> getBindings()
{
- _bindings.remove(binding);
- childRemoved(binding);
- }
-
- public Collection<Binding<?>> getBindings()
- {
- return getBindingsImpl();
- }
-
- private Collection<Binding<?>> getBindingsImpl()
- {
- return Collections.unmodifiableList(_bindings);
+ List<Binding> bindings = new ArrayList<>();
+ for(MessageSender sender : _linkedSenders.keySet())
+ {
+ //TODO - eliminate cast
+ bindings.addAll(((Exchange)sender).getBindingsForDestination(this));
+ }
+ return bindings;
}
public int getBindingCount()
@@ -1651,68 +1643,50 @@ public abstract class AbstractQueue<X ex
if (_deleted.compareAndSet(false, true))
{
final int queueDepthMessages = getQueueDepthMessages();
- final List<ListenableFuture<Void>> removeBindingFutures = new ArrayList<>(_bindings.size());
- final ArrayList<Binding<?>> bindingCopy = new ArrayList<>(_bindings);
- // TODO - RG - Need to sort out bindings!
- for (Binding<?> b : bindingCopy)
+ for(MessageSender sender : _linkedSenders.keySet())
{
- removeBindingFutures.add(b.deleteAsync());
+ sender.destinationRemoved(this);
}
- ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(removeBindingFutures);
-
- addFutureCallback(combinedFuture, new FutureCallback<List<Void>>()
+ try
{
- @Override
- public void onSuccess(final List<Void> result)
- {
- try
- {
- Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getAllIterator();
+ Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getAllIterator();
- while (consumerIterator.hasNext())
- {
- QueueConsumer<?,?> consumer = consumerIterator.next();
+ while (consumerIterator.hasNext())
+ {
+ QueueConsumer<?,?> consumer = consumerIterator.next();
- if (consumer != null)
- {
- consumer.queueDeleted();
- }
- }
+ if (consumer != null)
+ {
+ consumer.queueDeleted();
+ }
+ }
- final List<QueueEntry> entries = getMessagesOnTheQueue(new AcquireAllQueueEntryFilter());
+ final List<QueueEntry> entries = getMessagesOnTheQueue(new AcquireAllQueueEntryFilter());
- routeToAlternate(entries);
+ routeToAlternate(entries);
- preSetAlternateExchange();
- _alternateExchange = null;
+ preSetAlternateExchange();
+ _alternateExchange = null;
- _stopped.set(true);
- _queueHouseKeepingTask.cancel();
+ _stopped.set(true);
+ _queueHouseKeepingTask.cancel();
- performQueueDeleteTasks();
- deleted();
+ performQueueDeleteTasks();
+ deleted();
- //Log Queue Deletion
- getEventLogger().message(_logSubject, QueueMessages.DELETED(getId().toString()));
+ //Log Queue Deletion
+ getEventLogger().message(_logSubject, QueueMessages.DELETED(getId().toString()));
- _deleteFuture.set(queueDepthMessages);
- setState(State.DELETED);
- }
- catch(Throwable e)
- {
- _deleteFuture.setException(e);
- }
- }
-
- @Override
- public void onFailure(final Throwable t)
- {
- _deleteFuture.setException(t);
- }
- }, getTaskExecutor());
+ _deleteFuture.set(queueDepthMessages);
+ setState(State.DELETED);
+ }
+ catch(Throwable e)
+ {
+ _deleteFuture.setException(e);
+ }
}
return _deleteFuture;
@@ -2957,11 +2931,7 @@ public abstract class AbstractQueue<X ex
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
{
- if(clazz == Binding.class)
- {
- return (Collection<C>) getBindingsImpl();
- }
- else if(clazz == org.apache.qpid.server.model.Consumer.class)
+ if(clazz == org.apache.qpid.server.model.Consumer.class)
{
return _queueConsumerManager == null
? Collections.<C>emptySet()
@@ -2971,28 +2941,6 @@ public abstract class AbstractQueue<X ex
}
@Override
- protected <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(final Class<C> childClass,
- final Map<String, Object> attributes,
- final ConfiguredObject... otherParents)
- {
- if(childClass == Binding.class && otherParents.length == 1 && otherParents[0] instanceof Exchange)
- {
- final String bindingKey = (String) attributes.get("name");
- ((Exchange<?>)otherParents[0]).addBinding(bindingKey, this,
- (Map<String,Object>) attributes.get(Binding.ARGUMENTS));
- for(Binding binding : _bindings)
- {
- if(binding.getExchange() == otherParents[0] && binding.getName().equals(bindingKey))
- {
- return Futures.immediateFuture((C) binding);
- }
- }
- return null;
- }
- return super.addChildAsync(childClass, attributes, otherParents);
- }
-
- @Override
public boolean changeAttribute(String name, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
{
if(EXCLUSIVE.equals(name))
@@ -3445,4 +3393,24 @@ public abstract class AbstractQueue<X ex
}
}
}
+
+ @Override
+ public void linkAdded(final MessageSender sender, final String linkName)
+ {
+ Integer oldValue = _linkedSenders.putIfAbsent(sender, 1);
+ if(oldValue != null)
+ {
+ _linkedSenders.put(sender, oldValue+1);
+ }
+ }
+
+ @Override
+ public void linkRemoved(final MessageSender sender, final String linkName)
+ {
+ int oldValue = _linkedSenders.remove(sender);
+ if(oldValue != 1)
+ {
+ _linkedSenders.put(sender, oldValue-1);
+ }
+ }
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Thu Dec 22 17:59:46 2016
@@ -111,7 +111,7 @@ class QueueConsumerImpl<T extends Consum
EnumSet<ConsumerOption> optionSet,
final Integer priority)
{
- super(parentsMap(queue, target.getSessionModel().getModelObject()),
+ super(parentsMap(queue),
createAttributeMap(target.getSessionModel(), consumerName, filters, optionSet, priority));
_messageClass = messageClass;
_sessionReference = target.getSessionModel().getConnectionReference();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java Thu Dec 22 17:59:46 2016
@@ -66,6 +66,8 @@ public class BrokerStoreUpgraderAndRecov
register(new Upgrader_3_0_to_6_0());
register(new Upgrader_6_0_to_6_1());
+ register(new Upgrader_6_1_to_7_0());
+
}
private static final class Upgrader_1_0_to_1_1 extends StoreUpgraderPhase
@@ -545,6 +547,30 @@ public class BrokerStoreUpgraderAndRecov
}
+ private class Upgrader_6_1_to_7_0 extends StoreUpgraderPhase
+ {
+
+ public Upgrader_6_1_to_7_0()
+ {
+ super("modelVersion", "6.1", "7.0");
+ }
+
+ @Override
+ public void configuredObject(ConfiguredObjectRecord record)
+ {
+ if (record.getType().equals("Broker"))
+ {
+ upgradeRootRecord(record);
+ }
+ }
+
+ @Override
+ public void complete()
+ {
+
+ }
+ }
+
private static class VirtualHostEntryUpgrader
{
@SuppressWarnings("serial")
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Thu Dec 22 17:59:46 2016
@@ -44,6 +44,7 @@ import org.apache.qpid.server.model.Virt
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.util.FixedKeyMapCreator;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationStoreUpgraderAndRecoverer
@@ -73,6 +74,8 @@ public class VirtualHostStoreUpgraderAnd
register(new Upgrader_2_0_to_3_0());
register(new Upgrader_3_0_to_6_0());
register(new Upgrader_6_0_to_6_1());
+ register(new Upgrader_6_1_to_7_0());
+
Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>();
for (String exchangeName : DEFAULT_EXCHANGES.keySet())
@@ -106,10 +109,10 @@ public class VirtualHostStoreUpgraderAnd
private void removeSelectorArguments(Map<String, Object> binding)
{
@SuppressWarnings("unchecked")
- Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
+ Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get("arguments"));
FilterSupport.removeFilters(arguments);
- binding.put(Binding.ARGUMENTS, arguments);
+ binding.put("arguments", arguments);
}
private boolean isTopicExchange(ConfiguredObjectRecord entry)
@@ -141,7 +144,7 @@ public class VirtualHostStoreUpgraderAnd
private boolean hasSelectorArguments(Map<String, Object> binding)
{
@SuppressWarnings("unchecked")
- Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
+ Map<String, Object> arguments = (Map<String, Object>) binding.get("arguments");
return (arguments != null) && FilterSupport.argumentsContainFilter(arguments);
}
@@ -541,6 +544,182 @@ public class VirtualHostStoreUpgraderAnd
}
+ private static final FixedKeyMapCreator BINDING_MAP_CREATOR = new FixedKeyMapCreator("bindingKey", "destination", "arguments");
+ private static final FixedKeyMapCreator NO_ARGUMENTS_BINDING_MAP_CREATOR = new FixedKeyMapCreator("bindingKey", "destination");
+
+ private class Upgrader_6_1_to_7_0 extends StoreUpgraderPhase
+ {
+ private final Map<UUID, List<BindingRecord>> _exchangeBindings = new HashMap<>();
+ private final Map<UUID, ConfiguredObjectRecord> _exchanges = new HashMap<>();
+ private final Map<UUID, String> _queues = new HashMap<>();
+ private final Map<String, List<Map<String,Object>>> _queueBindings = new HashMap<>();
+
+
+ public Upgrader_6_1_to_7_0()
+ {
+ super("modelVersion", "6.1", "7.0");
+ }
+
+ @Override
+ public void configuredObject(ConfiguredObjectRecord record)
+ {
+ if("VirtualHost".equals(record.getType()))
+ {
+ upgradeRootRecord(record);
+ }
+ else if("Binding".equals(record.getType()))
+ {
+ BindingRecord binding = new BindingRecord(String.valueOf(record.getAttributes().get("name")),
+ record.getParents().get("Queue").toString(),
+ record.getAttributes().get("arguments"));
+ final UUID exchangeId = record.getParents().get("Exchange");
+ List<BindingRecord> existingBindings = _exchangeBindings.get(exchangeId);
+ if(existingBindings == null)
+ {
+ existingBindings = new ArrayList<>();
+ _exchangeBindings.put(exchangeId, existingBindings);
+ }
+ existingBindings.add(binding);
+ getDeleteMap().put(record.getId(), record);
+ }
+ else if("Exchange".equals(record.getType()))
+ {
+ final UUID exchangeId = record.getId();
+ _exchanges.put(exchangeId, record);
+ if(record.getAttributes().containsKey("bindings"))
+ {
+ List<BindingRecord> existingBindings = _exchangeBindings.get(exchangeId);
+ if(existingBindings == null)
+ {
+ existingBindings = new ArrayList<>();
+ _exchangeBindings.put(exchangeId, existingBindings);
+ }
+
+ List<Map<String,Object>> bindingList =
+ (List<Map<String, Object>>) record.getAttributes().get("bindings");
+ for(Map<String,Object> existingBinding : bindingList)
+ {
+ existingBindings.add(new BindingRecord((String)existingBinding.get("name"),
+ String.valueOf(existingBinding.get("queue")),
+ existingBinding.get("arguments")));
+ }
+ }
+ }
+ else if("Queue".equals(record.getType()))
+ {
+ _queues.put(record.getId(), (String) record.getAttributes().get("name"));
+ if(record.getAttributes().containsKey("bindings"))
+ {
+ _queueBindings.put(String.valueOf(record.getAttributes().get("name")),
+ (List<Map<String, Object>>) record.getAttributes().get("bindings"));
+ Map<String, Object> updatedAttributes = new HashMap<>(record.getAttributes());
+ updatedAttributes.remove("bindings");
+ getUpdateMap().put(record.getId(), new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()));
+ }
+ }
+ }
+
+ @Override
+ public void complete()
+ {
+ for(Map.Entry<String, List<Map<String,Object>>> entry : _queueBindings.entrySet())
+ {
+ for(Map<String, Object> existingBinding : entry.getValue())
+ {
+ UUID exchangeId;
+ if(existingBinding.get("exchange") instanceof UUID)
+ {
+ exchangeId = (UUID) existingBinding.get("exchange");
+ }
+ else
+ {
+ exchangeId = getExchangeIdFromNameOrId( existingBinding.get("exchange").toString());
+ }
+ List<BindingRecord> existingBindings = _exchangeBindings.get(exchangeId);
+ if(existingBindings == null)
+ {
+ existingBindings = new ArrayList<>();
+ _exchangeBindings.put(exchangeId, existingBindings);
+ }
+ existingBindings.add(new BindingRecord((String)existingBinding.get("name"),
+ entry.getKey(),
+ existingBinding.get("arguments")));
+ }
+ }
+
+ for(Map.Entry<UUID, List<BindingRecord>> entry : _exchangeBindings.entrySet())
+ {
+ ConfiguredObjectRecord exchangeRecord = _exchanges.get(entry.getKey());
+ if(exchangeRecord != null)
+ {
+ final List<BindingRecord> bindingRecords = entry.getValue();
+ List<Map<String,Object>> actualBindings = new ArrayList<>(bindingRecords.size());
+ for(BindingRecord bindingRecord : bindingRecords)
+ {
+ if(bindingRecord._arguments == null)
+ {
+ actualBindings.add(NO_ARGUMENTS_BINDING_MAP_CREATOR.createMap(bindingRecord._name,
+ getQueueFromIdOrName(bindingRecord)));
+ }
+ else
+ {
+ actualBindings.add(BINDING_MAP_CREATOR.createMap(bindingRecord._name,
+ getQueueFromIdOrName(bindingRecord), bindingRecord._arguments));
+ }
+ }
+ Map<String, Object> updatedAttributes = new HashMap<>(exchangeRecord.getAttributes());
+ updatedAttributes.remove("bindings");
+ updatedAttributes.put("durableBindings", actualBindings);
+ exchangeRecord = new ConfiguredObjectRecordImpl(exchangeRecord.getId(), exchangeRecord.getType(), updatedAttributes, exchangeRecord.getParents());
+ getUpdateMap().put(exchangeRecord.getId(), exchangeRecord);
+ }
+
+ }
+ }
+
+ private UUID getExchangeIdFromNameOrId(final String exchange)
+ {
+ for(ConfiguredObjectRecord record : _exchanges.values())
+ {
+ if(exchange.equals(record.getAttributes().get("name")))
+ {
+ return record.getId();
+ }
+ }
+ return UUID.fromString(exchange);
+ }
+
+ private String getQueueFromIdOrName(final BindingRecord bindingRecord)
+ {
+ final String queueIdOrName = bindingRecord._queueIdOrName;
+ try
+ {
+ UUID queueId = UUID.fromString(queueIdOrName);
+ String name = _queues.get(queueId);
+ return name == null ? queueIdOrName : name;
+ }
+ catch(IllegalArgumentException e)
+ {
+ return queueIdOrName;
+ }
+ }
+
+ private class BindingRecord
+ {
+ private final String _name;
+ private final String _queueIdOrName;
+ private final Object _arguments;
+
+ public BindingRecord(final String name, final String queueIdOrName, final Object arguments)
+ {
+ _name = name;
+ _queueIdOrName = queueIdOrName;
+ _arguments = arguments;
+ }
+ }
+ }
+
+
public boolean upgradeAndRecover(final DurableConfigurationStore durableConfigurationStore,
final ConfiguredObjectRecord... initialRecords)
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Thu Dec 22 17:59:46 2016
@@ -1316,6 +1316,12 @@ public abstract class AbstractVirtualHos
}
@Override
+ public Queue<?> getAttainedQueue(final String name)
+ {
+ return (Queue<?>) awaitChildClassToAttainState(Queue.class, name);
+ }
+
+ @Override
public Broker<?> getBroker()
{
return _broker;
@@ -1362,6 +1368,12 @@ public abstract class AbstractVirtualHos
}
@Override
+ public MessageDestination getSystemDestination(final String name)
+ {
+ return _systemNodeDestinations.get(name);
+ }
+
+ @Override
public <T extends ConfiguredObject<?>> T getAttainedChildFromAddress(final Class<T> childClass,
final String address)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org