You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/16 23:57:08 UTC

svn commit: r1301814 - in /qpid/branches/java-config-and-management/qpid/java: broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ broker/src/main/java/org/apache/qpid/server/model/ broker/src/main/java/org/apache/qpid/server/model/adap...

Author: rgodfrey
Date: Fri Mar 16 22:57:07 2012
New Revision: 1301814

URL: http://svn.apache.org/viewvc?rev=1301814&view=rev
Log:
NO-JIRA : [Java Config] added ability to create Bindings through new API, and via JMX

Modified:
    qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
    qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java

Modified: qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java?rev=1301814&r1=1301813&r2=1301814&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java Fri Mar 16 22:57:07 2012
@@ -30,6 +30,7 @@ import org.apache.qpid.server.model.Bind
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
 
 import javax.management.JMException;
 import javax.management.MalformedObjectNameException;
@@ -47,6 +48,7 @@ import javax.management.openmbean.Tabula
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -77,6 +79,7 @@ public class ExchangeMBean extends AMQMa
             HEADERS_COMPOSITE_ITEM_DESC.toArray(new String[HEADERS_COMPOSITE_ITEM_DESC.size()]);
     private static final  String[] HEADERS_TABULAR_UNIQUE_INDEX_ARRAY =
             HEADERS_TABULAR_UNIQUE_INDEX.toArray(new String[HEADERS_TABULAR_UNIQUE_INDEX.size()]);
+    public static final   String   HEADERS_EXCHANGE_TYPE              = "headers";
 
     static
     {
@@ -166,7 +169,7 @@ public class ExchangeMBean extends AMQMa
 
     public TabularData bindings() throws IOException, JMException
     {
-        if("headers".equals(_exchange.getExchangeType()))
+        if(HEADERS_EXCHANGE_TYPE.equals(_exchange.getExchangeType()))
         {
             return getHeadersBindings(_exchange.getBindings()); 
         }
@@ -254,11 +257,44 @@ public class ExchangeMBean extends AMQMa
         return bindingList;
     }
 
-    public void createNewBinding(
-            @MBeanOperationParameter(name = ManagedQueue.TYPE, description = "Queue name") String queueName,
-            @MBeanOperationParameter(name = "Binding", description = "New binding") String binding) throws JMException
+    public void createNewBinding(String queueName, String binding) throws JMException
     {
-        // TODO
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+
+        if(HEADERS_EXCHANGE_TYPE.equals(_exchange.getExchangeType()))
+        {
+            final String[] bindings = binding.split(",");
+            for (int i = 0; i < bindings.length; i++)
+            {
+                final String[] keyAndValue = bindings[i].split("=");
+                if (keyAndValue == null || keyAndValue.length == 0 || keyAndValue.length > 2 || keyAndValue[0].length() == 0)
+                {
+                    throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" ");
+                }
+
+                if(keyAndValue.length == 1)
+                {
+                    //no value was given, only a key. Use an empty value to signal match on key presence alone
+                    arguments.put(keyAndValue[0], "");
+                }
+                else
+                {
+                    arguments.put(keyAndValue[0], keyAndValue[1]);
+                }
+            }
+        }
+
+        Queue queue = null;
+        VirtualHost vhost = _exchange.getParent(VirtualHost.class);
+        for(Queue aQueue : vhost.getQueues())
+        {
+            if(aQueue.getName().equals(queueName))
+            {
+                queue = aQueue;
+                break;
+            }
+        }
+        _exchange.createBinding(binding, queue, arguments, Collections.EMPTY_MAP);
     }
 
     public void removeBinding(

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java?rev=1301814&r1=1301813&r2=1301814&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java Fri Mar 16 22:57:07 2012
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.model;
 
 import java.util.Collection;
+import java.util.Map;
 
 public interface Exchange extends ConfiguredObject
 {
@@ -33,6 +34,12 @@ public interface Exchange extends Config
     //children
     Collection<Binding> getBindings();
     Collection<Publisher> getPublishers();
+    
+    //operations
+    Binding createBinding(String bindingKey,
+                          Queue queue,
+                          Map<String,Object> bindingArguments,
+                          Map<String, Object> attributes);
 
 
     // Statistics

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java?rev=1301814&r1=1301813&r2=1301814&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java Fri Mar 16 22:57:07 2012
@@ -27,12 +27,18 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Publisher;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apache.qpid.server.exchange.Exchange.BindingListener
 {
@@ -48,6 +54,8 @@ final class ExchangeAdapter extends Abst
     {
         _vhost = virtualHostAdapter;
         _exchange = exchange;
+        addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter);
+
         exchange.addBindingListener(this);
         populateBindings();
     }
@@ -92,6 +100,46 @@ final class ExchangeAdapter extends Abst
         return Collections.emptyList();
     }
 
+    public org.apache.qpid.server.model.Binding createBinding(String bindingKey, Queue queue,
+                                                              Map<String, Object> bindingArguments,
+                                                              Map<String, Object> attributes)
+            throws AccessControlException, IllegalStateException
+    {
+        VirtualHost virtualHost = _vhost.getVirtualHost();
+
+
+        AMQQueue amqQueue = ((QueueAdapter)queue).getAMQQueue();
+        
+        try
+        {
+            if(!virtualHost.getBindingFactory().addBinding(bindingKey, amqQueue, _exchange, bindingArguments))
+            {
+                Binding oldBinding = virtualHost.getBindingFactory().getBinding(bindingKey, amqQueue, _exchange, 
+                                                                                bindingArguments);
+    
+                Map<String, Object> oldArgs = oldBinding.getArguments();
+                if((oldArgs == null && !bindingArguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(bindingArguments)))
+                {
+                    virtualHost.getBindingFactory().replaceBinding(bindingKey, amqQueue, _exchange, bindingArguments);
+                }
+            }
+            Binding binding = virtualHost.getBindingFactory().getBinding(bindingKey, amqQueue, _exchange, bindingArguments);
+            
+            synchronized (_bindingAdapters)
+            {
+                return binding == null ? null : _bindingAdapters.get(binding);
+            }
+        }
+        catch(AMQSecurityException e)
+        {
+            throw new AccessControlException(e.toString());
+        }
+        catch(AMQInternalException e)
+        {
+            throw new IllegalStateException(e);
+        }
+    }
+
     public String getName()
     {
         return _exchange.getName();

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java?rev=1301814&r1=1301813&r2=1301814&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java Fri Mar 16 22:57:07 2012
@@ -52,6 +52,8 @@ final class QueueAdapter extends Abstrac
     public QueueAdapter(final VirtualHostAdapter virtualHostAdapter, final AMQQueue queue)
     {
         _vhost = virtualHostAdapter;
+        addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter);
+
         _queue = queue;
         _statistics = new QueueStatisticsAdapter(queue);
     }
@@ -356,6 +358,11 @@ final class QueueAdapter extends Abstrac
         }
     }
 
+    AMQQueue getAMQQueue()
+    {
+        return _queue;
+    }
+
     private static class QueueStatisticsAdapter implements Statistics
     {
 

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1301814&r1=1301813&r2=1301814&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Fri Mar 16 22:57:07 2012
@@ -442,4 +442,9 @@ final class VirtualHostAdapter extends A
         // TODO
         throw new UnsupportedOperationException("Not Yet Implemented");
     }
+
+    org.apache.qpid.server.virtualhost.VirtualHost getVirtualHost()
+    {
+        return _virtualHost;
+    }
 }

Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java?rev=1301814&r1=1301813&r2=1301814&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java Fri Mar 16 22:57:07 2012
@@ -24,6 +24,7 @@ import org.apache.qpid.server.model.Bind
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Publisher;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.Statistics;
 import org.apache.qpid.server.model.VirtualHost;
@@ -83,6 +84,12 @@ class ExchangeImpl extends AbstractConfi
         return null;  //TODO
     }
 
+    public Binding createBinding(String bindingKey, Queue queue, Map<String, Object> bindingArguments,
+                                 Map<String, Object> attributes)
+    {
+        return null;  // TODO - Implement
+    }
+
     public State getActualState()
     {
         State vhostState = _virtualHost.getActualState();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org