You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/06/02 02:55:37 UTC

svn commit: r1488636 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/configuration/ main/java/org/apache/qpid/server/configuration/plugins/ main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/configuration/ ...

Author: rgodfrey
Date: Sun Jun  2 00:55:36 2013
New Revision: 1488636

URL: http://svn.apache.org/r1488636
Log:
QPID-4898 : [Java Broker] Allow setting arbitrary arguments in queue definition within XML config file

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/AbstractConfiguration.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=1488636&r1=1488635&r2=1488636&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Sun Jun  2 00:55:36 2013
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.configuration;
 
+import java.util.Collections;
+import java.util.Map;
 import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.ConfigurationException;
 
@@ -66,7 +68,8 @@ public class QueueConfiguration extends 
                             "lvqKey",
                             "sortKey",
                             "maximumDeliveryCount",
-                            "deadLetterQueues"
+                            "deadLetterQueues",
+                            "argument"
         };
     }
 
@@ -85,7 +88,7 @@ public class QueueConfiguration extends 
     {
         return getBooleanValue("durable");
     }
-    
+
     public boolean getExclusive()
     {
         return getBooleanValue("exclusive");
@@ -193,4 +196,9 @@ public class QueueConfiguration extends 
     {
         return getBooleanValue("deadLetterQueues", _vHostConfig.isDeadLetterQueueEnabled());
     }
+
+    public Map<String,String> getArguments()
+    {
+        return getMap("argument"); // map built by getMap from the entries stored under the "argument" key
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/AbstractConfiguration.java?rev=1488636&r1=1488635&r2=1488636&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/AbstractConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/AbstractConfiguration.java Sun Jun  2 00:55:36 2013
@@ -18,6 +18,8 @@
  */
 package org.apache.qpid.server.configuration.plugins;
 
+import java.util.LinkedHashMap;
+import java.util.Map;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.ConversionException;
@@ -324,6 +326,19 @@ public abstract class AbstractConfigurat
     {
         _config = config;
     }
+
+    // Builds an insertion-ordered map from the "key=value" list entries under name; an entry without '=' gets a null value.
+    protected Map<String,String> getMap(String name)
+    {
+        List<?> elements = getListValue(name,Collections.emptyList());
+        Map<String,String> map = new LinkedHashMap<String,String>();
+        for(Object item : elements)
+        {
+            String[] keyValue = String.valueOf(item).split("=",2);
+            map.put(keyValue[0].trim(), keyValue.length > 1 ? keyValue[1].trim() : null);
+        }
+        return map;
+    }
 }
 
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1488636&r1=1488635&r2=1488636&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Sun Jun  2 00:55:36 2013
@@ -239,6 +239,21 @@ public class AMQQueueFactory
                 {
                     priorities = ((Number)prioritiesObj).intValue();
                 }
+                else if(prioritiesObj instanceof String)
+                {
+                    try
+                    {
+                        priorities = Integer.parseInt(prioritiesObj.toString());
+                    }
+                    catch (NumberFormatException e)
+                    {
+                        // TODO - should warn here of invalid format
+                    }
+                }
+                else
+                {
+                    // TODO - should warn here of invalid format
+                }
             }
             else if(arguments.containsKey(QPID_QUEUE_SORT_KEY))
             {
@@ -445,6 +460,11 @@ public class AMQQueueFactory
     {
         Map<String,Object> arguments = new HashMap<String,Object>();
 
+        if(config.getArguments() != null && !config.getArguments().isEmpty())
+        {
+            arguments.putAll(config.getArguments());
+        }
+
         if(config.isLVQ() || config.getLVQKey() != null)
         {
             arguments.put(QPID_LAST_VALUE_QUEUE, 1);

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java?rev=1488636&r1=1488635&r2=1488636&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java Sun Jun  2 00:55:36 2013
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@ package org.apache.qpid.server.configura
 
 import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import junit.framework.TestCase;
 import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.ConfigurationException;
@@ -238,6 +239,43 @@ public class QueueConfigurationTest exte
         assertEquals("mydescription", qConf.getDescription());
     }
 
+
+    public void testQueueSingleArgument() throws ConfigurationException
+    {
+        // With nothing configured the argument map is empty
+        final QueueConfiguration defaultConf = new QueueConfiguration("test", _emptyConf);
+        assertTrue(defaultConf.getArguments().isEmpty());
+
+        // A single configured "key=value" entry yields a one-entry map
+        final VirtualHostConfiguration overridden = overrideConfiguration("argument", "qpid.group_header_key=mykey");
+        final QueueConfiguration explicitConf = new QueueConfiguration("test", overridden);
+        assertEquals(Collections.singletonMap("qpid.group_header_key","mykey"), explicitConf.getArguments());
+    }
+
+
+    public void testQueueMultipleArguments() throws ConfigurationException
+    {
+        //Check default value
+        QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
+        assertTrue(qConf.getArguments().isEmpty());
+
+        // Two values added under the same property key are expected to surface as two separate arguments
+        PropertiesConfiguration queueConfig = new PropertiesConfiguration();
+        queueConfig.addProperty("queues.queue.test.argument", "qpid.group_header_key=mykey");
+        queueConfig.addProperty("queues.queue.test.argument", "qpid.shared_msg_group=1");
+
+        CompositeConfiguration config = new CompositeConfiguration();
+        config.addConfiguration(_fullHostConf.getConfig());
+        config.addConfiguration(queueConfig);
+
+        final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration("test", config, _broker);
+        qConf = new QueueConfiguration("test", vhostConfig);
+        assertEquals(2, qConf.getArguments().size());
+        assertEquals("mykey", qConf.getArguments().get("qpid.group_header_key"));
+        assertEquals("1", qConf.getArguments().get("qpid.shared_msg_group"));
+    }
+
+
     private VirtualHostConfiguration overrideConfiguration(String property, Object value)
             throws ConfigurationException
     {

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java?rev=1488636&r1=1488635&r2=1488636&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java Sun Jun  2 00:55:36 2013
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
 
 import static org.mockito.Mockito.when;
 
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.configuration.XMLConfiguration;
 
 import org.apache.qpid.AMQException;
@@ -29,6 +31,7 @@ import org.apache.qpid.exchange.Exchange
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.DefaultExchangeFactory;
 import org.apache.qpid.server.exchange.Exchange;
@@ -44,6 +47,7 @@ public class AMQQueueFactoryTest extends
 {
     private QueueRegistry _queueRegistry;
     private VirtualHost _virtualHost;
+    private Broker _broker;
 
     @Override
     public void setUp() throws Exception
@@ -55,14 +59,14 @@ public class AMQQueueFactoryTest extends
         configXml.addProperty("store.class", TestableMemoryMessageStore.class.getName());
 
 
-        Broker broker = BrokerTestHelper.createBrokerMock();
+        _broker = BrokerTestHelper.createBrokerMock();
         if (getName().equals("testDeadLetterQueueDoesNotInheritDLQorMDCSettings"))
         {
-            when(broker.getAttribute(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS)).thenReturn(5);
-            when(broker.getAttribute(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED)).thenReturn(true);
+            when(_broker.getAttribute(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS)).thenReturn(5);
+            when(_broker.getAttribute(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED)).thenReturn(true);
         }
 
-        _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getName(), configXml, broker));
+        _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getName(), configXml, _broker));
 
         _queueRegistry = _virtualHost.getQueueRegistry();
 
@@ -376,6 +380,21 @@ public class AMQQueueFactoryTest extends
         }
     }
 
+    public void testMessageGroupFromConfig() throws Exception
+    {
+        // Arguments given in the queue configuration should be readable from the created queue's arguments
+        PropertiesConfiguration queueConfig = new PropertiesConfiguration();
+        queueConfig.addProperty("queues.queue.test.argument", "qpid.group_header_key=mykey");
+        queueConfig.addProperty("queues.queue.test.argument", "qpid.shared_msg_group=1");
+
+
+        final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration("test", queueConfig, _broker);
+        QueueConfiguration qConf = new QueueConfiguration("test", vhostConfig);
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(qConf, _virtualHost);
+        assertEquals("mykey", queue.getArguments().get(SimpleAMQQueue.QPID_GROUP_HEADER_KEY));
+        assertEquals("1", queue.getArguments().get(SimpleAMQQueue.QPID_SHARED_MSG_GROUP));
+    }
+
     private String generateStringWithLength(char ch, int length)
     {
         StringBuilder sb = new StringBuilder();
@@ -385,4 +404,6 @@ public class AMQQueueFactoryTest extends
         }
         return sb.toString();
     }
+
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org