You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2011/12/14 13:20:19 UTC

svn commit: r1214206 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/exchange/HeadersExchangeMBean.java test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java

Author: kwall
Date: Wed Dec 14 12:20:18 2011
New Revision: 1214206

URL: http://svn.apache.org/viewvc?rev=1214206&view=rev
Log:
QPID-3680: Header exchange bindings not being created via JMX Management Console

Applied patch from Andrew MacBean <an...@gmail.com> and myself.

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeMBean.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeMBean.java?rev=1214206&r1=1214205&r2=1214206&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeMBean.java Wed Dec 14 12:20:18 2011
@@ -20,12 +20,21 @@
  */
 package org.apache.qpid.server.exchange;
 
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
 import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import javax.management.JMException;
+import javax.management.MBeanException;
 import javax.management.openmbean.*;
+
+import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
@@ -94,5 +103,48 @@ final class HeadersExchangeMBean extends
         return bindingList;
     }
 
+    /**
+     * Creates a binding between the named queue and this headers exchange.
+     * <p>
+     * The binding string is a comma separated list of <code>key=value</code> entries. An entry
+     * consisting of a key only (<code>key</code> or <code>key=</code>) is stored with an empty
+     * string value. The parsed entries are passed to the binding factory as the binding arguments,
+     * and the unparsed binding string is passed as the first argument of <code>addBinding</code>.
+     *
+     * @param queueName name of the queue to bind; must be registered with the exchange's virtualhost
+     * @param binding the headers binding, e.g. <code>x-match=any,key1=value1,key2</code>
+     * @throws JMException if the queue is not registered or the binding string is malformed;
+     *         an MBeanException is thrown if the binding factory fails with an AMQException
+     */
+    @Override
+    public void createNewBinding(String queueName, String binding) throws JMException
+    {
+        VirtualHost vhost = getExchange().getVirtualHost();
+        AMQQueue queue = vhost.getQueueRegistry().getQueue(new AMQShortString(queueName));
+        if (queue == null)
+        {
+            throw new JMException("Queue \"" + queueName + "\" is not registered with the virtualhost.");
+        }
+
+        if (binding == null)
+        {
+            // report a missing binding the same way as a malformed one rather than failing with an NPE
+            throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" ");
+        }
+
+        CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
+        try
+        {
+            final Map<String,Object> arguments = new HashMap<String, Object>();
+            final String[] bindings = binding.split(",");
+            for (int i = 0; i < bindings.length; i++)
+            {
+                // String.split drops trailing empty strings, so "key=" yields one element and "=" yields none
+                final String[] keyAndValue = bindings[i].split("=");
+                if (keyAndValue.length == 0 || keyAndValue.length > 2 || keyAndValue[0].length() == 0)
+                {
+                    throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" ");
+                }
+
+                if(keyAndValue.length == 1)
+                {
+                    //no value was given, only a key. Use an empty value to signal match on key presence alone
+                    arguments.put(keyAndValue[0], "");
+                }
+                else
+                {
+                    arguments.put(keyAndValue[0], keyAndValue[1]);
+                }
+            }
+
+            try
+            {
+                vhost.getBindingFactory().addBinding(binding,queue,getExchange(),arguments);
+            }
+            catch (AMQException ex)
+            {
+                JMException jme = new JMException(ex.toString());
+                throw new MBeanException(jme, "Error creating new binding " + binding);
+            }
+        }
+        finally
+        {
+            // always undo CurrentActor.set, including when parsing or addBinding throws
+            CurrentActor.remove();
+        }
+    }
 
 } // End of MBean class

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java?rev=1214206&r1=1214205&r2=1214206&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java Wed Dec 14 12:20:18 2011
@@ -20,8 +20,7 @@
  */
 package org.apache.qpid.server.exchange;
 
-import junit.framework.TestCase;
-
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.qpid.management.common.mbeans.ManagedExchange;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -34,9 +33,11 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 
+import javax.management.JMException;
+import javax.management.openmbean.CompositeDataSupport;
 import javax.management.openmbean.TabularData;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Iterator;
 
 /**
  * Unit test class for testing different Exchange MBean operations
@@ -47,10 +48,20 @@ public class ExchangeMBeanTest  extends 
     private QueueRegistry _queueRegistry;
     private VirtualHost _virtualHost;
 
-    /**
-     * Test for direct exchange mbean
-     * @throws Exception
-     */
+    /**
+     * Checks the general properties (name, type, ticket number, durable and auto delete flags)
+     * reported by the MBean of a direct exchange.
+     */
+    public void testGeneralProperties() throws Exception
+    {
+        final DirectExchange directExchange = new DirectExchange();
+        directExchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
+        final ManagedExchange exchangeMBean = (ManagedExchange) directExchange.getManagedObject();
+
+        // identity of the exchange
+        assertEquals("Unexpected exchange name", "amq.direct", exchangeMBean.getName());
+        assertEquals("Unexpected exchange type", "direct", exchangeMBean.getExchangeType());
+
+        // values handed to initialise above
+        assertEquals("Unexpected ticket number", Integer.valueOf(0), exchangeMBean.getTicketNo());
+        assertFalse("Unexpected durable flag", exchangeMBean.isDurable());
+        assertTrue("Unexpected auto delete flag", exchangeMBean.isAutoDelete());
+    }
 
     public void testDirectExchangeMBean() throws Exception
     {
@@ -65,20 +76,8 @@ public class ExchangeMBeanTest  extends 
         TabularData data = mbean.bindings();
         ArrayList<Object> list = new ArrayList<Object>(data.values());
         assertTrue(list.size() == 2);
-
-        // test general exchange properties
-        assertEquals(mbean.getName(), "amq.direct");
-        assertEquals(mbean.getExchangeType(), "direct");
-        assertTrue(mbean.getTicketNo() == 0);
-        assertTrue(!mbean.isDurable());
-        assertTrue(mbean.isAutoDelete());
     }
 
-    /**
-     * Test for "topic" exchange mbean
-     * @throws Exception
-     */
-
     public void testTopicExchangeMBean() throws Exception
     {
         TopicExchange exchange = new TopicExchange();
@@ -92,42 +91,81 @@ public class ExchangeMBeanTest  extends 
         TabularData data = mbean.bindings();
         ArrayList<Object> list = new ArrayList<Object>(data.values());
         assertTrue(list.size() == 2);
+    }
 
-        // test general exchange properties
-        assertEquals(mbean.getName(), "amq.topic");
-        assertEquals(mbean.getExchangeType(), "topic");
-        assertTrue(mbean.getTicketNo() == 0);
-        assertTrue(!mbean.isDurable());
-        assertTrue(mbean.isAutoDelete());
+    /**
+     * Creates a single headers binding through the MBean and checks that it is reported
+     * by the bindings table with every key=value entry intact.
+     */
+    public void testHeadersExchangeMBean() throws Exception
+    {
+        final HeadersExchange headersExchange = new HeadersExchange();
+        headersExchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
+        final ManagedExchange exchangeMBean = (ManagedExchange) headersExchange.getManagedObject();
+
+        final String queueName = _queue.getNameShortString().toString();
+        exchangeMBean.createNewBinding(queueName, "x-match=any,key1=binding1,key2=binding2");
+
+        final TabularData bindingTable = exchangeMBean.bindings();
+        final ArrayList<Object> rows = new ArrayList<Object>(bindingTable.values());
+        assertEquals("Unexpected number of bindings", 1, rows.size());
+
+        // inspect the first (and only) row of the table
+        final Iterator<CompositeDataSupport> rowIterator = (Iterator<CompositeDataSupport>) bindingTable.values().iterator();
+        final CompositeDataSupport firstRow = rowIterator.next();
+        final String[] expectedEntries = new String[]{"x-match=any","key1=binding1","key2=binding2"};
+        assertBinding(1, _queue.getName(), expectedEntries, firstRow);
     }
 
     /**
-     * Test for "Headers" exchange mbean
-     * @throws Exception
+     * Included to ensure 0-10 Specification compliance:
+     * 2.3.1.4 "the field in the bind arguments has no value and a field of the same name is present in the message headers"
      */
-
-    public void testHeadersExchangeMBean() throws Exception
+    public void testHeadersExchangeMBeanMatchPropertyNoValue() throws Exception
     {
         HeadersExchange exchange = new HeadersExchange();
         exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
         ManagedObject managedObj = exchange.getManagedObject();
         ManagedExchange mbean = (ManagedExchange)managedObj;
 
-        mbean.createNewBinding(_queue.getNameShortString().toString(), "key1=binding1,key2=binding2");
-        mbean.createNewBinding(_queue.getNameShortString().toString(), "key3=binding3");
+        mbean.createNewBinding(_queue.getNameShortString().toString(), "x-match=any,key4,key5=");
 
         TabularData data = mbean.bindings();
         ArrayList<Object> list = new ArrayList<Object>(data.values());
-        assertTrue(list.size() == 2);
+        assertEquals("Unexpected number of bindings", 1, list.size());
 
-        // test general exchange properties
-        assertEquals(mbean.getName(), "amq.match");
-        assertEquals(mbean.getExchangeType(), "headers");
-        assertTrue(mbean.getTicketNo() == 0);
-        assertTrue(!mbean.isDurable());
-        assertTrue(mbean.isAutoDelete());
+        final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) data.values().iterator();
+        CompositeDataSupport row = rowItr.next();
+        assertBinding(1, _queue.getName(), new String[]{"x-match=any","key4=","key5="}, row);
     }
-    
+
+    /**
+     * Checks that a headers binding containing an entry with an empty key ("=value4")
+     * is rejected with a JMException.
+     */
+    public void testInvalidHeaderBindingMalformed() throws Exception
+    {
+        final HeadersExchange headersExchange = new HeadersExchange();
+        headersExchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
+        final ManagedExchange exchangeMBean = (ManagedExchange) headersExchange.getManagedObject();
+        final String queueName = _queue.getNameShortString().toString();
+
+        try
+        {
+            exchangeMBean.createNewBinding(queueName, "x-match=any,=value4");
+            fail("Exception not thrown");
+        }
+        catch (JMException expected)
+        {
+            //pass - the malformed binding was refused
+        }
+    }
+
+    /**
+     * Asserts that a row of the bindings table describes the expected binding.
+     *
+     * @param expectedBindingNo expected value of the binding number column
+     * @param expectedQueueName expected value of the queue name column
+     * @param expectedBindingArray entries expected in the queue bindings column, in any order
+     * @param row the table row to check
+     */
+    private void assertBinding(final int expectedBindingNo, final String expectedQueueName, final String[] expectedBindingArray,
+                                final CompositeDataSupport row)
+    {
+        final Number bindingNumber = (Number) row.get(ManagedExchange.HDR_BINDING_NUMBER);
+        final String queueName = (String) row.get(ManagedExchange.HDR_QUEUE_NAME);
+        final String[] bindings = (String[]) row.get(ManagedExchange.HDR_QUEUE_BINDINGS);
+        assertNotNull("Binding number missing from row", bindingNumber);
+        // compare numerically so the check does not depend on which Number subclass the row holds
+        assertEquals("Unexpected binding number", expectedBindingNo, bindingNumber.intValue());
+        assertEquals("Unexpected queue name", expectedQueueName, queueName);
+        assertEquals("Unexpected no of bindings", expectedBindingArray.length, bindings.length);
+        // walk the expected entries so the failure message names the entry that is actually missing
+        for(String expectedBinding : expectedBindingArray)
+        {
+            assertTrue("Expected binding not found: " + expectedBinding, ArrayUtils.contains(bindings, expectedBinding));
+        }
+    }
+
     /**
      * Test adding bindings and removing them from the default exchange via JMX.
      * <p>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org