You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 13:51:52 UTC

svn commit: r1295541 [2/10] - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ bdbstore/bin/ bdbstore/etc/scripts/ bdbstore/src/main/java/ bdbstore/src/resources/ bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ broker-plugins/access-...

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/PlainConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/PlainConfigurationTest.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/PlainConfigurationTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/PlainConfigurationTest.java Thu Mar  1 12:51:40 2012
@@ -22,12 +22,19 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.PrintWriter;
+import java.util.Map;
 
 import junit.framework.TestCase;
 
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.security.access.ObjectProperties;
+import org.apache.qpid.server.security.access.ObjectProperties.Property;
+import org.apache.qpid.server.security.access.ObjectType;
+import org.apache.qpid.server.security.access.Operation;
 import org.apache.qpid.server.security.access.config.ConfigurationFile;
 import org.apache.qpid.server.security.access.config.PlainConfiguration;
+import org.apache.qpid.server.security.access.config.Rule;
+import org.apache.qpid.server.security.access.config.RuleSet;
 
 /**
  * These tests check that the ACL file parsing works correctly.
@@ -37,7 +44,7 @@ import org.apache.qpid.server.security.a
  */
 public class PlainConfigurationTest extends TestCase
 {
-    public void writeACLConfig(String...aclData) throws Exception
+    private PlainConfiguration writeACLConfig(String...aclData) throws Exception
     {
         File acl = File.createTempFile(getClass().getName() + getName(), "acl");
         acl.deleteOnExit();
@@ -51,8 +58,9 @@ public class PlainConfigurationTest exte
         aclWriter.close();
 
         // Load ruleset
-        ConfigurationFile configFile = new PlainConfiguration(acl);
+        PlainConfiguration configFile = new PlainConfiguration(acl);
         configFile.load();
+        return configFile;
     }
 
     public void testMissingACLConfig() throws Exception
@@ -191,4 +199,197 @@ public class PlainConfigurationTest exte
             assertEquals(String.format(PlainConfiguration.PROPERTY_NO_VALUE_MSG, 1), ce.getMessage());
         }
     }
+
+    /**
+     * Tests interpretation of an acl rule with no object properties.
+     *
+     */
+    public void testValidRule() throws Exception
+    {
+        final PlainConfiguration config = writeACLConfig("ACL DENY-LOG user1 ACCESS VIRTUALHOST");
+        final RuleSet rs = config.getConfiguration();
+        assertEquals(1, rs.getRuleCount());
+
+        final Map<Integer, Rule> rules = rs.getAllRules();
+        assertEquals(1, rules.size());
+        final Rule rule = rules.get(0);
+        assertEquals("Rule has unexpected identity", "user1", rule.getIdentity());
+        assertEquals("Rule has unexpected operation", Operation.ACCESS, rule.getAction().getOperation());
+        assertEquals("Rule has unexpected operation", ObjectType.VIRTUALHOST, rule.getAction().getObjectType());
+        assertEquals("Rule has unexpected object properties", ObjectProperties.EMPTY, rule.getAction().getProperties());
+    }
+
+    /**
+     * Tests interpretation of an acl rule with object properties quoted in single quotes.
+     */
+    public void testValidRuleWithSingleQuotedProperty() throws Exception
+    {
+        final PlainConfiguration config = writeACLConfig("ACL ALLOW all CREATE EXCHANGE name = \'value\'");
+        final RuleSet rs = config.getConfiguration();
+        assertEquals(1, rs.getRuleCount());
+
+        final Map<Integer, Rule> rules = rs.getAllRules();
+        assertEquals(1, rules.size());
+        final Rule rule = rules.get(0);
+        assertEquals("Rule has unexpected identity", "all", rule.getIdentity());
+        assertEquals("Rule has unexpected operation", Operation.CREATE, rule.getAction().getOperation());
+        assertEquals("Rule has unexpected operation", ObjectType.EXCHANGE, rule.getAction().getObjectType());
+        final ObjectProperties expectedProperties = new ObjectProperties();
+        expectedProperties.setName("value");
+        assertEquals("Rule has unexpected object properties", expectedProperties, rule.getAction().getProperties());
+    }
+
+    /**
+     * Tests interpretation of an acl rule with object properties quoted in double quotes.
+     */
+    public void testValidRuleWithDoubleQuotedProperty() throws Exception
+    {
+        final PlainConfiguration config = writeACLConfig("ACL ALLOW all CREATE EXCHANGE name = \"value\"");
+        final RuleSet rs = config.getConfiguration();
+        assertEquals(1, rs.getRuleCount());
+
+        final Map<Integer, Rule> rules = rs.getAllRules();
+        assertEquals(1, rules.size());
+        final Rule rule = rules.get(0);
+        assertEquals("Rule has unexpected identity", "all", rule.getIdentity());
+        assertEquals("Rule has unexpected operation", Operation.CREATE, rule.getAction().getOperation());
+        assertEquals("Rule has unexpected operation", ObjectType.EXCHANGE, rule.getAction().getObjectType());
+        final ObjectProperties expectedProperties = new ObjectProperties();
+        expectedProperties.setName("value");
+        assertEquals("Rule has unexpected object properties", expectedProperties, rule.getAction().getProperties());
+    }
+
+    /**
+     * Tests interpretation of an acl rule with many object properties.
+     */
+    public void testValidRuleWithManyProperties() throws Exception
+    {
+        final PlainConfiguration config = writeACLConfig("ACL ALLOW admin DELETE QUEUE name=name1 owner = owner1");
+        final RuleSet rs = config.getConfiguration();
+        assertEquals(1, rs.getRuleCount());
+
+        final Map<Integer, Rule> rules = rs.getAllRules();
+        assertEquals(1, rules.size());
+        final Rule rule = rules.get(0);
+        assertEquals("Rule has unexpected identity", "admin", rule.getIdentity());
+        assertEquals("Rule has unexpected operation", Operation.DELETE, rule.getAction().getOperation());
+        assertEquals("Rule has unexpected operation", ObjectType.QUEUE, rule.getAction().getObjectType());
+        final ObjectProperties expectedProperties = new ObjectProperties();
+        expectedProperties.setName("name1");
+        expectedProperties.put(Property.OWNER, "owner1");
+        assertEquals("Rule has unexpected operation", expectedProperties, rule.getAction().getProperties());
+    }
+
+    /**
+     * Tests interpretation of an acl rule with object properties containing wildcards.  Values containing
+     * hashes must be quoted; otherwise they are interpreted as comments.
+     */
+    public void testValidRuleWithWildcardProperties() throws Exception
+    {
+        final PlainConfiguration config = writeACLConfig("ACL ALLOW all CREATE EXCHANGE routingKey = \'news.#\'",
+                                                         "ACL ALLOW all CREATE EXCHANGE routingKey = \'news.co.#\'",
+                                                         "ACL ALLOW all CREATE EXCHANGE routingKey = *.co.medellin");
+        final RuleSet rs = config.getConfiguration();
+        assertEquals(3, rs.getRuleCount());
+
+        final Map<Integer, Rule> rules = rs.getAllRules();
+        assertEquals(3, rules.size());
+        final Rule rule1 = rules.get(0);
+        assertEquals("Rule has unexpected identity", "all", rule1.getIdentity());
+        assertEquals("Rule has unexpected operation", Operation.CREATE, rule1.getAction().getOperation());
+        assertEquals("Rule has unexpected operation", ObjectType.EXCHANGE, rule1.getAction().getObjectType());
+        final ObjectProperties expectedProperties1 = new ObjectProperties();
+        expectedProperties1.put(Property.ROUTING_KEY,"news.#");
+        assertEquals("Rule has unexpected object properties", expectedProperties1, rule1.getAction().getProperties());
+
+        final Rule rule2 = rules.get(10);
+        final ObjectProperties expectedProperties2 = new ObjectProperties();
+        expectedProperties2.put(Property.ROUTING_KEY,"news.co.#");
+        assertEquals("Rule has unexpected object properties", expectedProperties2, rule2.getAction().getProperties());
+
+        final Rule rule3 = rules.get(20);
+        final ObjectProperties expectedProperties3 = new ObjectProperties();
+        expectedProperties3.put(Property.ROUTING_KEY,"*.co.medellin");
+        assertEquals("Rule has unexpected object properties", expectedProperties3, rule3.getAction().getProperties());
+    }
+
+    /**
+     * Tests that rules are interpreted case-insensitively.
+     */
+    public void testMixedCaseRuleInterpretation() throws Exception
+    {
+        final PlainConfiguration config = writeACLConfig("AcL deny-LOG user1 BiND Exchange name=AmQ.dIrect");
+        final RuleSet rs = config.getConfiguration();
+        assertEquals(1, rs.getRuleCount());
+
+        final Map<Integer, Rule> rules = rs.getAllRules();
+        assertEquals(1, rules.size());
+        final Rule rule = rules.get(0);
+        assertEquals("Rule has unexpected identity", "user1", rule.getIdentity());
+        assertEquals("Rule has unexpected operation", Operation.BIND, rule.getAction().getOperation());
+        assertEquals("Rule has unexpected operation", ObjectType.EXCHANGE, rule.getAction().getObjectType());
+        final ObjectProperties expectedProperties = new ObjectProperties("amq.direct");
+        assertEquals("Rule has unexpected object properties", expectedProperties, rule.getAction().getProperties());
+    }
+
+    /**
+     * Tests that comments are supported. Note that currently the Java implementation permits comments to
+     * be introduced anywhere in the ACL, whereas the C++ implementation supports only whitespace at the
+     * beginning of a line.
+     */
+    public void testCommentsSuppported() throws Exception
+    {
+        final PlainConfiguration config = writeACLConfig("#Comment",
+                                                         "ACL DENY-LOG user1 ACCESS VIRTUALHOST # another comment",
+                                                         "  # final comment with leading whitespace");
+        final RuleSet rs = config.getConfiguration();
+        assertEquals(1, rs.getRuleCount());
+
+        final Map<Integer, Rule> rules = rs.getAllRules();
+        assertEquals(1, rules.size());
+        final Rule rule = rules.get(0);
+        assertEquals("Rule has unexpected identity", "user1", rule.getIdentity());
+        assertEquals("Rule has unexpected operation", Operation.ACCESS, rule.getAction().getOperation());
+        assertEquals("Rule has unexpected operation", ObjectType.VIRTUALHOST, rule.getAction().getObjectType());
+        assertEquals("Rule has unexpected object properties", ObjectProperties.EMPTY, rule.getAction().getProperties());
+    }
+
+    /**
+     * Tests interpretation of an acl rule using mixtures of tabs/spaces as token separators.
+     *
+     */
+    public void testWhitespace() throws Exception
+    {
+        final PlainConfiguration config = writeACLConfig("ACL\tDENY-LOG\t\t user1\t \tACCESS VIRTUALHOST");
+        final RuleSet rs = config.getConfiguration();
+        assertEquals(1, rs.getRuleCount());
+
+        final Map<Integer, Rule> rules = rs.getAllRules();
+        assertEquals(1, rules.size());
+        final Rule rule = rules.get(0);
+        assertEquals("Rule has unexpected identity", "user1", rule.getIdentity());
+        assertEquals("Rule has unexpected operation", Operation.ACCESS, rule.getAction().getOperation());
+        assertEquals("Rule has unexpected operation", ObjectType.VIRTUALHOST, rule.getAction().getObjectType());
+        assertEquals("Rule has unexpected object properties", ObjectProperties.EMPTY, rule.getAction().getProperties());
+    }
+
+    /**
+     * Tests interpretation of an acl utilising line continuation.
+     */
+    public void testLineContination() throws Exception
+    {
+        final PlainConfiguration config = writeACLConfig("ACL DENY-LOG user1 \\",
+                                                         "ACCESS VIRTUALHOST");
+        final RuleSet rs = config.getConfiguration();
+        assertEquals(1, rs.getRuleCount());
+
+        final Map<Integer, Rule> rules = rs.getAllRules();
+        assertEquals(1, rules.size());
+        final Rule rule = rules.get(0);
+        assertEquals("Rule has unexpected identity", "user1", rule.getIdentity());
+        assertEquals("Rule has unexpected operation", Operation.ACCESS, rule.getAction().getOperation());
+        assertEquals("Rule has unexpected operation", ObjectType.VIRTUALHOST, rule.getAction().getObjectType());
+        assertEquals("Rule has unexpected object properties", ObjectProperties.EMPTY, rule.getAction().getProperties());
+    }
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/RuleSetTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/RuleSetTest.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/RuleSetTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/RuleSetTest.java Thu Mar  1 12:51:40 2012
@@ -69,7 +69,6 @@ public class RuleSetTest extends QpidTes
         super.setUp();
 
         _ruleSet = new RuleSet();
-        _ruleSet.configure(RuleSet.TRANSITIVE, Boolean.FALSE);
     }
 
     @Override

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/MANIFEST.MF
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/MANIFEST.MF?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/MANIFEST.MF (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/MANIFEST.MF Thu Mar  1 12:51:40 2012
@@ -9,7 +9,8 @@ Bundle-Version: 1.0.0
 Bundle-Activator: org.apache.qpid.shutdown.Activator
 Import-Package: javax.management;resolution:=optional,
  org.apache.log4j,
- org.osgi.framework
+ org.osgi.framework,
+ org.apache.qpid.server.management
 Bundle-RequiredExecutionEnvironment: J2SE-1.5
 Bundle-ActivationPolicy: lazy
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/build.xml?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/build.xml (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/build.xml Thu Mar  1 12:51:40 2012
@@ -20,7 +20,7 @@
  -->
 <project name="AMQ Broker Shutdown Plugin" default="build">
 
-    <property name="module.depends" value="common broker broker-plugins"/>
+    <property name="module.depends" value="common broker management/common broker-plugins"/>
     <property name="module.test.depends" value="test broker/test management/common client systests"/>
     <property name="module.manifest" value="MANIFEST.MF"/>
     <property name="module.plugin" value="true"/>

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Activator.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Activator.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Activator.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Activator.java Thu Mar  1 12:51:40 2012
@@ -19,11 +19,6 @@
  */
 package org.apache.qpid.shutdown;
 
-import java.lang.management.ManagementFactory;
-
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import org.apache.log4j.Logger;
 import org.osgi.framework.BundleActivator;
@@ -33,20 +28,17 @@ public class Activator implements Bundle
 {
     private static final Logger _logger = Logger.getLogger(Activator.class);
 
-    private static final String SHUTDOWN_MBEAN_NAME = "org.apache.qpid:type=ShutdownMBean";
+    private Shutdown _shutdown = null;
 
     /** @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext) */
     public void start(BundleContext ctx) throws Exception {
-        Shutdown shutdown = new Shutdown();
+        _shutdown = new Shutdown();
         if (ctx != null)
         {
-            ctx.registerService(ShutdownMBean.class.getName(), shutdown, null);
+            ctx.registerService(ShutdownMBean.class.getName(), _shutdown, null);
         }
 
-        // MBean registration
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        ObjectName name = new ObjectName(SHUTDOWN_MBEAN_NAME);
-        mbs.registerMBean(shutdown, name);
+        _shutdown.register();
 
         _logger.info("Shutdown plugin MBean registered");
     }
@@ -54,16 +46,10 @@ public class Activator implements Bundle
     /** @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext) */
     public void stop(BundleContext ctx) throws Exception
     {
-        // Unregister MBean
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        ObjectName name = new ObjectName(SHUTDOWN_MBEAN_NAME);
-        try
-        {
-            mbs.unregisterMBean(name);
-        }
-        catch (InstanceNotFoundException e)
+        if (_shutdown != null)
         {
-            //ignore
+            _shutdown.unregister();
+            _shutdown = null;
         }
 
         _logger.info("Shutdown plugin MBean unregistered");

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Shutdown.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Shutdown.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Shutdown.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Shutdown.java Thu Mar  1 12:51:40 2012
@@ -27,21 +27,30 @@ import java.util.concurrent.ScheduledExe
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import javax.management.NotCompliantMBeanException;
+
 import org.apache.log4j.Logger;
+import org.apache.qpid.server.management.DefaultManagedObject;
 
 /**
  * Implementation of the JMX broker shutdown plugin.
  */
-public class Shutdown implements ShutdownMBean
+public class Shutdown extends DefaultManagedObject implements ShutdownMBean
 {
+
     private static final Logger _logger = Logger.getLogger(Shutdown.class);
 
-    private static final String FORMAT = "yyyyy/MM/dd hh:mm:ss";
+    private static final String FORMAT = "yyyy/MM/dd HH:mm:ss";
     private static final int THREAD_COUNT = 1;
     private static final ScheduledExecutorService EXECUTOR = new ScheduledThreadPoolExecutor(THREAD_COUNT);
 
     private final Runnable _shutdown = new SystemExiter();
 
+    public Shutdown() throws NotCompliantMBeanException
+    {
+        super(ShutdownMBean.class, ShutdownMBean.TYPE);
+    }
+
     /** @see ShutdownMBean#shutdown() */
     public void shutdown()
     {
@@ -50,14 +59,22 @@ public class Shutdown implements Shutdow
     }
 
     /** @see ShutdownMBean#shutdown(long) */
-    public void shutdown(long delay)
+    public void shutdown(final long delay)
     {
-        _logger.info("Scheduled broker shutdown after " + delay + "ms");
-        shutdownBroker(delay);
+        if (delay < 0)
+        {
+            _logger.info("Shutting down at user's request");
+            shutdownBroker(0);
+        }
+        else
+        {
+            _logger.info("Scheduled broker shutdown after " + delay + "ms");
+            shutdownBroker(delay);
+        }
     }
 
     /** @see ShutdownMBean#shutdownAt(String) */
-    public void shutdownAt(String when)
+    public void shutdownAt(final String when)
     {
         Date date;
         DateFormat df = new SimpleDateFormat(FORMAT);
@@ -101,4 +118,13 @@ public class Shutdown implements Shutdow
             System.exit(0);
         }
     }
+
+    /**
+     * @see org.apache.qpid.server.management.ManagedObject#getObjectInstanceName()
+     */
+    @Override
+    public String getObjectInstanceName()
+    {
+        return "Shutdown";
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/ShutdownMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/ShutdownMBean.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/ShutdownMBean.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/ShutdownMBean.java Thu Mar  1 12:51:40 2012
@@ -19,6 +19,9 @@
  */
 package org.apache.qpid.shutdown;
 
+import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter;
+
 /**
  * Shutdown plugin JMX MBean interface.
  * 
@@ -26,9 +29,12 @@ package org.apache.qpid.shutdown;
  */
 public interface ShutdownMBean
 {
+    static final String TYPE = "Shutdown";
+
     /**
      * Broker will be shut down immediately.
      */
+    @MBeanOperation(name="shutdown", description="Shut down immediately")
     public void shutdown();
 
     /**
@@ -36,12 +42,14 @@ public interface ShutdownMBean
      * 
      * @param delay the number of ms to wait
      */
-    public void shutdown(long delay);
+    @MBeanOperation(name="shutdown", description="Shutdown after the specified delay (ms)")
+    public void shutdown(@MBeanOperationParameter(name="when", description="delay (ms)")long delay);
 
     /**
      * Broker will be shutdown at the specified date and time.
      * 
      * @param when the date and time to shutdown
      */
-    public void shutdownAt(String when);
+    @MBeanOperation(name="shutdownAt", description="Shutdown at the specified date and time (yyyy/MM/dd HH:mm:ss)")
+    public void shutdownAt(@MBeanOperationParameter(name="when", description="shutdown date/time (yyyy/MM/dd HH:mm:ss)")String when);
 }

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/bin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 12:51:40 2012
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/broker/bin:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/broker/bin:805429-821809
 /qpid/branches/qpid-2935/qpid/java/broker/bin:1061302-1072333
-/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/build.xml?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/build.xml (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/build.xml Thu Mar  1 12:51:40 2012
@@ -72,14 +72,26 @@
         <fixcrlf srcdir="${module.release}/bin" fixlast="true" eol="dos" includes="*.bat"/>
     </target>
 
-    <target name="release-bin-other" description="copy broker-plugins into module release">
+    <target name="release-bin-other" depends="release-bin-other-bdbstore" description="copy broker-plugins into module release">
         <copy todir="${module.release}/lib/plugins" failonerror="true">
             <fileset dir="${build.lib}/plugins"/>
         </copy>
-        <!--copy optional bdbstore module if it exists -->
-        <copy todir="${module.release}/lib/" failonerror="false">
+    </target>
+
+    <target name="release-bin-other-bdbstore" depends="check-bdbstore-requested" if="bdbstore-requested"
+                                         description="copy bdbstore items into module release">
+        <copy todir="${module.release}/lib/" failonerror="true">
             <fileset file="${build.lib}/${project.name}-bdbstore-${project.version}.jar"/>
         </copy>
+        <copy todir="${module.release}/bin" failonerror="true" flatten="true">
+            <fileset dir="${basedir}/../bdbstore/bin"/>
+        </copy>
+    </target>
+
+    <target name="check-bdbstore-requested">
+         <condition property="bdbstore-requested">
+             <contains string="${modules.opt}" substring="bdbstore"/>
+         </condition>
     </target>
 
     <target name="release-bin" depends="release-bin-tasks"/>

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/config.xml?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/config.xml (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/config.xml Thu Mar  1 12:51:40 2012
@@ -80,7 +80,12 @@
             </principal-database>
         </pd-auth-manager>
 
-        <allow-all />
+        <!-- By default, all authenticated users have permissions to perform all actions -->
+
+        <!-- ACL Example
+             This example illustrates securing both Management (JMX) and Messaging.
+        <acl>${conf}/broker_example.acl</acl>
+        -->
         
         <msg-auth>false</msg-auth>
     </security>

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/md5passwd
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/md5passwd?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/md5passwd (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/md5passwd Thu Mar  1 12:51:40 2012
@@ -17,5 +17,6 @@
 # under the License.
 #
 guest:CE4DQ6BIb/BVMN9scFyLtA==
+client:CE4DQ6BIb/BVMN9scFyLtA==
+server:CE4DQ6BIb/BVMN9scFyLtA==
 admin:ISMvKXpXpadDiUoOSoAfww==
-user:aBzonUodYLhwSa8s9A10sA==

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Thu Mar  1 12:51:40 2012
@@ -20,8 +20,8 @@ package org.apache.qpid.server;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.management.common.mbeans.ManagedBroker;
 import org.apache.qpid.management.common.mbeans.ManagedQueue;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
@@ -60,6 +61,7 @@ public class AMQBrokerManagerMBean exten
     private final QueueRegistry _queueRegistry;
     private final ExchangeRegistry _exchangeRegistry;
     private final ExchangeFactory _exchangeFactory;
+    private final Exchange _defaultExchange;
     private final DurableConfigurationStore _durableConfig;
 
     private final VirtualHostImpl.VirtualHostMBean _virtualHostMBean;
@@ -74,6 +76,7 @@ public class AMQBrokerManagerMBean exten
 
         _queueRegistry = virtualHost.getQueueRegistry();
         _exchangeRegistry = virtualHost.getExchangeRegistry();
+        _defaultExchange = _exchangeRegistry.getDefaultExchange();
         _durableConfig = virtualHost.getDurableConfigurationStore();
         _exchangeFactory = virtualHost.getExchangeFactory();
     }
@@ -241,7 +244,13 @@ public class AMQBrokerManagerMBean exten
      */
     public void createNewQueue(String queueName, String owner, boolean durable) throws JMException, MBeanException
     {
-        AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName));
+        createNewQueue(queueName, owner, durable, null);
+    }
+
+    public void createNewQueue(String queueName, String owner, boolean durable, Map<String,Object> arguments) throws JMException
+    {
+        final AMQShortString queueNameAsAMQShortString = new AMQShortString(queueName);
+        AMQQueue queue = _queueRegistry.getQueue(queueNameAsAMQShortString);
         if (queue != null)
         {
             throw new JMException("The queue \"" + queueName + "\" already exists.");
@@ -256,13 +265,21 @@ public class AMQBrokerManagerMBean exten
                 ownerShortString = new AMQShortString(owner);
             }
 
-            queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, false, getVirtualHost(), null);
+            FieldTable args = null;
+            if(arguments != null)
+            {
+                args = FieldTable.convertToFieldTable(arguments);
+            }
+            final VirtualHost virtualHost = getVirtualHost();
+
+            queue = AMQQueueFactory.createAMQQueueImpl(queueNameAsAMQShortString, durable, ownerShortString,
+                                                       false, false, getVirtualHost(), args);
             if (queue.isDurable() && !queue.isAutoDelete())
             {
-                _durableConfig.createQueue(queue);
+                _durableConfig.createQueue(queue, args);
             }
 
-            _queueRegistry.registerQueue(queue);
+            virtualHost.getBindingFactory().addBinding(queueName, queue, _defaultExchange, null);
         }
         catch (AMQException ex)
         {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Mar  1 12:51:40 2012
@@ -22,7 +22,6 @@ package org.apache.qpid.server;
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.framing.AMQMethodBody;
@@ -53,6 +52,7 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
@@ -63,6 +63,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.InboundMessageAdapter;
 import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -693,6 +694,31 @@ public class AMQChannel implements Sessi
 
     }
 
+    /** Returns true when the queue of the unacked entry held for this tag has a positive maximum delivery count. */
+    public boolean isMaxDeliveryCountEnabled(final long deliveryTag)
+    {
+        final QueueEntry unackedEntry = _unacknowledgedMessageMap.get(deliveryTag);
+        if (unackedEntry == null)
+        {
+            // No unacknowledged entry is held for this tag, so there is no limit to report.
+            return false;
+        }
+        return unackedEntry.getQueue().getMaximumDeliveryCount() > 0;
+    }
+
+    /** Reports whether the unacked entry for this tag has reached its queue's (positive) maximum delivery count. */
+    public boolean isDeliveredTooManyTimes(final long deliveryTag)
+    {
+        final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+        if (queueEntry == null)
+        {
+            return false;
+        }
+        // Only a positive limit is enforced (as in isMaxDeliveryCountEnabled); a negative one must not flag every message.
+        final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+        return maximumDeliveryCount > 0 && queueEntry.getDeliveryCount() >= maximumDeliveryCount;
+    }
+
     /**
      * Called to resend all outstanding unacknowledged messages to this same channel.
      *
@@ -740,9 +766,9 @@ public class AMQChannel implements Sessi
             QueueEntry message = entry.getValue();
             long deliveryTag = entry.getKey();
 
+            //Amend the delivery counter as the client hasn't seen these messages yet.
+            message.decrementDeliveryCount();
 
-
-            ServerMessage msg = message.getMessage();
             AMQQueue queue = message.getQueue();
 
             // Our Java Client will always suspend the channel when resending!
@@ -800,6 +826,10 @@ public class AMQChannel implements Sessi
         {
             QueueEntry message = entry.getValue();
             long deliveryTag = entry.getKey();
+
+            //Amend the delivery counter as the client hasn't seen these messages yet.
+            message.decrementDeliveryCount();
+
             _unacknowledgedMessageMap.remove(deliveryTag);
 
             message.setRedelivered();
@@ -1060,6 +1090,7 @@ public class AMQChannel implements Sessi
                 getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
                                                                                deliveryTag,
                                                                                ((SubscriptionImpl)sub).getConsumerTag());
+                entry.incrementDeliveryCount();
             }
 
         };
@@ -1248,7 +1279,6 @@ public class AMQChannel implements Sessi
     {
         private final Collection<QueueEntry> _ackedMessages;
 
-
         public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages)
         {
             _ackedMessages = ackedMessages;
@@ -1481,4 +1511,54 @@ public class AMQChannel implements Sessi
             }
         }
     }
+
+    /**
+     * Dead-letters the unacknowledged message held for the given delivery tag by routing it
+     * through the alternate exchange of the queue it was delivered from. The entry is discarded
+     * instead when that queue has no alternate exchange or the exchange yields no queues.
+     *
+     * @param deliveryTag tag identifying the entry in the unacknowledged message map
+     * @throws AMQException declared by this method; presumably raised by routing or discard - confirm
+     */
+    public void deadLetter(long deliveryTag) throws AMQException
+    {
+        final UnacknowledgedMessageMap unacked = getUnacknowledgedMessageMap();
+        final QueueEntry rejected = unacked.get(deliveryTag);
+        if (rejected == null)
+        {
+            _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
+            return;
+        }
+
+        final ServerMessage msg = rejected.getMessage();
+        final AMQQueue sourceQueue = rejected.getQueue();
+        final Exchange altExchange = sourceQueue.getAlternateExchange();
+        // The entry leaves the unacked map whether it ends up dead-lettered or discarded.
+        unacked.remove(deliveryTag);
+
+        if (altExchange == null)
+        {
+            _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+            _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), sourceQueue.getName(), msg.getRoutingKey()));
+            rejected.discard();
+            return;
+        }
+
+        final InboundMessage inbound = new InboundMessageAdapter(rejected);
+        final ArrayList<? extends BaseQueue> targets = altExchange.route(inbound);
+        if (targets == null || targets.isEmpty())
+        {
+            _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
+            _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+            rejected.discard();
+            return;
+        }
+
+        rejected.routeToAlternate();
+        // Operational logging: one message per destination queue.
+        for (final BaseQueue target : targets)
+        {
+            _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), target.getNameShortString().asString()));
+        }
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java Thu Mar  1 12:51:40 2012
@@ -170,11 +170,17 @@ public class BindingFactory
         {
             arguments = Collections.emptyMap();
         }
-      
-        //Perform ACLs
-        if (!getVirtualHost().getSecurityManager().authoriseBind(exchange, queue, new AMQShortString(bindingKey)))
+
+        // The default exchange bindings must reflect the existence of queues, so allow
+        // all operations on it to succeed. It is up to the broker to prevent illegal
+        // attempts at binding to this exchange, not the ACLs.
+        if(exchange != _defaultExchange)
         {
-            throw new AMQSecurityException("Permission denied: binding " + bindingKey);
+            //Perform ACLs
+            if (!getVirtualHost().getSecurityManager().authoriseBind(exchange, queue, new AMQShortString(bindingKey)))
+            {
+                throw new AMQSecurityException("Permission denied: binding " + bindingKey);
+            }
         }
         
         BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments);
@@ -238,10 +244,16 @@ public class BindingFactory
             arguments = Collections.emptyMap();
         }
 
-        // Check access
-        if (!getVirtualHost().getSecurityManager().authoriseUnbind(exchange, new AMQShortString(bindingKey), queue))
+        // The default exchange bindings must reflect the existence of queues, so allow
+        // all operations on it to succeed. It is up to the broker to prevent illegal
+        // attempts at unbinding from this exchange, not the ACLs.
+        if(exchange != _defaultExchange)
         {
-            throw new AMQSecurityException("Permission denied: binding " + bindingKey);
+            // Check access
+            if (!getVirtualHost().getSecurityManager().authoriseUnbind(exchange, new AMQShortString(bindingKey), queue))
+            {
+                throw new AMQSecurityException("Permission denied: unbinding " + bindingKey);
+            }
         }
         
         BindingImpl b = _bindings.remove(new BindingImpl(bindingKey,queue,exchange,arguments));

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Thu Mar  1 12:51:40 2012
@@ -62,7 +62,10 @@ public class QueueConfiguration extends 
                             "capacity",
                             "flowResumeCapacity",
                             "lvq",
-                            "lvqKey"
+                            "lvqKey",
+                            "sortKey",
+                            "maximumDeliveryCount",
+                            "deadLetterQueues"
         };
     }
 
@@ -167,11 +170,30 @@ public class QueueConfiguration extends 
         return getStringValue("lvqKey", null);
     }
 
+
     public boolean isTopic()
     {
         return getBooleanValue("topic");
     }
 
+    public String getQueueSortKey()
+    {
+        return getStringValue("sortKey", null); // reads the "sortKey" setting; null is the supplied default
+    }
+
+    public int getMaxDeliveryCount()
+    {
+        return getIntValue("maximumDeliveryCount", _vHostConfig.getMaxDeliveryCount()); // the virtualhost configuration's value is the supplied default
+    }
+
+    /**
+     * Check if dead letter queue delivery is enabled for this queue (the "deadLetterQueues" setting), deferring to the virtualhost configuration if not set.
+     */
+    public boolean isDeadLetterQueueEnabled()
+    {
+        return getBooleanValue("deadLetterQueues", _vHostConfig.isDeadLetterQueueEnabled());
+    }
+
 
     public static class QueueConfig extends ConfigurationPlugin
     {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Thu Mar  1 12:51:40 2012
@@ -40,6 +40,8 @@ import org.apache.commons.configuration.
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.signal.SignalHandlerTask;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -805,4 +807,39 @@ public class ServerConfiguration extends
         final List<String> disabledFeatures = getListValue("disabledFeatures", Collections.emptyList());
         return disabledFeatures;
     }
+
+    public boolean getManagementRightsInferAllAccess()
+    {
+        return getBooleanValue("management.managementRightsInferAllAccess", true); // true is the supplied default
+    }
+
+    public int getMaxDeliveryCount()
+    {
+        return getConfig().getInt("maximumDeliveryCount", 0); // reads "maximumDeliveryCount"; 0 is the supplied default
+    }
+
+    /**
+     * Check if dead letter queue delivery is enabled (the "deadLetterQueues" setting), defaults to disabled (false) if not set.
+     */
+    public boolean isDeadLetterQueueEnabled()
+    {
+        return getConfig().getBoolean("deadLetterQueues", false);
+    }
+
+    /**
+     * String to affix to end of queue name when generating an alternate exchange for DLQ purposes; read from "deadLetterExchangeSuffix" with {@link DefaultExchangeFactory#DEFAULT_DLE_NAME_SUFFIX} as the supplied default.
+     */
+    public String getDeadLetterExchangeSuffix()
+    {
+        return getConfig().getString("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+    }
+
+    /**
+     * String to affix to end of queue name when generating a queue for DLQ purposes; read from "deadLetterQueueSuffix" with {@link AMQQueueFactory#DEFAULT_DLQ_NAME_SUFFIX} as the supplied default.
+     */
+    public String getDeadLetterQueueSuffix()
+    {
+        return getConfig().getString("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+    }
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Thu Mar  1 12:51:40 2012
@@ -347,4 +347,18 @@ public class VirtualHostConfiguration ex
     {
         return getLongValue("transactionTimeout.idleClose", 0L);
     }
+
+    public int getMaxDeliveryCount()
+    {
+        return getIntValue("queues.maximumDeliveryCount", ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount()); // the broker configuration's value is the supplied default
+    }
+
+    /**
+     * Check if dead letter queue delivery is enabled for this virtualhost (the "queues.deadLetterQueues" setting), deferring to the broker configuration if not set.
+     */
+    public boolean isDeadLetterQueueEnabled()
+    {
+        return getBooleanValue("queues.deadLetterQueues", ApplicationRegistry.getInstance().getConfiguration().isDeadLetterQueueEnabled());
+    }
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java Thu Mar  1 12:51:40 2012
@@ -87,6 +87,19 @@ public class SlowConsumerDetectionQueueC
     @Override
     public void validateConfiguration() throws ConfigurationException
     {
+        PluginManager pluginManager;
+        try
+        {
+            pluginManager = ApplicationRegistry.getInstance().getPluginManager();
+        }
+        catch (IllegalStateException ise)
+        {
+            // We see this happen during shutdown due to asynchronous reconfig performed by IO threads
+            // running at the same time as the shutdown handler.
+            _policyPlugin = null;
+            return;
+        }
+
         if (!containsPositiveLong("messageAge") &&
             !containsPositiveLong("depth") &&
             !containsPositiveLong("messageCount"))
@@ -96,8 +109,6 @@ public class SlowConsumerDetectionQueueC
         }
 
         SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName());
-
-        PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager();
         Map<String, SlowConsumerPolicyPluginFactory> factories = pluginManager.getSlowConsumerPlugins();
 
         if (policyConfig == null)

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Thu Mar  1 12:51:40 2012
@@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhos
 public class DefaultExchangeFactory implements ExchangeFactory
 {
     private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
+    public static final String DEFAULT_DLE_NAME_SUFFIX = "_DLE";
 
     private Map<AMQShortString, ExchangeType<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, ExchangeType<? extends Exchange>>();
     private final VirtualHost _host;
@@ -122,7 +123,7 @@ public class DefaultExchangeFactory impl
                 if (exchangeType == null)
                 {
                     _logger.error("No such custom exchange class found: \""+String.valueOf(className)+"\"");
-                    return;
+                    continue;
                 }
                 Class<? extends ExchangeType> exchangeTypeClass = exchangeType.getClass();
                 ExchangeType<? extends ExchangeType> type = exchangeTypeClass.newInstance();

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Thu Mar  1 12:51:40 2012
@@ -27,7 +27,9 @@ import java.util.concurrent.ConcurrentMa
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -87,12 +89,22 @@ public class DefaultExchangeRegistry imp
 
     public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException
     {
-        // Check access
-        if (!_host.getSecurityManager().authoriseDelete(_exchangeMap.get(name)))
+        final Exchange exchange = _exchangeMap.get(name);
+        if (exchange == null)
+        {
+            throw new AMQException(AMQConstant.NOT_FOUND, "Unknown exchange " + name, null);
+        }
+
+        if (ExchangeDefaults.DEFAULT_EXCHANGE_NAME.equals(name))
+        {
+            throw new AMQException(AMQConstant.NOT_ALLOWED, "Cannot unregister the default exchange", null);
+        }
+
+        if (!_host.getSecurityManager().authoriseDelete(exchange))
         {
             throw new AMQSecurityException();
         }
-        
+
         // TODO: check inUse argument
 
         Exchange e = _exchangeMap.remove(name);

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Thu Mar  1 12:51:40 2012
@@ -117,7 +117,7 @@ public class FanoutExchange extends Abst
 
     public boolean isBound(AMQShortString routingKey, AMQQueue queue)
     {
-        return _queues.contains(queue);
+        return _queues.containsKey(queue);
     }
 
     public boolean isBound(AMQShortString routingKey)
@@ -129,7 +129,7 @@ public class FanoutExchange extends Abst
     public boolean isBound(AMQQueue queue)
     {
 
-        return _queues.contains(queue);
+        return _queues.containsKey(queue);
     }
 
     public boolean hasBindings()

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeMBean.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeMBean.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeMBean.java Thu Mar  1 12:51:40 2012
@@ -20,12 +20,21 @@
  */
 package org.apache.qpid.server.exchange;
 
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
 import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import javax.management.JMException;
+import javax.management.MBeanException;
 import javax.management.openmbean.*;
+
+import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
@@ -94,5 +103,48 @@ final class HeadersExchangeMBean extends
         return bindingList;
     }
 
+    /**
+     * Binds a queue to this headers exchange. {@code binding} is a comma separated list of
+     * {@code key=value} pairs; a bare {@code key} binds on the presence of that header alone.
+     *
+     * @throws JMException if the queue is unknown, the binding is malformed, or binding fails
+     */
+    @Override
+    public void createNewBinding(String queueName, String binding) throws JMException
+    {
+        VirtualHost vhost = getExchange().getVirtualHost();
+        AMQQueue queue = vhost.getQueueRegistry().getQueue(new AMQShortString(queueName));
+        if (queue == null)
+        {
+            throw new JMException("Queue \"" + queueName + "\" is not registered with the virtualhost.");
+        }
+
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        for (final String headerBinding : binding.split(","))
+        {
+            final String[] keyAndValue = headerBinding.split("=");
+            if (keyAndValue.length == 0 || keyAndValue.length > 2 || keyAndValue[0].length() == 0)
+            {
+                throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" ");
+            }
+            // A key without a value is stored with an empty value to signal match on key presence alone.
+            arguments.put(keyAndValue[0], keyAndValue.length == 1 ? "" : keyAndValue[1]);
+        }
+
+        CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
+        try
+        {
+            vhost.getBindingFactory().addBinding(binding,queue,getExchange(),arguments);
+        }
+        catch (AMQException ex)
+        {
+            throw new MBeanException(new JMException(ex.toString()), "Error creating new binding " + binding);
+        }
+        finally
+        {
+            // Set only after validation and always cleared, so a failure cannot leave the actor set.
+            CurrentActor.remove();
+        }
+    }
 
 } // End of MBean class

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java Thu Mar  1 12:51:40 2012
@@ -1,6 +1,5 @@
 package org.apache.qpid.server.flow;
 
-import org.apache.qpid.server.message.ServerMessage;
 
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -59,9 +58,8 @@ public class BytesOnlyCreditManager exte
         return _bytesCredit.get() > 0L;
     }
 
-    public boolean useCreditForMessage(ServerMessage msg)
+    public boolean useCreditForMessage(long msgSize)
     {
-        final long msgSize = msg.getSize();
         if(hasCredit())
         {
             if(_bytesCredit.addAndGet(-msgSize) >= 0)

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java Thu Mar  1 12:51:40 2012
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.flow;
 
-import org.apache.qpid.server.message.ServerMessage;
 
 public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
 {
@@ -118,7 +117,7 @@ public class CreditCreditManager extends
         return (_bytesCredit != 0L  && _messageCredit != 0L);
     }
 
-    public synchronized boolean useCreditForMessage(final ServerMessage msg)
+    public synchronized boolean useCreditForMessage(long msgSize)
     {
         if(_messageCredit >= 0L)
         {
@@ -130,10 +129,10 @@ public class CreditCreditManager extends
 
                     return true;
                 }
-                else if(msg.getSize() <= _bytesCredit)
+                else if(msgSize <= _bytesCredit)
                 {
                     _messageCredit--;
-                    _bytesCredit -= msg.getSize();
+                    _bytesCredit -= msgSize;
 
                     return true;
                 }
@@ -151,9 +150,9 @@ public class CreditCreditManager extends
         }
         else if(_bytesCredit >= 0L)
         {
-            if(msg.getSize() <= _bytesCredit)
+            if(msgSize <= _bytesCredit)
             {
-                 _bytesCredit -= msg.getSize();
+                 _bytesCredit -= msgSize;
 
                 return true;
             }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java Thu Mar  1 12:51:40 2012
@@ -1,6 +1,5 @@
 package org.apache.qpid.server.flow;
 
-import org.apache.qpid.server.message.ServerMessage;
 
 /*
 *
@@ -41,6 +40,6 @@ public interface FlowCreditManager
 
     public boolean hasCredit();
 
-    public boolean useCreditForMessage(ServerMessage msg);
+    public boolean useCreditForMessage(long msgSize);
 
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java Thu Mar  1 12:51:40 2012
@@ -1,6 +1,5 @@
 package org.apache.qpid.server.flow;
 
-import org.apache.qpid.server.message.ServerMessage;
 
 /*
 *
@@ -47,7 +46,7 @@ public class LimitlessCreditManager exte
         return true;
     }
 
-    public boolean useCreditForMessage(ServerMessage msg)
+    public boolean useCreditForMessage(long msgSize)
     {
         return true;
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java Thu Mar  1 12:51:40 2012
@@ -1,6 +1,5 @@
 package org.apache.qpid.server.flow;
 
-import org.apache.qpid.server.message.ServerMessage;
 
 /*
 *
@@ -62,7 +61,7 @@ public class MessageAndBytesCreditManage
         return (_messageCredit > 0L) && ( _bytesCredit > 0L );
     }
 
-    public synchronized boolean useCreditForMessage(ServerMessage msg)
+    public synchronized boolean useCreditForMessage(final long msgSize)
     {
         if(_messageCredit == 0L)
         {
@@ -71,7 +70,6 @@ public class MessageAndBytesCreditManage
         }
         else
         {
-            final long msgSize = msg.getSize();
             if(msgSize > _bytesCredit)
             {
                 setSuspended(true);

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java Thu Mar  1 12:51:40 2012
@@ -1,6 +1,5 @@
 package org.apache.qpid.server.flow;
 
-import org.apache.qpid.server.message.ServerMessage;
 
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -61,7 +60,7 @@ public class MessageOnlyCreditManager ex
         return _messageCredit.get() > 0L;
     }
 
-    public boolean useCreditForMessage(ServerMessage msg)
+    public boolean useCreditForMessage(long msgSize)
     {
         if(hasCredit())
         {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java Thu Mar  1 12:51:40 2012
@@ -20,7 +20,6 @@
 */
 package org.apache.qpid.server.flow;
 
-import org.apache.qpid.server.message.ServerMessage;
 
 public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
 {
@@ -133,7 +132,7 @@ public class Pre0_10CreditManager extend
                 && (_messageCreditLimit == 0L || _messageCredit > 0);
     }
 
-    public synchronized boolean useCreditForMessage(final ServerMessage msg)
+    public synchronized boolean useCreditForMessage(final long msgSize)
     {
         if(_messageCreditLimit != 0L)
         {
@@ -147,10 +146,10 @@ public class Pre0_10CreditManager extend
                 }
                 else
                 {
-                    if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit))
+                    if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
                     {
                         _messageCredit--;
-                        _bytesCredit -= msg.getSize();
+                        _bytesCredit -= msgSize;
 
                         return true;
                     }
@@ -176,9 +175,9 @@ public class Pre0_10CreditManager extend
             }
             else
             {
-                if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit))
+                if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
                 {
-                    _bytesCredit -= msg.getSize();
+                    _bytesCredit -= msgSize;
 
                     return true;
                 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java Thu Mar  1 12:51:40 2012
@@ -20,10 +20,12 @@
  */
 package org.apache.qpid.server.flow;
 
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.log4j.Logger;
 
 public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
 {
+    private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class);
+
     private volatile long _bytesCreditLimit;
     private volatile long _messageCreditLimit;
 
@@ -43,6 +45,15 @@ public class WindowCreditManager extends
 
     }
 
+    public long getBytesCreditLimit()
+    {
+        return _bytesCreditLimit;
+    }
+
+    public long getMessageCreditLimit()
+    {
+        return _messageCreditLimit;
+    }
 
     public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
     {
@@ -70,31 +81,30 @@ public class WindowCreditManager extends
 
     public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
     {
+        _messageUsed -= messageCredit;
+        if(_messageUsed < 0L)
+        {
+            LOGGER.error("Message credit used value was negative: "+ _messageUsed);
+            _messageUsed = 0;
+        }
+
         boolean notifyIncrease = true;
+
         if(_messageCreditLimit > 0L)
         {
             notifyIncrease = (_messageUsed != _messageCreditLimit);
-            _messageUsed -= messageCredit;
-
-            //TODO log warning
-            if(_messageUsed < 0L)
-            {
-                _messageUsed = 0;
-            }
         }
 
-
+        _bytesUsed -= bytesCredit;
+        if(_bytesUsed < 0L)
+        {
+            LOGGER.error("Bytes credit used value was negative: "+ _bytesUsed);
+            _bytesUsed = 0;
+        }
 
         if(_bytesCreditLimit > 0L)
         {
             notifyIncrease = notifyIncrease && bytesCredit>0;
-            _bytesUsed -= bytesCredit;
-
-            //TODO log warning
-            if(_bytesUsed < 0L)
-            {
-                _bytesUsed = 0;
-            }
 
             if(notifyIncrease)
             {
@@ -102,10 +112,7 @@ public class WindowCreditManager extends
             }
         }
 
-
-
         setSuspended(!hasCredit());
-
     }
 
 
@@ -116,7 +123,7 @@ public class WindowCreditManager extends
                 && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed);
     }
 
-    public synchronized boolean useCreditForMessage(final ServerMessage msg)
+    public synchronized boolean useCreditForMessage(final long msgSize)
     {
         if(_messageCreditLimit >= 0L)
         {
@@ -128,10 +135,10 @@ public class WindowCreditManager extends
 
                     return true;
                 }
-                else if(_bytesUsed + msg.getSize() <= _bytesCreditLimit)
+                else if(_bytesUsed + msgSize <= _bytesCreditLimit)
                 {
                     _messageUsed++;
-                    _bytesUsed += msg.getSize();
+                    _bytesUsed += msgSize;
 
                     return true;
                 }
@@ -149,9 +156,9 @@ public class WindowCreditManager extends
         }
         else if(_bytesCreditLimit >= 0L)
         {
-            if(_bytesUsed + msg.getSize() <= _bytesCreditLimit)
+            if(_bytesUsed + msgSize <= _bytesCreditLimit)
             {
-                 _bytesUsed += msg.getSize();
+                 _bytesUsed += msgSize;
 
                 return true;
             }
@@ -169,18 +176,6 @@ public class WindowCreditManager extends
 
     }
 
-    public void stop()
-    {
-        if(_bytesCreditLimit > 0)
-        {
-            _bytesCreditLimit = 0;
-        }
-        if(_messageCreditLimit > 0)
-        {
-            _messageCreditLimit = 0;
-        }
-
-    }
 
     public synchronized void addCredit(long count, long bytes)
     {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Thu Mar  1 12:51:40 2012
@@ -127,16 +127,15 @@ public class BasicGetMethodHandler imple
         final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
         {
 
-            int _msg;
-
             public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
             throws AMQException
             {
-                singleMessageCredit.useCreditForMessage(entry.getMessage());
+                singleMessageCredit.useCreditForMessage(entry.getMessage().getSize());
                 if(entry.getMessage() instanceof AMQMessage)
                 {
                     session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
                                                                             deliveryTag, queue.getMessageCount());
+                    entry.incrementDeliveryCount();
                 }
                 else
                 {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Thu Mar  1 12:51:40 2012
@@ -59,7 +59,6 @@ public class BasicRejectMethodHandler im
         {
             _logger.debug("Rejecting:" + body.getDeliveryTag() +
                           ": Requeue:" + body.getRequeue() +
-                          //": Resend:" + evt.getMethod().resend +
                           " on channel:" + channel.debugIdentity());
         }
 
@@ -70,26 +69,23 @@ public class BasicRejectMethodHandler im
         if (message == null)
         {
             _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
-//            throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
         }                 
         else
         {
             if (message.isQueueDeleted())
             {
-                _logger.warn("Message's Queue as already been purged, unable to Reject. " +
-                             "Dropping message should use Dead Letter Queue");
+                _logger.warn("Message's Queue has already been purged, dropping message");
                 message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
                 if(message != null)
                 {
                     message.discard();
                 }
-                //sendtoDeadLetterQueue(msg)
                 return;
             }
 
             if (message.getMessage() == null)
             {
-                _logger.warn("Message as already been purged, unable to Reject.");
+                _logger.warn("Message has already been purged, unable to Reject.");
                 return;
             }
 
@@ -98,27 +94,44 @@ public class BasicRejectMethodHandler im
             {
                 _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
                               ": Requeue:" + body.getRequeue() +
-                              //": Resend:" + evt.getMethod().resend +
                               " on channel:" + channel.debugIdentity());
             }
 
-            // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
-            //if (!evt.getMethod().resend)
-            {
-                message.reject();
-            }
+            message.reject();
 
             if (body.getRequeue())
             {
                 channel.requeue(deliveryTag);
+
+                //this requeue represents a message rejected from the pre-dispatch queue
+                //therefore we need to amend the delivery counter.
+                message.decrementDeliveryCount();
             }
             else
             {
-                _logger.warn("Dropping message as requeue not required and there is no dead letter queue");
-                 message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
-                //sendtoDeadLetterQueue(AMQMessage message)
-//                message.queue = channel.getDefaultDeadLetterQueue();
-//                channel.requeue(deliveryTag);
+                 final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
+                 _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
+                 if (maxDeliveryCountEnabled)
+                 {
+                     final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
+                     _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
+                     if (deliveredTooManyTimes)
+                     {
+                         channel.deadLetter(body.getDeliveryTag());
+                     }
+                     else
+                     {
+                         //this requeue represents a message rejected because of a recover/rollback that we
+                         //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+                         //and therefore need to increment the delivery counter so we cancel out the effect
+                         //of the AMQChannel#resend() decrement.
+                         message.incrementDeliveryCount();
+                     }
+                 }
+                 else
+                 {
+                     channel.deadLetter(body.getDeliveryTag());
+                 }
             }
         }
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java Thu Mar  1 12:51:40 2012
@@ -25,8 +25,10 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.QueueUnbindBody;
 import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.exchange.Exchange;
@@ -62,7 +64,7 @@ public class QueueUnbindHandler implemen
 
 
         final AMQQueue queue;
-        final AMQShortString routingKey;               
+        final AMQShortString routingKey;
 
         if (body.getQueue() == null)
         {
@@ -114,10 +116,21 @@ public class QueueUnbindHandler implemen
             _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
         }
 
-        MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
-        AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody();
+        final MethodRegistry registry = session.getMethodRegistry();
+        final AMQMethodBody responseBody;
+        if (registry instanceof MethodRegistry_0_9)
+        {
+            responseBody = ((MethodRegistry_0_9)registry).createQueueUnbindOkBody();
+        }
+        else if (registry instanceof MethodRegistry_0_91)
+        {
+            responseBody = ((MethodRegistry_0_91)registry).createQueueUnbindOkBody();
+        }
+        else
+        {
+            // 0-8 does not support QueueUnbind
+            throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + session.getProtocolVersion(), null);
+        }
         session.writeFrame(responseBody.generateFrame(channelId));
-
-
     }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Thu Mar  1 12:51:40 2012
@@ -72,7 +72,12 @@ public class TxRollbackHandler implement
             };
 
             channel.rollback(task);
-            
+
+            //Now resend all the unacknowledged messages back to the original subscribers.
+            //(Must be done after the TxnRollback-ok response).
+            // Why? Are we not allowed to send messages back to the client before the ok method?
+            channel.resend(false);
+
         }
         catch (AMQException e)
         {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties Thu Mar  1 12:51:40 2012
@@ -32,3 +32,7 @@ FLOW_REMOVED = CHN-1006 : Flow Control R
 # 0 - time in milliseconds
 OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms
 IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms
+
+DISCARDMSG_NOALTEXCH = CHN-1009 : Discarded message : {0,number} as no alternate exchange configured for queue : {1} routing key : {2}
+DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1}
+DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1}

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 12:51:40 2012
@@ -4,4 +4,4 @@
 /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/management:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/management:787599
 /qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/management:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java Thu Mar  1 12:51:40 2012
@@ -52,8 +52,6 @@ public abstract class AMQManagedObject e
      */
     protected long _notificationSequenceNumber = 0;
 
-    protected MBeanInfo _mbeanInfo;
-
     protected LogActor _logActor;
 
     protected AMQManagedObject(Class<?> managementInterface, String typeName)
@@ -63,27 +61,8 @@ public abstract class AMQManagedObject e
         // CurrentActor will be defined as these objects are created during
         // broker startup.
         _logActor = new ManagementActor(CurrentActor.get().getRootMessageLogger());
-        buildMBeanInfo();
-    }
-
-    @Override
-    public MBeanInfo getMBeanInfo()
-    {
-        return _mbeanInfo;
-    }
-    
-    private void buildMBeanInfo() throws NotCompliantMBeanException
-    {
-        _mbeanInfo = new MBeanInfo(this.getClass().getName(),
-                      MBeanIntrospector.getMBeanDescription(this.getClass()),
-                      MBeanIntrospector.getMBeanAttributesInfo(getManagementInterface()),
-                      MBeanIntrospector.getMBeanConstructorsInfo(this.getClass()),
-                      MBeanIntrospector.getMBeanOperationsInfo(getManagementInterface()),
-                      this.getNotificationInfo());
     }
 
-
-
     // notification broadcaster implementation
 
     public void addNotificationListener(NotificationListener listener,
@@ -99,8 +78,5 @@ public abstract class AMQManagedObject e
         _broadcaster.removeNotificationListener(listener);
     }
 
-    public MBeanNotificationInfo[] getNotificationInfo()
-    {
-        return null;
-    } 
+
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org