You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/03/18 17:24:53 UTC

svn commit: r924882 - in /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid: qmf/ server/ server/exchange/ server/exchange/topic/

Author: robbie
Date: Thu Mar 18 16:24:53 2010
New Revision: 924882

URL: http://svn.apache.org/viewvc?rev=924882&view=rev
Log:
QPID-2379: add Binding.msgMatched() support to the ManagementExchange. Remove the ManagementExchange from the list of creatable exchange types returned for use by the JMX management console. Remove the use of TopicBindings and then remove the now-unused class entirely.

Removed:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicBinding.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java?rev=924882&r1=924881&r2=924882&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java Thu Mar 18 16:24:53 2010
@@ -31,7 +31,6 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeReferrer;
 import org.apache.qpid.server.exchange.ExchangeType;
-import org.apache.qpid.server.exchange.topic.TopicBinding;
 import org.apache.qpid.server.exchange.topic.TopicExchangeResult;
 import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
 import org.apache.qpid.server.exchange.topic.TopicNormalizer;
@@ -47,7 +46,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -55,6 +53,7 @@ import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class ManagementExchange implements Exchange, QMFService.Listener
@@ -69,8 +68,7 @@ public class ManagementExchange implemen
     private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
             new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
 
-    private final Map<TopicBinding, FieldTable> _topicBindings = new HashMap<TopicBinding, FieldTable>();
-    private final Set<Binding> _bindingSet = new HashSet<Binding>();
+    private final Set<Binding> _bindingSet = new CopyOnWriteArraySet<Binding>();
     private UUID _id;
     private static final String AGENT_BANK = "0";
 
@@ -254,21 +252,7 @@ public class ManagementExchange implemen
     public synchronized void addBinding(final Binding b)
     {
 
-        _bindingSet.add(b);
-
-        for(BindingListener listener : _listeners)
-        {
-            listener.bindingAdded(this, b);
-        }
-
-        if(_bindingSet.size() > _bindingCountHigh)
-        {
-            _bindingCountHigh = _bindingSet.size();
-        }
-
-        TopicBinding binding = new TopicBinding(new AMQShortString(b.getBindingKey()), b.getQueue(), null);
-
-        if(!_topicBindings.containsKey(binding))
+        if(_bindingSet.add(b))
         {
             AMQShortString routingKey = TopicNormalizer.normalize(new AMQShortString(b.getBindingKey()));
 
@@ -284,10 +268,20 @@ public class ManagementExchange implemen
             {
                 result.addUnfilteredQueue(b.getQueue());
             }
-            _topicBindings.put(binding, null);
 
+            result.addBinding(b);
+        }
+        
+        for(BindingListener listener : _listeners)
+        {
+            listener.bindingAdded(this, b);
+        }
 
+        if(_bindingSet.size() > _bindingCountHigh)
+        {
+            _bindingCountHigh = _bindingSet.size();
         }
+        
         String bindingKey = b.getBindingKey();
 
         if(bindingKey.startsWith("schema.") || bindingKey.startsWith("*.") || bindingKey.startsWith("#."))
@@ -355,6 +349,13 @@ public class ManagementExchange implemen
             HashSet<AMQQueue> queues = new HashSet<AMQQueue>();
             for(TopicMatcherResult result : results)
             {
+                TopicExchangeResult res = (TopicExchangeResult)result;
+
+                for(Binding b : res.getBindings())
+                {
+                    b.incrementMatches();
+                }
+                
                 queues.addAll(((TopicExchangeResult)result).getUnfilteredQueues());
             }
             for(AMQQueue queue : queues)
@@ -378,14 +379,11 @@ public class ManagementExchange implemen
 
     public synchronized void removeBinding(final Binding binding)
     {
-        _bindingSet.remove(binding);
-
-        TopicBinding topicBinding = new TopicBinding(new AMQShortString(binding.getBindingKey()), binding.getQueue(), null);
-
-        if(_topicBindings.containsKey(topicBinding))
+        if(_bindingSet.remove(binding))
         {
-            AMQShortString bindingKey = TopicNormalizer.normalize(topicBinding.getBindingKey());
+            AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
             TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
+            result.removeBinding(binding);
             result.removeUnfilteredQueue(binding.getQueue());
         }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=924882&r1=924881&r2=924882&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Thu Mar 18 16:24:53 2010
@@ -110,7 +110,7 @@ public class AMQBrokerManagerMBean exten
     public String[] getExchangeTypes() throws IOException
     {
         ArrayList<String> exchangeTypes = new ArrayList<String>();
-        for(ExchangeType<? extends Exchange> ex : _exchangeFactory.getRegisteredTypes())
+        for(ExchangeType<? extends Exchange> ex : _exchangeFactory.getPublicCreatableTypes())
         {
             exchangeTypes.add(ex.getName().toString());
         }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=924882&r1=924881&r2=924882&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Thu Mar 18 16:24:53 2010
@@ -20,8 +20,12 @@
  */
 package org.apache.qpid.server.exchange;
 
-import org.apache.log4j.Logger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUnknownExchangeType;
 import org.apache.qpid.framing.AMQShortString;
@@ -30,10 +34,6 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
 public class DefaultExchangeFactory implements ExchangeFactory
 {
     private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
@@ -60,6 +60,21 @@ public class DefaultExchangeFactory impl
     {
         return _exchangeClassMap.values();
     }
+    
+    public Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes()
+    {
+        Collection<ExchangeType<? extends Exchange>> publicTypes = 
+                                new ArrayList<ExchangeType<? extends Exchange>>();
+        publicTypes.addAll(_exchangeClassMap.values());
+        
+        //Remove the ManagementExchange type if present, as these 
+        //are private and cannot be created by external means
+        publicTypes.remove(ManagementExchange.TYPE);
+        
+        return publicTypes;
+    }
+    
+    
 
     public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete)
             throws AMQException

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java?rev=924882&r1=924881&r2=924882&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java Thu Mar 18 16:24:53 2010
@@ -38,6 +38,8 @@ public interface ExchangeFactory
     void initialise(VirtualHostConfiguration hostConfig);
 
     Collection<ExchangeType<? extends Exchange>> getRegisteredTypes();
+    
+    Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes();
 
     Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) throws AMQException;
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org