You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/12/22 17:59:47 UTC

svn commit: r1775689 [2/4] - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/ bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/ bdbstore/systests/src/test/java/org/apache/qpid/server/sto...

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java Thu Dec 22 17:59:46 2016
@@ -23,9 +23,8 @@ package org.apache.qpid.server.exchange;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,8 +34,8 @@ import org.apache.qpid.server.filter.Fil
 import org.apache.qpid.server.filter.FilterSupport;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.queue.BaseQueue;
@@ -48,248 +47,242 @@ class FanoutExchangeImpl extends Abstrac
 
     private static final Integer ONE = Integer.valueOf(1);
 
-    /**
-     * Maps from queue name to queue instances
-     */
-    private final Map<Queue<?>,Integer> _queues = new HashMap<>();
-    private final CopyOnWriteArrayList<Queue<?>> _unfilteredQueues = new CopyOnWriteArrayList<>();
-    private final CopyOnWriteArrayList<Queue<?>> _filteredQueues = new CopyOnWriteArrayList<>();
-
-    private final AtomicReference<Map<Queue<?>,Map<Binding<?>, FilterManager>>>  _filteredBindings =
-            new AtomicReference<>();
+    private final class BindingSet
     {
-        Map<Queue<?>,Map<Binding<?>, FilterManager>> emptyMap = Collections.emptyMap();
-        _filteredBindings.set(emptyMap);
-    }
+        private final Map<MessageDestination,Integer> _queues;
 
-    @ManagedObjectFactoryConstructor
-    public FanoutExchangeImpl(final Map<String, Object> attributes, final QueueManagingVirtualHost<?> vhost)
-    {
-        super(attributes, vhost);
-    }
+        private final List<Queue<?>> _unfilteredQueues;
+        private final List<Queue<?>> _filteredQueues;
 
-    @Override
-    public ArrayList<BaseQueue> doRoute(ServerMessage payload,
-                                        final String routingKey,
-                                        final InstanceProperties instanceProperties)
-    {
+        private final Map<Queue<?>,Map<BindingIdentifier, FilterManager>> _filteredBindings;
 
-        for(Binding<?> b : getBindings())
+        public BindingSet(final Map<MessageDestination, Integer> queues,
+                          final List<Queue<?>> unfilteredQueues,
+                          final List<Queue<?>> filteredQueues,
+                          final Map<Queue<?>, Map<BindingIdentifier, FilterManager>> filteredBindings)
         {
-            b.incrementMatches();
+            _queues = queues;
+            _unfilteredQueues = unfilteredQueues;
+            _filteredQueues = filteredQueues;
+            _filteredBindings = filteredBindings;
         }
 
-        final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(_unfilteredQueues);
-
+        public BindingSet()
+        {
+            _queues = Collections.emptyMap();
+            _unfilteredQueues = Collections.emptyList();
+            _filteredQueues = Collections.emptyList();
+            _filteredBindings = Collections.emptyMap();
+        }
 
-        final Map<Queue<?>, Map<Binding<?>, FilterManager>> filteredBindings = _filteredBindings.get();
-        if(!_filteredQueues.isEmpty())
+        public BindingSet addBinding(final BindingIdentifier binding, final Map<String, Object> arguments)
         {
-            for(Queue<?> q : _filteredQueues)
-            {
-                final Map<Binding<?>, FilterManager> bindingMessageFilterMap = filteredBindings.get(q);
-                if(!(bindingMessageFilterMap == null || result.contains(q)))
+                if(FilterSupport.argumentsContainFilter(arguments))
                 {
-                    for(FilterManager filter : bindingMessageFilterMap.values())
+                    try
                     {
-                        if(filter.allAllow(Filterable.Factory.newInstance(payload, instanceProperties)))
+                        List<Queue<?>> filteredQueues;
+                        if (!(_filteredQueues.contains(binding.getDestination())
+                              || _unfilteredQueues.contains(binding.getDestination())))
                         {
-                            result.add(q);
-                            break;
+                            filteredQueues = new ArrayList<>(_filteredQueues);
+                            filteredQueues.add((Queue<?>) binding.getDestination());
+                            filteredQueues = Collections.unmodifiableList(filteredQueues);
                         }
+                        else
+                        {
+                            filteredQueues = _filteredQueues;
+                        }
+                        Map<Queue<?>, Map<BindingIdentifier, FilterManager>> filteredBindings =
+                                new HashMap<>(_filteredBindings);
+                        Map<BindingIdentifier, FilterManager> bindingsForQueue =
+                                filteredBindings.get(binding.getDestination());
+                        if (bindingsForQueue == null)
+                        {
+                            bindingsForQueue = new HashMap<>();
+                        }
+                        else
+                        {
+                            bindingsForQueue = new HashMap<>(bindingsForQueue);
+                        }
+                        bindingsForQueue.put(binding,
+                                             FilterSupport.createMessageFilter(arguments,
+                                                                               (Queue<?>) binding.getDestination()));
+                        filteredBindings.put((Queue<?>) binding.getDestination(), bindingsForQueue);
+                        return new BindingSet(_queues, _unfilteredQueues, filteredQueues, Collections.unmodifiableMap(filteredBindings));
+                    }
+                    catch (AMQInvalidArgumentException e)
+                    {
+                        _logger.warn("Binding ignored: cannot parse filter on binding of queue '" + binding.getDestination().getName()
+                                     + "' to exchange '" + FanoutExchangeImpl.this.getName()
+                                     + "' with arguments: " + arguments, e);
+                        return this;
                     }
                 }
-            }
-
+                else
+                {
+                    Map<MessageDestination, Integer> queues = new HashMap<>(_queues);
+                    List<Queue<?>> unfilteredQueues;
+                    List<Queue<?>> filteredQueues;
+                    if (queues.containsKey(binding.getDestination()))
+                    {
+                        queues.put(binding.getDestination(), queues.get(binding.getDestination()) + 1);
+                        unfilteredQueues = _unfilteredQueues;
+                        filteredQueues = _filteredQueues;
+                    }
+                    else
+                    {
+                        queues.put(binding.getDestination(), ONE);
+                        unfilteredQueues = new ArrayList<>(_unfilteredQueues);
+                        unfilteredQueues.add((Queue<?>)binding.getDestination());
+                        unfilteredQueues = Collections.unmodifiableList(unfilteredQueues);
+                        if(_filteredQueues.contains(binding.getDestination()))
+                        {
+                            filteredQueues = new ArrayList<>(_filteredQueues);
+                            filteredQueues.remove(binding.getDestination());
+                            filteredQueues = Collections.unmodifiableList(filteredQueues);
+                        }
+                        else
+                        {
+                            filteredQueues = _filteredQueues;
+                        }
+                    }
+                    return new BindingSet(queues, unfilteredQueues, filteredQueues, _filteredBindings);
+                }
         }
 
+        public BindingSet updateBinding(final BindingIdentifier binding, final Map<String, Object> newArguments)
+        {
 
-        _logger.debug("Publishing message to queue {}", result);
-
-        return result;
-
-    }
-
-    @Override
-    protected synchronized void onBindingUpdated(final Binding<?> binding, final Map<String, Object> oldArguments)
-    {
-        Queue<?> queue = binding.getQueue();
+            return removeBinding(binding).addBinding(binding, newArguments);
+        }
 
-        if (binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(
-                binding.getArguments()))
+        public BindingSet removeBinding(final BindingIdentifier binding)
         {
-            if(oldArguments != null && !oldArguments.isEmpty() && FilterSupport.argumentsContainFilter(oldArguments))
+            Queue<?> queue = (Queue<?>) binding.getDestination();
+            if(_filteredBindings.get(queue).containsKey(binding))
             {
-                _unfilteredQueues.add(queue);
-                if(_queues.containsKey(queue))
+                final Map<Queue<?>, Map<BindingIdentifier, FilterManager>> filteredBindings = new HashMap<>(_filteredBindings);
+                final Map<BindingIdentifier, FilterManager> bindingsForQueue = new HashMap<>(filteredBindings.remove(queue));
+                bindingsForQueue.remove(binding);
+                List<Queue<?>> filteredQueues;
+                if(bindingsForQueue.isEmpty())
                 {
-                    _queues.put(queue,_queues.get(queue)+1);
+                    filteredQueues = new ArrayList<>(_filteredQueues);
+                    filteredQueues.remove(queue);
+                    filteredQueues = Collections.unmodifiableList(filteredQueues);
                 }
                 else
                 {
-                    _queues.put(queue, ONE);
+                    filteredBindings.put(queue, bindingsForQueue);
+                    filteredQueues = _filteredQueues;
                 }
-
-                // No longer any reason to check filters for this queue
-                _filteredQueues.remove(queue);
+                return new BindingSet(_queues, _unfilteredQueues, filteredQueues, Collections.unmodifiableMap(filteredBindings));
             }
-            // else - nothing has changed, remains unfiltered
-        }
-        else
-        {
-            HashMap<Queue<?>,Map<Binding<?>, FilterManager>> filteredBindings =
-                    new HashMap<>(_filteredBindings.get());
-
-            Map<Binding<?>,FilterManager> bindingsForQueue;
-
-            final FilterManager messageFilter;
-
-            try
-            {
-                messageFilter = FilterSupport.createMessageFilter(binding.getArguments(), binding.getQueue());
-            }
-            catch (AMQInvalidArgumentException e)
-            {
-                _logger.warn("Cannot bind queue " + queue + " to exchange this " + this + " because selector cannot be parsed.", e);
-                return;
-            }
-
-
-            if (oldArguments != null && !oldArguments.isEmpty() && FilterSupport.argumentsContainFilter(oldArguments))
+            else if(_unfilteredQueues.contains(queue))
             {
-                bindingsForQueue = new HashMap<>(filteredBindings.remove(binding.getQueue()));
-            }
-            else // previously unfiltered
-            {
-                bindingsForQueue = new HashMap<>();
-
-                Integer oldValue = _queues.remove(queue);
-                if (ONE.equals(oldValue))
+                Map<MessageDestination, Integer> queues = new HashMap<>(_queues);
+                int count = queues.remove(queue);
+                List<Queue<?>> unfilteredQueues;
+                List<Queue<?>> filteredQueues;
+                if(count > 1)
                 {
-                    // should start checking filters for this queue
-                    _filteredQueues.add(queue);
-                    _unfilteredQueues.remove(queue);
+                    queues.put(queue, --count);
+                    unfilteredQueues = _unfilteredQueues;
+                    filteredQueues = _filteredQueues;
                 }
                 else
                 {
-                    _queues.put(queue, oldValue - 1);
+                    unfilteredQueues = new ArrayList<>(_unfilteredQueues);
+                    unfilteredQueues.remove(queue);
+                    unfilteredQueues = Collections.unmodifiableList(unfilteredQueues);
+                    if(_filteredBindings.containsKey(queue))
+                    {
+                        filteredQueues = new ArrayList<>(_filteredQueues);
+                        filteredQueues.add(queue);
+                        filteredQueues = Collections.unmodifiableList(filteredQueues);
+                    }
+                    else
+                    {
+                        filteredQueues = _filteredQueues;
+                    }
                 }
-
+                return new BindingSet(Collections.unmodifiableMap(queues), unfilteredQueues, filteredQueues, _filteredBindings);
             }
-            bindingsForQueue.put(binding, messageFilter);
-            filteredBindings.put(binding.getQueue(),bindingsForQueue);
+            else
+            {
+                return this;
+            }
+        }
+    }
 
-            _filteredBindings.set(filteredBindings);
 
-        }
+    private volatile BindingSet _bindingSet = new BindingSet();
 
+    /**
+     * Maps from queue name to queue instances
+     */
+
+    @ManagedObjectFactoryConstructor
+    public FanoutExchangeImpl(final Map<String, Object> attributes, final QueueManagingVirtualHost<?> vhost)
+    {
+        super(attributes, vhost);
     }
 
     @Override
-    protected synchronized void onBind(final Binding<?> binding)
+    public ArrayList<BaseQueue> doRoute(ServerMessage payload,
+                                        final String routingKey,
+                                        final InstanceProperties instanceProperties)
     {
-        Queue<?> queue = binding.getQueue();
-        assert queue != null;
-        if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments()))
-        {
 
-            Integer oldVal;
-            if(_queues.containsKey(queue))
-            {
-                _queues.put(queue,_queues.get(queue)+1);
-            }
-            else
-            {
-                _queues.put(queue, ONE);
-                _unfilteredQueues.add(queue);
-                // No longer any reason to check filters for this queue
-                _filteredQueues.remove(queue);
-            }
+        BindingSet bindingSet = _bindingSet;
+        final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(bindingSet._unfilteredQueues);
 
-        }
-        else
+
+        final Map<Queue<?>, Map<BindingIdentifier, FilterManager>> filteredBindings = bindingSet._filteredBindings;
+        if(!bindingSet._filteredQueues.isEmpty())
         {
-            try
+            for(Queue<?> q : bindingSet._filteredQueues)
             {
-
-                HashMap<Queue<?>,Map<Binding<?>, FilterManager>> filteredBindings =
-                        new HashMap<>(_filteredBindings.get());
-
-                Map<Binding<?>, FilterManager> bindingsForQueue = filteredBindings.remove(binding.getQueue());
-                final FilterManager messageFilter =
-                        FilterSupport.createMessageFilter(binding.getArguments(), binding.getQueue());
-
-                if(bindingsForQueue != null)
-                {
-                    bindingsForQueue = new HashMap<>(bindingsForQueue);
-                    bindingsForQueue.put(binding, messageFilter);
-                }
-                else
+                final Map<BindingIdentifier, FilterManager> bindingMessageFilterMap = filteredBindings.get(q);
+                if(!(bindingMessageFilterMap == null || result.contains(q)))
                 {
-                    bindingsForQueue = Collections.<Binding<?>, FilterManager>singletonMap(binding, messageFilter);
-                    if(!_unfilteredQueues.contains(queue))
+                    for(FilterManager filter : bindingMessageFilterMap.values())
                     {
-                        _filteredQueues.add(queue);
+                        if(filter.allAllow(Filterable.Factory.newInstance(payload, instanceProperties)))
+                        {
+                            result.add(q);
+                            break;
+                        }
                     }
                 }
+            }
 
-                filteredBindings.put(binding.getQueue(), bindingsForQueue);
+        }
 
-                _filteredBindings.set(filteredBindings);
 
-            }
-            catch (AMQInvalidArgumentException e)
-            {
-                _logger.warn("Cannot bind queue " + queue + " to exchange this " + this + " because selector cannot be parsed.", e);
-                return;
-            }
-        }
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Binding queue " + queue
-                          + " with routing key " + binding.getBindingKey() + " to exchange " + this);
-        }
+        _logger.debug("Publishing message to queue {}", result);
+
+        return result;
+
     }
 
     @Override
-    protected synchronized void onUnbind(final Binding<?> binding)
+    protected void onBindingUpdated(final BindingIdentifier binding,
+                                    final Map<String, Object> newArguments)
     {
-        Queue<?> queue = binding.getQueue();
-        if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments()))
-        {
-            Integer oldValue = _queues.remove(queue);
-            if(ONE.equals(oldValue))
-            {
-                // should start checking filters for this queue
-                if(_filteredBindings.get().containsKey(queue))
-                {
-                    _filteredQueues.add(queue);
-                }
-                _unfilteredQueues.remove(queue);
-            }
-            else
-            {
-                _queues.put(queue,oldValue-1);
-            }
-        }
-        else // we are removing a binding with filters
-        {
-            HashMap<Queue<?>,Map<Binding<?>, FilterManager>> filteredBindings =
-                    new HashMap<>(_filteredBindings.get());
+        _bindingSet = _bindingSet.updateBinding(binding, newArguments);
+    }
 
-            Map<Binding<?>,FilterManager> bindingsForQueue = filteredBindings.remove(binding.getQueue());
-            if(bindingsForQueue.size()>1)
-            {
-                bindingsForQueue = new HashMap<>(bindingsForQueue);
-                bindingsForQueue.remove(binding);
-                filteredBindings.put(binding.getQueue(),bindingsForQueue);
-            }
-            else
-            {
-                _filteredQueues.remove(queue);
-            }
-            _filteredBindings.set(filteredBindings);
+    @Override
+    protected void onBind(final BindingIdentifier binding, final Map<String, Object> arguments)
+    {
+        _bindingSet = _bindingSet.addBinding(binding, arguments);
+    }
 
-        }
+    @Override
+    protected void onUnbind(final BindingIdentifier binding)
+    {
+        _bindingSet = _bindingSet.removeBinding(binding);
     }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Thu Dec 22 17:59:46 2016
@@ -35,7 +35,7 @@ import org.apache.qpid.server.filter.Fil
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Queue;
 
 /**
  * Defines binding and matching based on a set of headers.
@@ -45,7 +45,7 @@ class HeadersBinding
     private static final Logger _logger = LoggerFactory.getLogger(HeadersBinding.class);
 
     private final Map<String,Object> _mappings;
-    private final Binding<?> _binding;
+    private final AbstractExchange.BindingIdentifier _binding;
     private final Set<String> required = new HashSet<String>();
     private final Map<String,Object> matches = new HashMap<String,Object>();
     private boolean matchAny;
@@ -59,12 +59,11 @@ class HeadersBinding
      *
      * @param binding the binding to create a header binding using
      */
-    public HeadersBinding(Binding<?> binding)
+    public HeadersBinding(AbstractExchange.BindingIdentifier binding, Map<String,Object> arguments)
     {
         _binding = binding;
         if(_binding !=null)
         {
-            Map<String, Object> arguments = _binding.getArguments();
             _mappings = arguments == null ? Collections.<String,Object>emptyMap() : arguments;
             initMappings();
         }
@@ -80,13 +79,12 @@ class HeadersBinding
         {
             try
             {
-                _filter = FilterSupport.createMessageFilter(_mappings,_binding.getQueue());
+                _filter = FilterSupport.createMessageFilter(_mappings, (Queue<?>) _binding.getDestination());
             }
             catch (AMQInvalidArgumentException e)
             {
-                _logger.warn("Invalid filter in binding queue '"+_binding.getQueue().getName()
-                             +"' to exchange '"+_binding.getExchange().getName()
-                             +"' with arguments: " + _binding.getArguments());
+                _logger.warn("Invalid filter in binding queue '"+_binding.getDestination().getName()
+                             +"' with arguments: " + _mappings);
                 _filter = new FilterManager();
 
                 _filter.add("x-exclude-all", new ExcludeAllFilter());
@@ -111,7 +109,7 @@ class HeadersBinding
         }
     }
 
-    public Binding<?> getBinding()
+    public AbstractExchange.BindingIdentifier getBinding()
     {
         return _binding;
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java Thu Dec 22 17:59:46 2016
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.exchange;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.ListIterator;
 import java.util.Map;
@@ -32,12 +33,9 @@ import java.util.concurrent.CopyOnWriteA
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.ManagedObject;
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.queue.BaseQueue;
@@ -75,7 +73,7 @@ public class HeadersExchangeImpl extends
 
     private static final Logger _logger = LoggerFactory.getLogger(HeadersExchangeImpl.class);
 
-    private final ConcurrentMap<String, CopyOnWriteArraySet<Binding<?>>> _bindingsByKey =
+    private final ConcurrentMap<String, CopyOnWriteArraySet<BindingIdentifier>> _bindingsByKey =
                             new ConcurrentHashMap<>();
 
     private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers =
@@ -100,16 +98,15 @@ public class HeadersExchangeImpl extends
         {
             if (hb.matches(Filterable.Factory.newInstance(payload,instanceProperties)))
             {
-                Binding<?> b = hb.getBinding();
+                BindingIdentifier b = hb.getBinding();
 
-                b.incrementMatches();
 
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Exchange " + getName() + ": delivering message with headers " +
-                                  payload.getMessageHeader() + " to " + b.getQueue().getName());
+                                  payload.getMessageHeader() + " to " + b.getDestination().getName());
                 }
-                queues.add(b.getQueue());
+                queues.add((BaseQueue) b.getDestination());
             }
         }
 
@@ -117,20 +114,17 @@ public class HeadersExchangeImpl extends
     }
 
     @Override
-    protected void onBind(final Binding<?> binding)
+    protected void onBind(final BindingIdentifier binding, Map<String,Object> arguments)
     {
         String bindingKey = binding.getBindingKey();
-        Queue<?> queue = binding.getQueue();
+        Queue<?> queue = (Queue<?>) binding.getDestination();
 
-        assert queue != null;
-        assert bindingKey != null;
-
-        CopyOnWriteArraySet<Binding<?>> bindings = _bindingsByKey.get(bindingKey);
+        CopyOnWriteArraySet<BindingIdentifier> bindings = _bindingsByKey.get(bindingKey);
 
         if(bindings == null)
         {
             bindings = new CopyOnWriteArraySet<>();
-            CopyOnWriteArraySet<Binding<?>> newBindings;
+            CopyOnWriteArraySet<BindingIdentifier> newBindings;
             if((newBindings = _bindingsByKey.putIfAbsent(bindingKey, bindings)) != null)
             {
                 bindings = newBindings;
@@ -140,18 +134,18 @@ public class HeadersExchangeImpl extends
         if(_logger.isDebugEnabled())
         {
             _logger.debug("Exchange " + getName() + ": Binding " + queue.getName() +
-                          " with binding key '" +bindingKey + "' and args: " + binding.getArguments());
+                          " with binding key '" +bindingKey + "' and args: " + arguments);
         }
 
-        _bindingHeaderMatchers.add(new HeadersBinding(binding));
+        _bindingHeaderMatchers.add(new HeadersBinding(binding, arguments));
         bindings.add(binding);
 
     }
 
     @Override
-    protected void onBindingUpdated(final Binding<?> binding, final Map<String, Object> oldArguments)
+    protected void onBindingUpdated(final BindingIdentifier binding, final Map<String, Object> arguments)
     {
-        HeadersBinding headersBinding = new HeadersBinding(binding);
+        HeadersBinding headersBinding = new HeadersBinding(binding, arguments);
         ListIterator<HeadersBinding> iter = _bindingHeaderMatchers.listIterator();
         while(iter.hasNext())
         {
@@ -163,17 +157,17 @@ public class HeadersExchangeImpl extends
 
     }
 
-    protected void onUnbind(final Binding<?> binding)
+    protected void onUnbind(final BindingIdentifier binding)
     {
         assert binding != null;
 
-        CopyOnWriteArraySet<Binding<?>> bindings = _bindingsByKey.get(binding.getBindingKey());
+        CopyOnWriteArraySet<BindingIdentifier> bindings = _bindingsByKey.get(binding.getBindingKey());
         if(bindings != null)
         {
             bindings.remove(binding);
         }
 
-        boolean removedBinding = _bindingHeaderMatchers.remove(new HeadersBinding(binding));
+        boolean removedBinding = _bindingHeaderMatchers.remove(new HeadersBinding(binding, Collections.<String,Object>emptyMap()));
         _logger.debug("Removing Binding: {}", removedBinding);
 
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java Thu Dec 22 17:59:46 2016
@@ -56,7 +56,7 @@ class TopicExchangeImpl extends Abstract
     private final Map<String, TopicExchangeResult> _topicExchangeResults =
             new ConcurrentHashMap<String, TopicExchangeResult>();
 
-    private final Map<Binding<?>, Map<String,Object>> _bindings = new HashMap<>();
+    private final Map<BindingIdentifier, Map<String,Object>> _bindings = new HashMap<>();
 
     @ManagedObjectFactoryConstructor
     public TopicExchangeImpl(final Map<String,Object> attributes, final QueueManagingVirtualHost<?> vhost)
@@ -65,14 +65,10 @@ class TopicExchangeImpl extends Abstract
     }
 
     @Override
-    protected synchronized void onBindingUpdated(final Binding<?> binding, final Map<String, Object> oldArguments)
+    protected synchronized void onBindingUpdated(final BindingIdentifier binding, final Map<String, Object> newArguments)
     {
         final String bindingKey = binding.getBindingKey();
-        Queue<?> queue = binding.getQueue();
-        Map<String,Object> args = binding.getArguments();
-
-        assert queue != null;
-        assert bindingKey != null;
+        Queue<?> queue = (Queue<?>) binding.getDestination();
 
         _logger.debug("Updating binding of queue {} with routing key {}", queue.getName(), bindingKey);
 
@@ -84,21 +80,20 @@ class TopicExchangeImpl extends Abstract
 
             if (_bindings.containsKey(binding))
             {
-                Map<String, Object> oldArgs = _bindings.get(binding);
-                _bindings.put(binding, args);
+                Map<String, Object> oldArgs = _bindings.put(binding, newArguments);
                 TopicExchangeResult result = _topicExchangeResults.get(routingKey);
 
-                if (FilterSupport.argumentsContainFilter(args))
+                if (FilterSupport.argumentsContainFilter(newArguments))
                 {
                     if (FilterSupport.argumentsContainFilter(oldArgs))
                     {
                         result.replaceQueueFilter(queue,
                                                   FilterSupport.createMessageFilter(oldArgs, queue),
-                                                  FilterSupport.createMessageFilter(args, queue));
+                                                  FilterSupport.createMessageFilter(newArguments, queue));
                     }
                     else
                     {
-                        result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
+                        result.addFilteredQueue(queue, FilterSupport.createMessageFilter(newArguments, queue));
                         result.removeUnfilteredQueue(queue);
                     }
                 }
@@ -126,14 +121,10 @@ class TopicExchangeImpl extends Abstract
 
     }
 
-    protected synchronized void registerQueue(final Binding<?> binding) throws AMQInvalidArgumentException
+    protected synchronized void registerQueue(final BindingIdentifier binding, Map<String,Object> arguments) throws AMQInvalidArgumentException
     {
         final String bindingKey = binding.getBindingKey();
-        Queue<?> queue = binding.getQueue();
-        Map<String,Object> args = binding.getArguments();
-
-        assert queue != null;
-        assert bindingKey != null;
+        Queue<?> queue = (Queue<?>) binding.getDestination();
 
         _logger.debug("Registering queue {} with routing key {}", queue.getName(), bindingKey);
 
@@ -142,20 +133,20 @@ class TopicExchangeImpl extends Abstract
 
         if(_bindings.containsKey(binding))
         {
-            Map<String,Object> oldArgs = _bindings.get(binding);
+            Map<String,Object> oldArgs = _bindings.put(binding, arguments);
             TopicExchangeResult result = _topicExchangeResults.get(routingKey);
 
-            if(FilterSupport.argumentsContainFilter(args))
+            if(FilterSupport.argumentsContainFilter(arguments))
             {
                 if(FilterSupport.argumentsContainFilter(oldArgs))
                 {
                     result.replaceQueueFilter(queue,
                                               FilterSupport.createMessageFilter(oldArgs, queue),
-                                              FilterSupport.createMessageFilter(args, queue));
+                                              FilterSupport.createMessageFilter(arguments, queue));
                 }
                 else
                 {
-                    result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
+                    result.addFilteredQueue(queue, FilterSupport.createMessageFilter(arguments, queue));
                     result.removeUnfilteredQueue(queue);
                 }
             }
@@ -183,9 +174,9 @@ class TopicExchangeImpl extends Abstract
             if(result == null)
             {
                 result = new TopicExchangeResult();
-                if(FilterSupport.argumentsContainFilter(args))
+                if(FilterSupport.argumentsContainFilter(arguments))
                 {
-                    result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
+                    result.addFilteredQueue(queue, FilterSupport.createMessageFilter(arguments, queue));
                 }
                 else
                 {
@@ -196,9 +187,9 @@ class TopicExchangeImpl extends Abstract
             }
             else
             {
-                if(FilterSupport.argumentsContainFilter(args))
+                if(FilterSupport.argumentsContainFilter(arguments))
                 {
-                    result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
+                    result.addFilteredQueue(queue, FilterSupport.createMessageFilter(arguments, queue));
                 }
                 else
                 {
@@ -207,7 +198,7 @@ class TopicExchangeImpl extends Abstract
             }
 
             result.addBinding(binding);
-            _bindings.put(binding, args);
+            _bindings.put(binding, arguments);
         }
 
     }
@@ -246,7 +237,7 @@ class TopicExchangeImpl extends Abstract
 
     }
 
-    private synchronized boolean deregisterQueue(final Binding<?> binding)
+    private synchronized boolean deregisterQueue(final BindingIdentifier binding)
     {
         if(_bindings.containsKey(binding))
         {
@@ -263,8 +254,8 @@ class TopicExchangeImpl extends Abstract
             {
                 try
                 {
-                    result.removeFilteredQueue(binding.getQueue(), FilterSupport.createMessageFilter(bindingArgs,
-                            binding.getQueue()));
+                    result.removeFilteredQueue((Queue<?>) binding.getDestination(), FilterSupport.createMessageFilter(bindingArgs,
+                                                                                                                      (Queue<?>) binding.getDestination()));
                 }
                 catch (AMQInvalidArgumentException e)
                 {
@@ -273,7 +264,7 @@ class TopicExchangeImpl extends Abstract
             }
             else
             {
-                result.removeUnfilteredQueue(binding.getQueue());
+                result.removeUnfilteredQueue((Queue<?>) binding.getDestination());
             }
             return true;
         }
@@ -301,11 +292,6 @@ class TopicExchangeImpl extends Abstract
                 {
                     TopicExchangeResult res = (TopicExchangeResult)result;
 
-                    for(Binding<?> b : res.getBindings())
-                    {
-                        b.incrementMatches();
-                    }
-
                     queues = res.processMessage(message, queues);
                 }
                 return queues;
@@ -314,11 +300,12 @@ class TopicExchangeImpl extends Abstract
 
     }
 
-    protected void onBind(final Binding<?> binding)
+    @Override
+    protected void onBind(final BindingIdentifier binding, Map<String, Object> arguments)
     {
         try
         {
-            registerQueue(binding);
+            registerQueue(binding, arguments);
         }
         catch (AMQInvalidArgumentException e)
         {
@@ -327,7 +314,8 @@ class TopicExchangeImpl extends Abstract
         }
     }
 
-    protected void onUnbind(final Binding<?> binding)
+    @Override
+    protected void onUnbind(final BindingIdentifier binding)
     {
         deregisterQueue(binding);
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java Thu Dec 22 17:59:46 2016
@@ -30,14 +30,14 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import org.apache.qpid.server.exchange.AbstractExchange;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.Queue;
 
 public final class TopicExchangeResult implements TopicMatcherResult
 {
-    private final List<Binding<?>> _bindings = new CopyOnWriteArrayList<>();
+    private final List<AbstractExchange.BindingIdentifier> _bindings = new CopyOnWriteArrayList<>();
     private final Map<Queue<?>, Integer> _unfilteredQueues = new ConcurrentHashMap<>();
     private final ConcurrentMap<Queue<?>, Map<FilterManager,Integer>> _filteredQueues = new ConcurrentHashMap<>();
     private volatile ArrayList<Queue<?>> _unfilteredQueueList = new ArrayList<>(0);
@@ -81,17 +81,17 @@ public final class TopicExchangeResult i
         return _unfilteredQueues.keySet();
     }
 
-    public void addBinding(Binding<?> binding)
+    public void addBinding(AbstractExchange.BindingIdentifier binding)
     {
         _bindings.add(binding);
     }
     
-    public void removeBinding(Binding<?> binding)
+    public void removeBinding(AbstractExchange.BindingIdentifier binding)
     {
         _bindings.remove(binding);
     }
     
-    public List<Binding<?>> getBindings()
+    public List<AbstractExchange.BindingIdentifier> getBindings()
     {
         return new ArrayList<>(_bindings);
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BindingMessages.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BindingMessages.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BindingMessages.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BindingMessages.java Thu Dec 22 17:59:46 2016
@@ -79,37 +79,14 @@ public class BindingMessages
 
     /**
      * Log a Binding message of the Format:
-     * <pre>BND-1001 : Create[ : Arguments : {0}]</pre>
+     * <pre>BND-1001 : Create : {0}</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
      *
      */
-    public static LogMessage CREATED(String param1, boolean opt1)
+    public static LogMessage CREATED(String param1)
     {
         String rawMessage = _messages.getString("CREATED");
-        StringBuffer msg = new StringBuffer();
-
-        // Split the formatted message up on the option values so we can
-        // rebuild the message based on the configured options.
-        String[] parts = rawMessage.split("\\[");
-        msg.append(parts[0]);
-
-        int end;
-        if (parts.length > 1)
-        {
-
-            // Add Option : : Arguments : {0}.
-            end = parts[1].indexOf(']');
-            if (opt1)
-            {
-                msg.append(parts[1].substring(0, end));
-            }
-
-            // Use 'end + 1' to remove the ']' from the output
-            msg.append(parts[1].substring(end + 1));
-        }
-
-        rawMessage = msg.toString();
 
         final Object[] messageArguments = {param1};
         // Create a new MessageFormat to ensure thread safety.
@@ -160,16 +137,21 @@ public class BindingMessages
 
     /**
      * Log a Binding message of the Format:
-     * <pre>BND-1002 : Deleted</pre>
+     * <pre>BND-1002 : Deleted : {0}</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
      *
      */
-    public static LogMessage DELETED()
+    public static LogMessage DELETED(String param1)
     {
         String rawMessage = _messages.getString("DELETED");
 
-        final String message = rawMessage;
+        final Object[] messageArguments = {param1};
+        // Create a new MessageFormat to ensure thread safety.
+        // Sharing a MessageFormat and using applyPattern is not thread safe
+        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+        final String message = formatter.format(messageArguments);
 
         return new LogMessage()
         {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Binding_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Binding_logmessages.properties?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Binding_logmessages.properties (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Binding_logmessages.properties Thu Dec 22 17:59:46 2016
@@ -18,7 +18,7 @@
 #
 # Default File used for all non-defined locales.
 #
-CREATED = BND-1001 : Create[ : Arguments : {0}]
-DELETED = BND-1002 : Deleted
+CREATED = BND-1001 : Create : {0}
+DELETED = BND-1002 : Deleted : {0}
 # 0 - operation name
 OPERATION = BND-1003 : Operation : {0}

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java Thu Dec 22 17:59:46 2016
@@ -54,4 +54,9 @@ public interface MessageDestination exte
                                                                           InstanceProperties instanceProperties,
                                                                           ServerTransaction txn,
                                                                           Action<? super MessageInstance> postEnqueueAction);
+
+    boolean isDurable();
+
+    void linkAdded(MessageSender sender, String linkName);
+    void linkRemoved(MessageSender sender, String linkName);
 }

Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSender.java (from r1775646, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSender.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSender.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java&r1=1775646&r2=1775689&rev=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSender.java Thu Dec 22 17:59:46 2016
@@ -18,34 +18,9 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.model;
+package org.apache.qpid.server.message;
 
-import java.util.Map;
-
-@ManagedObject( amqpName = "org.apache.qpid.Binding")
-public interface Binding<X extends Binding<X>> extends ConfiguredObject<X>
+public interface MessageSender
 {
-
-    public String ARGUMENTS = "arguments";
-    public String STATE = "state";
-    public String QUEUE = "queue";
-    public String EXCHANGE = "exchange";
-
-    String getBindingKey();
-
-    // TODO - this is a hack
-    @DerivedAttribute
-    Queue<?> getQueue();
-
-    // TODO - this is a hack
-    @DerivedAttribute
-    Exchange<?> getExchange();
-
-    @ManagedAttribute
-    Map<String,Object> getArguments();
-
-    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT, label = "Matches")
-    long getMatches();
-
-    void incrementMatches();
+    void destinationRemoved(MessageDestination destination);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java Thu Dec 22 17:59:46 2016
@@ -22,30 +22,10 @@ package org.apache.qpid.server.model;
 
 import java.util.Map;
 
-@ManagedObject( amqpName = "org.apache.qpid.Binding")
-public interface Binding<X extends Binding<X>> extends ConfiguredObject<X>
+@ManagedAttributeValueType
+public interface Binding extends ManagedAttributeValue
 {
-
-    public String ARGUMENTS = "arguments";
-    public String STATE = "state";
-    public String QUEUE = "queue";
-    public String EXCHANGE = "exchange";
-
     String getBindingKey();
-
-    // TODO - this is a hack
-    @DerivedAttribute
-    Queue<?> getQueue();
-
-    // TODO - this is a hack
-    @DerivedAttribute
-    Exchange<?> getExchange();
-
-    @ManagedAttribute
+    String getDestination();
     Map<String,Object> getArguments();
-
-    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT, label = "Matches")
-    long getMatches();
-
-    void incrementMatches();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java Thu Dec 22 17:59:46 2016
@@ -53,9 +53,10 @@ public final class BrokerModel extends M
      *     Bring model version and Qpid version into sync
      * 6.1 Remove JMX
      *     Remove PreferencesProvider
+     * 7.0 Remove bindings, Consumer sole parent is Queue
      */
-    public static final int MODEL_MAJOR_VERSION = 6;
-    public static final int MODEL_MINOR_VERSION = 1;
+    public static final int MODEL_MAJOR_VERSION = 7;
+    public static final int MODEL_MINOR_VERSION = 0;
     public static final String MODEL_VERSION = MODEL_MAJOR_VERSION + "." + MODEL_MINOR_VERSION;
     private static final Model MODEL_INSTANCE = new BrokerModel();
     private final Map<Class<? extends ConfiguredObject>, Collection<Class<? extends ConfiguredObject>>> _parents =
@@ -109,13 +110,8 @@ public final class BrokerModel extends M
 
         addRelationship(Connection.class, Session.class);
 
-        addRelationship(Queue.class, Binding.class);
         addRelationship(Queue.class, Consumer.class);
 
-        addRelationship(Exchange.class, Binding.class);
-
-        addRelationship(Session.class, Consumer.class);
-
         _objectFactory = new ConfiguredObjectFactoryImpl(this);
         _typeRegistry = new ConfiguredObjectTypeRegistry((new QpidServiceLoader()).instancesOf(ConfiguredObjectRegistration.class),
                                                          (new QpidServiceLoader()).instancesOf(ConfiguredObjectAttributeInjector.class),

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedMethodAttribute.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedMethodAttribute.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedMethodAttribute.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedMethodAttribute.java Thu Dec 22 17:59:46 2016
@@ -96,4 +96,11 @@ public class ConfiguredDerivedMethodAttr
         return _secureValuePattern;
     }
 
+    public T convertValue(Object input, C configuredObject)
+    {
+        final AttributeValueConverter<T> converter =
+                AttributeValueConverter.getConverter(getType(), getGetter().getGenericReturnType());
+        return converter.convert(input, configuredObject);
+    }
+
 }

Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/DoOnConfigThread.java (from r1775646, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/DoOnConfigThread.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/DoOnConfigThread.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java&r1=1775646&r2=1775689&rev=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/DoOnConfigThread.java Thu Dec 22 17:59:46 2016
@@ -20,32 +20,13 @@
  */
 package org.apache.qpid.server.model;
 
-import java.util.Map;
-
-@ManagedObject( amqpName = "org.apache.qpid.Binding")
-public interface Binding<X extends Binding<X>> extends ConfiguredObject<X>
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.SOURCE)
+@Target(ElementType.METHOD)
+public @interface DoOnConfigThread
 {
-
-    public String ARGUMENTS = "arguments";
-    public String STATE = "state";
-    public String QUEUE = "queue";
-    public String EXCHANGE = "exchange";
-
-    String getBindingKey();
-
-    // TODO - this is a hack
-    @DerivedAttribute
-    Queue<?> getQueue();
-
-    // TODO - this is a hack
-    @DerivedAttribute
-    Exchange<?> getExchange();
-
-    @ManagedAttribute
-    Map<String,Object> getArguments();
-
-    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT, label = "Matches")
-    long getMatches();
-
-    void incrementMatches();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java Thu Dec 22 17:59:46 2016
@@ -23,17 +23,17 @@ package org.apache.qpid.server.model;
 import java.util.Collection;
 import java.util.Map;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 import org.apache.qpid.server.exchange.ExchangeReferrer;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 @ManagedObject( description = Exchange.CLASS_DESCRIPTION,
         amqpName = "org.apache.qpid.Exchange"
         )
 public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, MessageDestination,
-                                                         ExchangeReferrer
+                                                         ExchangeReferrer, MessageSender
 {
     String CLASS_DESCRIPTION = "<p>An Exchange is a named entity within the Virtualhost which receives messages from "
                                + "producers and routes them to matching Queues within the Virtualhost.</p>"
@@ -41,6 +41,7 @@ public interface Exchange<X extends Exch
                                + "a different routing algorithm.</p>";
 
     String ALTERNATE_EXCHANGE                   = "alternateExchange";
+    String DURABLE_BINDINGS = "durableBindings";
 
     enum UnroutableMessageBehaviour
     {
@@ -55,9 +56,13 @@ public interface Exchange<X extends Exch
     @ManagedAttribute(description = "(AMQP 1.0 only) Default behaviour to apply when a message is not routed to any queues", defaultValue = "DISCARD")
     UnroutableMessageBehaviour getUnroutableMessageBehaviour();
 
-    //children
-    @ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
-    Collection<Binding<?>> getBindings();
+    @DerivedAttribute
+    Collection<Binding> getBindings();
+
+    Collection<Binding> getBindingsForDestination(MessageDestination destination);
+
+    @DerivedAttribute(persist = true)
+    Collection<Binding> getDurableBindings();
 
     // Statistics
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Bindings")
@@ -77,24 +82,40 @@ public interface Exchange<X extends Exch
 
 
     @ManagedOperation(changesConfiguredObjectState = true)
-    void bind(@Param(name="queue", mandatory = true) Queue<?> queue,
-              @Param(name="bindingKey") String bindingKey,
-              @Param(name="arguments", defaultValue = "{}") Map<String,Object> arguments);
+    boolean bind(@Param(name = "destination", mandatory = true) String destination,
+                 @Param(name = "bindingKey") String bindingKey,
+                 @Param(name = "arguments", defaultValue = "{}") Map<String, Object> arguments,
+                 @Param(name = "replaceExistingArguments", defaultValue = "false") boolean replaceExistingArguments);
+
+    @ManagedOperation(changesConfiguredObjectState = true)
+    boolean unbind(@Param(name="destination", mandatory = true) String destination,
+                   @Param(name="bindingKey") String bindingKey);
+
 
     /**
      * @return true if the exchange will be deleted after all queues have been detached
      */
     boolean isAutoDelete();
 
-    boolean addBinding(String bindingKey, Queue<?> queue, Map<String, Object> arguments);
+    @DoOnConfigThread
+    boolean addBinding(@Param(name = "bindingKey") String bindingKey,
+                       @Param(name = "queue") Queue<?> queue,
+                       @Param(name = "arguments") Map<String, Object> arguments);
+
+    @DoOnConfigThread
+    boolean deleteBinding(@Param(name = "bindingKey") String bindingKey,
+                          @Param(name = "queue") Queue<?> queue);
+
+    @DoOnConfigThread
+    boolean hasBinding(@Param(name = "bindingKey") String bindingKey,
+                       @Param(name = "queue") Queue<?> queue);
+
+    @DoOnConfigThread
+    void replaceBinding(@Param(name = "bindingKey") String bindingKey,
+                        @Param(name = "queue") Queue<?> queue,
+                        @Param(name = "arguments") Map<String, Object> arguments);
 
-    boolean deleteBinding(String bindingKey, Queue<?> queue);
-
-    boolean hasBinding(String bindingKey, Queue<?> queue);
-
-    boolean replaceBinding(String bindingKey,
-                           Queue<?> queue,
-                           Map<String, Object> arguments);
+    QueueManagingVirtualHost<?> getVirtualHost();
 
     /**
      * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
@@ -103,8 +124,10 @@ public interface Exchange<X extends Exch
      * @param queue
      * @return
      */
-
-    boolean isBound(String bindingKey, Map<String, Object> arguments, Queue<?> queue);
+    @DoOnConfigThread
+    boolean isBound(@Param(name = "bindingKey") String bindingKey,
+                    @Param(name = "arguments") Map<String, Object> arguments,
+                    @Param(name = "queue") Queue<?> queue);
 
     /**
      * Determines whether a message would be isBound to a particular queue using a specific routing key
@@ -112,29 +135,38 @@ public interface Exchange<X extends Exch
      * @param queue
      * @return
      */
-
-    boolean isBound(String bindingKey, Queue<?> queue);
+    @DoOnConfigThread
+    boolean isBound(@Param(name = "bindingKey") String bindingKey,
+                    @Param(name = "queue") Queue<?> queue);
 
     /**
      * Determines whether a message is routing to any queue using a specific _routing key
      * @param bindingKey
      * @return
      */
-    boolean isBound(String bindingKey);
+    @DoOnConfigThread
+    boolean isBound(@Param(name = "bindingKey") String bindingKey);
 
     /**
      * Returns true if this exchange has at least one binding associated with it.
      * @return
      */
+    @DoOnConfigThread
     boolean hasBindings();
 
-    boolean isBound(Queue<?> queue);
-
-    boolean isBound(Map<String, Object> arguments);
+    @DoOnConfigThread
+    boolean isBound(@Param(name = "queue") Queue<?> queue);
 
-    boolean isBound(String bindingKey, Map<String, Object> arguments);
+    @DoOnConfigThread
+    boolean isBound(@Param(name = "arguments") Map<String, Object> arguments);
 
-    boolean isBound(Map<String, Object> arguments, Queue<?> queue);
+    @DoOnConfigThread
+    boolean isBound(@Param(name = "bindingKey") String bindingKey,
+                    @Param(name = "arguments") Map<String, Object> arguments);
+
+    @DoOnConfigThread
+    boolean isBound(@Param(name = "arguments") Map<String, Object> arguments,
+                    @Param(name = "queue") Queue<?> queue);
 
     void removeReference(ExchangeReferrer exchange);
 
@@ -142,9 +174,6 @@ public interface Exchange<X extends Exch
 
     boolean hasReferrers();
 
-    ListenableFuture<Void> removeBindingAsync(Binding<?> binding);
-
     EventLogger getEventLogger();
 
-    void addBinding(Binding<?> binding);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Thu Dec 22 17:59:46 2016
@@ -260,7 +260,7 @@ public interface Queue<X extends Queue<X
 
     //children
     @ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
-    Collection<Binding<?>> getBindings();
+    Collection<Binding> getBindings();
 
 
     @ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
@@ -420,10 +420,6 @@ public interface Queue<X extends Queue<X
 
     boolean isExclusive();
 
-    void addBinding(Binding<?> binding);
-
-    void removeBinding(Binding<?> binding);
-
     LogSubject getLogSubject();
 
     VirtualHost<?> getVirtualHost();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Thu Dec 22 17:59:46 2016
@@ -22,7 +22,6 @@ package org.apache.qpid.server.model.ada
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -36,7 +35,6 @@ import com.google.common.util.concurrent
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
-import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.NamedAddressSpace;
@@ -132,19 +130,6 @@ public final class SessionAdapter extend
     }
 
     @Override
-    public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
-    {
-        if (clazz == org.apache.qpid.server.model.Consumer.class)
-        {
-            return (Collection<C>) getConsumers();
-        }
-        else
-        {
-            return Collections.emptySet();
-        }
-    }
-
-    @Override
     public long getConsumerCount()
     {
         return _session.getConsumerCount();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Dec 22 17:59:46 2016
@@ -60,7 +60,6 @@ import javax.security.auth.Subject;
 
 import com.google.common.collect.Lists;
 import com.google.common.io.ByteStreams;
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -91,6 +90,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageInfoImpl;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSender;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.*;
@@ -224,7 +224,7 @@ public abstract class AbstractQueue<X ex
 
     private final AtomicBoolean _overfull = new AtomicBoolean(false);
     private final FlowToDiskChecker _flowToDiskChecker = new FlowToDiskChecker();
-    private final CopyOnWriteArrayList<Binding<?>> _bindings = new CopyOnWriteArrayList<>();
+    private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<>();
     private Map<String, Object> _arguments;
 
     /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
@@ -233,6 +233,9 @@ public abstract class AbstractQueue<X ex
 
     private MessageGroupManager _messageGroupManager;
 
+    private final ConcurrentMap<MessageSender, Integer> _linkedSenders = new ConcurrentHashMap<>();
+
+
     private QueueNotificationListener  _notificationListener = NULL_NOTIFICATION_LISTENER;
     private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
 
@@ -1030,26 +1033,15 @@ public abstract class AbstractQueue<X ex
     }
 
 
-    public void addBinding(final Binding<?> binding)
-    {
-        _bindings.add(binding);
-        childAdded(binding);
-    }
-
-    public void removeBinding(final Binding<?> binding)
+    public Collection<Binding> getBindings()
     {
-        _bindings.remove(binding);
-        childRemoved(binding);
-    }
-
-    public Collection<Binding<?>> getBindings()
-    {
-        return getBindingsImpl();
-    }
-
-    private Collection<Binding<?>> getBindingsImpl()
-    {
-        return Collections.unmodifiableList(_bindings);
+        List<Binding> bindings = new ArrayList<>();
+        for(MessageSender sender : _linkedSenders.keySet())
+        {
+            //TODO - eliminate cast
+            bindings.addAll(((Exchange)sender).getBindingsForDestination(this));
+        }
+        return bindings;
     }
 
     public int getBindingCount()
@@ -1651,68 +1643,50 @@ public abstract class AbstractQueue<X ex
         if (_deleted.compareAndSet(false, true))
         {
             final int queueDepthMessages = getQueueDepthMessages();
-            final List<ListenableFuture<Void>> removeBindingFutures = new ArrayList<>(_bindings.size());
-            final ArrayList<Binding<?>> bindingCopy = new ArrayList<>(_bindings);
 
-            // TODO - RG - Need to sort out bindings!
-            for (Binding<?> b : bindingCopy)
+            for(MessageSender sender : _linkedSenders.keySet())
             {
-                removeBindingFutures.add(b.deleteAsync());
+                sender.destinationRemoved(this);
             }
 
-            ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(removeBindingFutures);
-
-            addFutureCallback(combinedFuture, new FutureCallback<List<Void>>()
+            try
             {
-                @Override
-                public void onSuccess(final List<Void> result)
-                {
-                    try
-                    {
 
-                        Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getAllIterator();
+                Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getAllIterator();
 
-                        while (consumerIterator.hasNext())
-                        {
-                            QueueConsumer<?,?> consumer = consumerIterator.next();
+                while (consumerIterator.hasNext())
+                {
+                    QueueConsumer<?,?> consumer = consumerIterator.next();
 
-                            if (consumer != null)
-                            {
-                                consumer.queueDeleted();
-                            }
-                        }
+                    if (consumer != null)
+                    {
+                        consumer.queueDeleted();
+                    }
+                }
 
-                        final List<QueueEntry> entries = getMessagesOnTheQueue(new AcquireAllQueueEntryFilter());
+                final List<QueueEntry> entries = getMessagesOnTheQueue(new AcquireAllQueueEntryFilter());
 
-                        routeToAlternate(entries);
+                routeToAlternate(entries);
 
-                        preSetAlternateExchange();
-                        _alternateExchange = null;
+                preSetAlternateExchange();
+                _alternateExchange = null;
 
-                        _stopped.set(true);
-                        _queueHouseKeepingTask.cancel();
+                _stopped.set(true);
+                _queueHouseKeepingTask.cancel();
 
-                        performQueueDeleteTasks();
-                        deleted();
+                performQueueDeleteTasks();
+                deleted();
 
-                        //Log Queue Deletion
-                        getEventLogger().message(_logSubject, QueueMessages.DELETED(getId().toString()));
+                //Log Queue Deletion
+                getEventLogger().message(_logSubject, QueueMessages.DELETED(getId().toString()));
 
-                        _deleteFuture.set(queueDepthMessages);
-                        setState(State.DELETED);
-                    }
-                    catch(Throwable e)
-                    {
-                        _deleteFuture.setException(e);
-                    }
-                }
-
-                @Override
-                public void onFailure(final Throwable t)
-                {
-                    _deleteFuture.setException(t);
-                }
-            }, getTaskExecutor());
+                _deleteFuture.set(queueDepthMessages);
+                setState(State.DELETED);
+            }
+            catch(Throwable e)
+            {
+                _deleteFuture.setException(e);
+            }
 
         }
         return _deleteFuture;
@@ -2957,11 +2931,7 @@ public abstract class AbstractQueue<X ex
     @Override
     public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
     {
-        if(clazz == Binding.class)
-        {
-            return (Collection<C>) getBindingsImpl();
-        }
-        else if(clazz == org.apache.qpid.server.model.Consumer.class)
+        if(clazz == org.apache.qpid.server.model.Consumer.class)
         {
             return _queueConsumerManager == null
                     ? Collections.<C>emptySet()
@@ -2971,28 +2941,6 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    protected <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(final Class<C> childClass,
-                                                      final Map<String, Object> attributes,
-                                                      final ConfiguredObject... otherParents)
-    {
-        if(childClass == Binding.class && otherParents.length == 1 && otherParents[0] instanceof Exchange)
-        {
-            final String bindingKey = (String) attributes.get("name");
-            ((Exchange<?>)otherParents[0]).addBinding(bindingKey, this,
-                                                           (Map<String,Object>) attributes.get(Binding.ARGUMENTS));
-            for(Binding binding : _bindings)
-            {
-                if(binding.getExchange() == otherParents[0] && binding.getName().equals(bindingKey))
-                {
-                    return Futures.immediateFuture((C) binding);
-                }
-            }
-            return null;
-        }
-        return super.addChildAsync(childClass, attributes, otherParents);
-    }
-
-    @Override
     public boolean changeAttribute(String name, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
     {
         if(EXCLUSIVE.equals(name))
@@ -3445,4 +3393,24 @@ public abstract class AbstractQueue<X ex
             }
         }
     }
+
+    @Override
+    public void linkAdded(final MessageSender sender, final String linkName)
+    {
+        Integer oldValue = _linkedSenders.putIfAbsent(sender, 1);
+        if(oldValue != null)
+        {
+            _linkedSenders.put(sender, oldValue+1);
+        }
+    }
+
+    @Override
+    public void linkRemoved(final MessageSender sender, final String linkName)
+    {
+        int oldValue = _linkedSenders.remove(sender);
+        if(oldValue != 1)
+        {
+            _linkedSenders.put(sender, oldValue-1);
+        }
+    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Thu Dec 22 17:59:46 2016
@@ -111,7 +111,7 @@ class QueueConsumerImpl<T extends Consum
                       EnumSet<ConsumerOption> optionSet,
                       final Integer priority)
     {
-        super(parentsMap(queue, target.getSessionModel().getModelObject()),
+        super(parentsMap(queue),
               createAttributeMap(target.getSessionModel(), consumerName, filters, optionSet, priority));
         _messageClass = messageClass;
         _sessionReference = target.getSessionModel().getConnectionReference();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java Thu Dec 22 17:59:46 2016
@@ -66,6 +66,8 @@ public class BrokerStoreUpgraderAndRecov
 
         register(new Upgrader_3_0_to_6_0());
         register(new Upgrader_6_0_to_6_1());
+        register(new Upgrader_6_1_to_7_0());
+
     }
 
     private static final class Upgrader_1_0_to_1_1 extends StoreUpgraderPhase
@@ -545,6 +547,30 @@ public class BrokerStoreUpgraderAndRecov
 
     }
 
+    private class Upgrader_6_1_to_7_0 extends StoreUpgraderPhase
+    {
+
+        public Upgrader_6_1_to_7_0()
+        {
+            super("modelVersion", "6.1", "7.0");
+        }
+
+        @Override
+        public void configuredObject(ConfiguredObjectRecord record)
+        {
+            if (record.getType().equals("Broker"))
+            {
+                upgradeRootRecord(record);
+            }
+        }
+
+        @Override
+        public void complete()
+        {
+
+        }
+    }
+
     private static class VirtualHostEntryUpgrader
     {
         @SuppressWarnings("serial")

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Thu Dec 22 17:59:46 2016
@@ -44,6 +44,7 @@ import org.apache.qpid.server.model.Virt
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
 import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.util.FixedKeyMapCreator;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationStoreUpgraderAndRecoverer
@@ -73,6 +74,8 @@ public class VirtualHostStoreUpgraderAnd
         register(new Upgrader_2_0_to_3_0());
         register(new Upgrader_3_0_to_6_0());
         register(new Upgrader_6_0_to_6_1());
+        register(new Upgrader_6_1_to_7_0());
+
 
         Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>();
         for (String exchangeName : DEFAULT_EXCHANGES.keySet())
@@ -106,10 +109,10 @@ public class VirtualHostStoreUpgraderAnd
         private void removeSelectorArguments(Map<String, Object> binding)
         {
             @SuppressWarnings("unchecked")
-            Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
+            Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get("arguments"));
 
             FilterSupport.removeFilters(arguments);
-            binding.put(Binding.ARGUMENTS, arguments);
+            binding.put("arguments", arguments);
         }
 
         private boolean isTopicExchange(ConfiguredObjectRecord entry)
@@ -141,7 +144,7 @@ public class VirtualHostStoreUpgraderAnd
         private boolean hasSelectorArguments(Map<String, Object> binding)
         {
             @SuppressWarnings("unchecked")
-            Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
+            Map<String, Object> arguments = (Map<String, Object>) binding.get("arguments");
             return (arguments != null) && FilterSupport.argumentsContainFilter(arguments);
         }
 
@@ -541,6 +544,182 @@ public class VirtualHostStoreUpgraderAnd
 
     }
 
+    private static final FixedKeyMapCreator BINDING_MAP_CREATOR = new FixedKeyMapCreator("bindingKey", "destination", "arguments");
+    private static final FixedKeyMapCreator NO_ARGUMENTS_BINDING_MAP_CREATOR = new FixedKeyMapCreator("bindingKey", "destination");
+
+    private class Upgrader_6_1_to_7_0 extends StoreUpgraderPhase
+    {
+        private final Map<UUID, List<BindingRecord>> _exchangeBindings = new HashMap<>();
+        private final Map<UUID, ConfiguredObjectRecord> _exchanges = new HashMap<>();
+        private final Map<UUID, String> _queues = new HashMap<>();
+        private final Map<String, List<Map<String,Object>>> _queueBindings = new HashMap<>();
+
+
+        public Upgrader_6_1_to_7_0()
+        {
+            super("modelVersion", "6.1", "7.0");
+        }
+
+        @Override
+        public void configuredObject(ConfiguredObjectRecord record)
+        {
+            if("VirtualHost".equals(record.getType()))
+            {
+                upgradeRootRecord(record);
+            }
+            else if("Binding".equals(record.getType()))
+            {
+                BindingRecord binding = new BindingRecord(String.valueOf(record.getAttributes().get("name")),
+                                                          record.getParents().get("Queue").toString(),
+                                                          record.getAttributes().get("arguments"));
+                final UUID exchangeId = record.getParents().get("Exchange");
+                List<BindingRecord> existingBindings = _exchangeBindings.get(exchangeId);
+                if(existingBindings == null)
+                {
+                    existingBindings = new ArrayList<>();
+                    _exchangeBindings.put(exchangeId, existingBindings);
+                }
+                existingBindings.add(binding);
+                getDeleteMap().put(record.getId(), record);
+            }
+            else if("Exchange".equals(record.getType()))
+            {
+                final UUID exchangeId = record.getId();
+                _exchanges.put(exchangeId, record);
+                if(record.getAttributes().containsKey("bindings"))
+                {
+                    List<BindingRecord> existingBindings = _exchangeBindings.get(exchangeId);
+                    if(existingBindings == null)
+                    {
+                        existingBindings = new ArrayList<>();
+                        _exchangeBindings.put(exchangeId, existingBindings);
+                    }
+
+                    List<Map<String,Object>> bindingList =
+                            (List<Map<String, Object>>) record.getAttributes().get("bindings");
+                    for(Map<String,Object> existingBinding : bindingList)
+                    {
+                        existingBindings.add(new BindingRecord((String)existingBinding.get("name"),
+                                                               String.valueOf(existingBinding.get("queue")),
+                                                               existingBinding.get("arguments")));
+                    }
+                }
+            }
+            else if("Queue".equals(record.getType()))
+            {
+                _queues.put(record.getId(), (String) record.getAttributes().get("name"));
+                if(record.getAttributes().containsKey("bindings"))
+                {
+                    _queueBindings.put(String.valueOf(record.getAttributes().get("name")),
+                                       (List<Map<String, Object>>) record.getAttributes().get("bindings"));
+                    Map<String, Object> updatedAttributes = new HashMap<>(record.getAttributes());
+                    updatedAttributes.remove("bindings");
+                    getUpdateMap().put(record.getId(), new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()));
+                }
+            }
+        }
+
+        @Override
+        public void complete()
+        {
+            for(Map.Entry<String, List<Map<String,Object>>> entry : _queueBindings.entrySet())
+            {
+                for(Map<String, Object> existingBinding : entry.getValue())
+                {
+                    UUID exchangeId;
+                    if(existingBinding.get("exchange") instanceof UUID)
+                    {
+                        exchangeId = (UUID) existingBinding.get("exchange");
+                    }
+                    else
+                    {
+                        exchangeId = getExchangeIdFromNameOrId( existingBinding.get("exchange").toString());
+                    }
+                    List<BindingRecord> existingBindings = _exchangeBindings.get(exchangeId);
+                    if(existingBindings == null)
+                    {
+                        existingBindings = new ArrayList<>();
+                        _exchangeBindings.put(exchangeId, existingBindings);
+                    }
+                    existingBindings.add(new BindingRecord((String)existingBinding.get("name"),
+                                                           entry.getKey(),
+                                                           existingBinding.get("arguments")));
+                }
+            }
+
+            for(Map.Entry<UUID, List<BindingRecord>> entry : _exchangeBindings.entrySet())
+            {
+                ConfiguredObjectRecord exchangeRecord = _exchanges.get(entry.getKey());
+                if(exchangeRecord != null)
+                {
+                    final List<BindingRecord> bindingRecords = entry.getValue();
+                    List<Map<String,Object>> actualBindings = new ArrayList<>(bindingRecords.size());
+                    for(BindingRecord bindingRecord : bindingRecords)
+                    {
+                        if(bindingRecord._arguments == null)
+                        {
+                            actualBindings.add(NO_ARGUMENTS_BINDING_MAP_CREATOR.createMap(bindingRecord._name,
+                                                                                          getQueueFromIdOrName(bindingRecord)));
+                        }
+                        else
+                        {
+                            actualBindings.add(BINDING_MAP_CREATOR.createMap(bindingRecord._name,
+                                                                             getQueueFromIdOrName(bindingRecord), bindingRecord._arguments));
+                        }
+                    }
+                    Map<String, Object> updatedAttributes = new HashMap<>(exchangeRecord.getAttributes());
+                    updatedAttributes.remove("bindings");
+                    updatedAttributes.put("durableBindings", actualBindings);
+                    exchangeRecord = new ConfiguredObjectRecordImpl(exchangeRecord.getId(), exchangeRecord.getType(), updatedAttributes, exchangeRecord.getParents());
+                    getUpdateMap().put(exchangeRecord.getId(), exchangeRecord);
+                }
+
+            }
+        }
+
+        private UUID getExchangeIdFromNameOrId(final String exchange)
+        {
+            for(ConfiguredObjectRecord record : _exchanges.values())
+            {
+                if(exchange.equals(record.getAttributes().get("name")))
+                {
+                    return record.getId();
+                }
+            }
+            return UUID.fromString(exchange);
+        }
+
+        private String getQueueFromIdOrName(final BindingRecord bindingRecord)
+        {
+            final String queueIdOrName = bindingRecord._queueIdOrName;
+            try
+            {
+                UUID queueId = UUID.fromString(queueIdOrName);
+                String name = _queues.get(queueId);
+                return name == null ? queueIdOrName : name;
+            }
+            catch(IllegalArgumentException e)
+            {
+                return queueIdOrName;
+            }
+        }
+
+        private class BindingRecord
+        {
+            private final String _name;
+            private final String _queueIdOrName;
+            private final Object _arguments;
+
+            public BindingRecord(final String name, final String queueIdOrName, final Object arguments)
+            {
+                _name = name;
+                _queueIdOrName = queueIdOrName;
+                _arguments = arguments;
+            }
+        }
+    }
+
+
     public boolean upgradeAndRecover(final DurableConfigurationStore durableConfigurationStore,
                                      final ConfiguredObjectRecord... initialRecords)
     {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1775689&r1=1775688&r2=1775689&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Thu Dec 22 17:59:46 2016
@@ -1316,6 +1316,12 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
+    public Queue<?> getAttainedQueue(final String name)
+    {
+        return (Queue<?>) awaitChildClassToAttainState(Queue.class, name);
+    }
+
+    @Override
     public Broker<?> getBroker()
     {
         return _broker;
@@ -1362,6 +1368,12 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
+    public MessageDestination getSystemDestination(final String name)
+    {
+        return _systemNodeDestinations.get(name);
+    }
+
+    @Override
     public <T extends ConfiguredObject<?>> T getAttainedChildFromAddress(final Class<T> childClass,
                                                                          final String address)
     {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org