You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 13:51:52 UTC
svn commit: r1295541 [2/10] - in
/qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ bdbstore/bin/
bdbstore/etc/scripts/ bdbstore/src/main/java/ bdbstore/src/resources/
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/
broker-plugins/access-...
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/PlainConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/PlainConfigurationTest.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/PlainConfigurationTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/PlainConfigurationTest.java Thu Mar 1 12:51:40 2012
@@ -22,12 +22,19 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.PrintWriter;
+import java.util.Map;
import junit.framework.TestCase;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.security.access.ObjectProperties;
+import org.apache.qpid.server.security.access.ObjectProperties.Property;
+import org.apache.qpid.server.security.access.ObjectType;
+import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.access.config.ConfigurationFile;
import org.apache.qpid.server.security.access.config.PlainConfiguration;
+import org.apache.qpid.server.security.access.config.Rule;
+import org.apache.qpid.server.security.access.config.RuleSet;
/**
* These tests check that the ACL file parsing works correctly.
@@ -37,7 +44,7 @@ import org.apache.qpid.server.security.a
*/
public class PlainConfigurationTest extends TestCase
{
- public void writeACLConfig(String...aclData) throws Exception
+ private PlainConfiguration writeACLConfig(String...aclData) throws Exception
{
File acl = File.createTempFile(getClass().getName() + getName(), "acl");
acl.deleteOnExit();
@@ -51,8 +58,9 @@ public class PlainConfigurationTest exte
aclWriter.close();
// Load ruleset
- ConfigurationFile configFile = new PlainConfiguration(acl);
+ PlainConfiguration configFile = new PlainConfiguration(acl);
configFile.load();
+ return configFile;
}
public void testMissingACLConfig() throws Exception
@@ -191,4 +199,197 @@ public class PlainConfigurationTest exte
assertEquals(String.format(PlainConfiguration.PROPERTY_NO_VALUE_MSG, 1), ce.getMessage());
}
}
+
+ /**
+ * Tests interpretation of an acl rule with no object properties.
+ *
+ */
+ public void testValidRule() throws Exception
+ {
+ final PlainConfiguration config = writeACLConfig("ACL DENY-LOG user1 ACCESS VIRTUALHOST");
+ final RuleSet rs = config.getConfiguration();
+ assertEquals(1, rs.getRuleCount());
+
+ final Map<Integer, Rule> rules = rs.getAllRules();
+ assertEquals(1, rules.size());
+ final Rule rule = rules.get(0);
+ assertEquals("Rule has unexpected identity", "user1", rule.getIdentity());
+ assertEquals("Rule has unexpected operation", Operation.ACCESS, rule.getAction().getOperation());
+ assertEquals("Rule has unexpected operation", ObjectType.VIRTUALHOST, rule.getAction().getObjectType());
+ assertEquals("Rule has unexpected object properties", ObjectProperties.EMPTY, rule.getAction().getProperties());
+ }
+
+ /**
+ * Tests interpretation of an acl rule with object properties quoted in single quotes.
+ */
+ public void testValidRuleWithSingleQuotedProperty() throws Exception
+ {
+ final PlainConfiguration config = writeACLConfig("ACL ALLOW all CREATE EXCHANGE name = \'value\'");
+ final RuleSet rs = config.getConfiguration();
+ assertEquals(1, rs.getRuleCount());
+
+ final Map<Integer, Rule> rules = rs.getAllRules();
+ assertEquals(1, rules.size());
+ final Rule rule = rules.get(0);
+ assertEquals("Rule has unexpected identity", "all", rule.getIdentity());
+ assertEquals("Rule has unexpected operation", Operation.CREATE, rule.getAction().getOperation());
+ assertEquals("Rule has unexpected operation", ObjectType.EXCHANGE, rule.getAction().getObjectType());
+ final ObjectProperties expectedProperties = new ObjectProperties();
+ expectedProperties.setName("value");
+ assertEquals("Rule has unexpected object properties", expectedProperties, rule.getAction().getProperties());
+ }
+
+ /**
+ * Tests interpretation of an acl rule with object properties quoted in double quotes.
+ */
+ public void testValidRuleWithDoubleQuotedProperty() throws Exception
+ {
+ final PlainConfiguration config = writeACLConfig("ACL ALLOW all CREATE EXCHANGE name = \"value\"");
+ final RuleSet rs = config.getConfiguration();
+ assertEquals(1, rs.getRuleCount());
+
+ final Map<Integer, Rule> rules = rs.getAllRules();
+ assertEquals(1, rules.size());
+ final Rule rule = rules.get(0);
+ assertEquals("Rule has unexpected identity", "all", rule.getIdentity());
+ assertEquals("Rule has unexpected operation", Operation.CREATE, rule.getAction().getOperation());
+ assertEquals("Rule has unexpected operation", ObjectType.EXCHANGE, rule.getAction().getObjectType());
+ final ObjectProperties expectedProperties = new ObjectProperties();
+ expectedProperties.setName("value");
+ assertEquals("Rule has unexpected object properties", expectedProperties, rule.getAction().getProperties());
+ }
+
+ /**
+ * Tests interpretation of an acl rule with many object properties.
+ */
+ public void testValidRuleWithManyProperties() throws Exception
+ {
+ final PlainConfiguration config = writeACLConfig("ACL ALLOW admin DELETE QUEUE name=name1 owner = owner1");
+ final RuleSet rs = config.getConfiguration();
+ assertEquals(1, rs.getRuleCount());
+
+ final Map<Integer, Rule> rules = rs.getAllRules();
+ assertEquals(1, rules.size());
+ final Rule rule = rules.get(0);
+ assertEquals("Rule has unexpected identity", "admin", rule.getIdentity());
+ assertEquals("Rule has unexpected operation", Operation.DELETE, rule.getAction().getOperation());
+ assertEquals("Rule has unexpected operation", ObjectType.QUEUE, rule.getAction().getObjectType());
+ final ObjectProperties expectedProperties = new ObjectProperties();
+ expectedProperties.setName("name1");
+ expectedProperties.put(Property.OWNER, "owner1");
+ assertEquals("Rule has unexpected operation", expectedProperties, rule.getAction().getProperties());
+ }
+
+ /**
+ * Tests interpretation of an acl rule with object properties containing wildcards. Values containing
+ * hashes must be quoted otherwise they are interpreted as comments.
+ */
+ public void testValidRuleWithWildcardProperties() throws Exception
+ {
+ final PlainConfiguration config = writeACLConfig("ACL ALLOW all CREATE EXCHANGE routingKey = \'news.#\'",
+ "ACL ALLOW all CREATE EXCHANGE routingKey = \'news.co.#\'",
+ "ACL ALLOW all CREATE EXCHANGE routingKey = *.co.medellin");
+ final RuleSet rs = config.getConfiguration();
+ assertEquals(3, rs.getRuleCount());
+
+ final Map<Integer, Rule> rules = rs.getAllRules();
+ assertEquals(3, rules.size());
+ final Rule rule1 = rules.get(0);
+ assertEquals("Rule has unexpected identity", "all", rule1.getIdentity());
+ assertEquals("Rule has unexpected operation", Operation.CREATE, rule1.getAction().getOperation());
+ assertEquals("Rule has unexpected operation", ObjectType.EXCHANGE, rule1.getAction().getObjectType());
+ final ObjectProperties expectedProperties1 = new ObjectProperties();
+ expectedProperties1.put(Property.ROUTING_KEY,"news.#");
+ assertEquals("Rule has unexpected object properties", expectedProperties1, rule1.getAction().getProperties());
+
+ final Rule rule2 = rules.get(10);
+ final ObjectProperties expectedProperties2 = new ObjectProperties();
+ expectedProperties2.put(Property.ROUTING_KEY,"news.co.#");
+ assertEquals("Rule has unexpected object properties", expectedProperties2, rule2.getAction().getProperties());
+
+ final Rule rule3 = rules.get(20);
+ final ObjectProperties expectedProperties3 = new ObjectProperties();
+ expectedProperties3.put(Property.ROUTING_KEY,"*.co.medellin");
+ assertEquals("Rule has unexpected object properties", expectedProperties3, rule3.getAction().getProperties());
+ }
+
+ /**
+ * Tests that rules are case insignificant.
+ */
+ public void testMixedCaseRuleInterpretation() throws Exception
+ {
+ final PlainConfiguration config = writeACLConfig("AcL deny-LOG user1 BiND Exchange name=AmQ.dIrect");
+ final RuleSet rs = config.getConfiguration();
+ assertEquals(1, rs.getRuleCount());
+
+ final Map<Integer, Rule> rules = rs.getAllRules();
+ assertEquals(1, rules.size());
+ final Rule rule = rules.get(0);
+ assertEquals("Rule has unexpected identity", "user1", rule.getIdentity());
+ assertEquals("Rule has unexpected operation", Operation.BIND, rule.getAction().getOperation());
+ assertEquals("Rule has unexpected operation", ObjectType.EXCHANGE, rule.getAction().getObjectType());
+ final ObjectProperties expectedProperties = new ObjectProperties("amq.direct");
+ assertEquals("Rule has unexpected object properties", expectedProperties, rule.getAction().getProperties());
+ }
+
+ /**
+ * Tests whitespace is supported. Note that currently the Java implementation permits comments to
+ * be introduced anywhere in the ACL, whereas the C++ supports only whitespace at the beginning of
+ * of line.
+ */
+ public void testCommentsSuppported() throws Exception
+ {
+ final PlainConfiguration config = writeACLConfig("#Comment",
+ "ACL DENY-LOG user1 ACCESS VIRTUALHOST # another comment",
+ " # final comment with leading whitespace");
+ final RuleSet rs = config.getConfiguration();
+ assertEquals(1, rs.getRuleCount());
+
+ final Map<Integer, Rule> rules = rs.getAllRules();
+ assertEquals(1, rules.size());
+ final Rule rule = rules.get(0);
+ assertEquals("Rule has unexpected identity", "user1", rule.getIdentity());
+ assertEquals("Rule has unexpected operation", Operation.ACCESS, rule.getAction().getOperation());
+ assertEquals("Rule has unexpected operation", ObjectType.VIRTUALHOST, rule.getAction().getObjectType());
+ assertEquals("Rule has unexpected object properties", ObjectProperties.EMPTY, rule.getAction().getProperties());
+ }
+
+ /**
+ * Tests interpretation of an acl rule using mixtures of tabs/spaces as token separators.
+ *
+ */
+ public void testWhitespace() throws Exception
+ {
+ final PlainConfiguration config = writeACLConfig("ACL\tDENY-LOG\t\t user1\t \tACCESS VIRTUALHOST");
+ final RuleSet rs = config.getConfiguration();
+ assertEquals(1, rs.getRuleCount());
+
+ final Map<Integer, Rule> rules = rs.getAllRules();
+ assertEquals(1, rules.size());
+ final Rule rule = rules.get(0);
+ assertEquals("Rule has unexpected identity", "user1", rule.getIdentity());
+ assertEquals("Rule has unexpected operation", Operation.ACCESS, rule.getAction().getOperation());
+ assertEquals("Rule has unexpected operation", ObjectType.VIRTUALHOST, rule.getAction().getObjectType());
+ assertEquals("Rule has unexpected object properties", ObjectProperties.EMPTY, rule.getAction().getProperties());
+ }
+
+ /**
+ * Tests interpretation of an acl utilising line continuation.
+ */
+ public void testLineContination() throws Exception
+ {
+ final PlainConfiguration config = writeACLConfig("ACL DENY-LOG user1 \\",
+ "ACCESS VIRTUALHOST");
+ final RuleSet rs = config.getConfiguration();
+ assertEquals(1, rs.getRuleCount());
+
+ final Map<Integer, Rule> rules = rs.getAllRules();
+ assertEquals(1, rules.size());
+ final Rule rule = rules.get(0);
+ assertEquals("Rule has unexpected identity", "user1", rule.getIdentity());
+ assertEquals("Rule has unexpected operation", Operation.ACCESS, rule.getAction().getOperation());
+ assertEquals("Rule has unexpected operation", ObjectType.VIRTUALHOST, rule.getAction().getObjectType());
+ assertEquals("Rule has unexpected object properties", ObjectProperties.EMPTY, rule.getAction().getProperties());
+ }
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/RuleSetTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/RuleSetTest.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/RuleSetTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/RuleSetTest.java Thu Mar 1 12:51:40 2012
@@ -69,7 +69,6 @@ public class RuleSetTest extends QpidTes
super.setUp();
_ruleSet = new RuleSet();
- _ruleSet.configure(RuleSet.TRANSITIVE, Boolean.FALSE);
}
@Override
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/MANIFEST.MF
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/MANIFEST.MF?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/MANIFEST.MF (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/MANIFEST.MF Thu Mar 1 12:51:40 2012
@@ -9,7 +9,8 @@ Bundle-Version: 1.0.0
Bundle-Activator: org.apache.qpid.shutdown.Activator
Import-Package: javax.management;resolution:=optional,
org.apache.log4j,
- org.osgi.framework
+ org.osgi.framework,
+ org.apache.qpid.server.management
Bundle-RequiredExecutionEnvironment: J2SE-1.5
Bundle-ActivationPolicy: lazy
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/build.xml?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/build.xml (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/build.xml Thu Mar 1 12:51:40 2012
@@ -20,7 +20,7 @@
-->
<project name="AMQ Broker Shutdown Plugin" default="build">
- <property name="module.depends" value="common broker broker-plugins"/>
+ <property name="module.depends" value="common broker management/common broker-plugins"/>
<property name="module.test.depends" value="test broker/test management/common client systests"/>
<property name="module.manifest" value="MANIFEST.MF"/>
<property name="module.plugin" value="true"/>
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Activator.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Activator.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Activator.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Activator.java Thu Mar 1 12:51:40 2012
@@ -19,11 +19,6 @@
*/
package org.apache.qpid.shutdown;
-import java.lang.management.ManagementFactory;
-
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import org.apache.log4j.Logger;
import org.osgi.framework.BundleActivator;
@@ -33,20 +28,17 @@ public class Activator implements Bundle
{
private static final Logger _logger = Logger.getLogger(Activator.class);
- private static final String SHUTDOWN_MBEAN_NAME = "org.apache.qpid:type=ShutdownMBean";
+ private Shutdown _shutdown = null;
/** @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext) */
public void start(BundleContext ctx) throws Exception {
- Shutdown shutdown = new Shutdown();
+ _shutdown = new Shutdown();
if (ctx != null)
{
- ctx.registerService(ShutdownMBean.class.getName(), shutdown, null);
+ ctx.registerService(ShutdownMBean.class.getName(), _shutdown, null);
}
- // MBean registration
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- ObjectName name = new ObjectName(SHUTDOWN_MBEAN_NAME);
- mbs.registerMBean(shutdown, name);
+ _shutdown.register();
_logger.info("Shutdown plugin MBean registered");
}
@@ -54,16 +46,10 @@ public class Activator implements Bundle
/** @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext) */
public void stop(BundleContext ctx) throws Exception
{
- // Unregister MBean
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- ObjectName name = new ObjectName(SHUTDOWN_MBEAN_NAME);
- try
- {
- mbs.unregisterMBean(name);
- }
- catch (InstanceNotFoundException e)
+ if (_shutdown != null)
{
- //ignore
+ _shutdown.unregister();
+ _shutdown = null;
}
_logger.info("Shutdown plugin MBean unregistered");
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Shutdown.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Shutdown.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Shutdown.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/Shutdown.java Thu Mar 1 12:51:40 2012
@@ -27,21 +27,30 @@ import java.util.concurrent.ScheduledExe
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import javax.management.NotCompliantMBeanException;
+
import org.apache.log4j.Logger;
+import org.apache.qpid.server.management.DefaultManagedObject;
/**
* Implementation of the JMX broker shutdown plugin.
*/
-public class Shutdown implements ShutdownMBean
+public class Shutdown extends DefaultManagedObject implements ShutdownMBean
{
+
private static final Logger _logger = Logger.getLogger(Shutdown.class);
- private static final String FORMAT = "yyyyy/MM/dd hh:mm:ss";
+ private static final String FORMAT = "yyyy/MM/dd HH:mm:ss";
private static final int THREAD_COUNT = 1;
private static final ScheduledExecutorService EXECUTOR = new ScheduledThreadPoolExecutor(THREAD_COUNT);
private final Runnable _shutdown = new SystemExiter();
+ public Shutdown() throws NotCompliantMBeanException
+ {
+ super(ShutdownMBean.class, ShutdownMBean.TYPE);
+ }
+
/** @see ShutdownMBean#shutdown() */
public void shutdown()
{
@@ -50,14 +59,22 @@ public class Shutdown implements Shutdow
}
/** @see ShutdownMBean#shutdown(long) */
- public void shutdown(long delay)
+ public void shutdown(final long delay)
{
- _logger.info("Scheduled broker shutdown after " + delay + "ms");
- shutdownBroker(delay);
+ if (delay < 0)
+ {
+ _logger.info("Shutting down at user's request");
+ shutdownBroker(0);
+ }
+ else
+ {
+ _logger.info("Scheduled broker shutdown after " + delay + "ms");
+ shutdownBroker(delay);
+ }
}
/** @see ShutdownMBean#shutdownAt(String) */
- public void shutdownAt(String when)
+ public void shutdownAt(final String when)
{
Date date;
DateFormat df = new SimpleDateFormat(FORMAT);
@@ -101,4 +118,13 @@ public class Shutdown implements Shutdow
System.exit(0);
}
}
+
+ /**
+ * @see org.apache.qpid.server.management.ManagedObject#getObjectInstanceName()
+ */
+ @Override
+ public String getObjectInstanceName()
+ {
+ return "Shutdown";
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/ShutdownMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/ShutdownMBean.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/ShutdownMBean.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/ShutdownMBean.java Thu Mar 1 12:51:40 2012
@@ -19,6 +19,9 @@
*/
package org.apache.qpid.shutdown;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter;
+
/**
* Shutdown plugin JMX MBean interface.
*
@@ -26,9 +29,12 @@ package org.apache.qpid.shutdown;
*/
public interface ShutdownMBean
{
+ static final String TYPE = "Shutdown";
+
/**
* Broker will be shut down immediately.
*/
+ @MBeanOperation(name="shutdown", description="Shut down immediately")
public void shutdown();
/**
@@ -36,12 +42,14 @@ public interface ShutdownMBean
*
* @param delay the number of ms to wait
*/
- public void shutdown(long delay);
+ @MBeanOperation(name="shutdown", description="Shutdown after the specified delay (ms)")
+ public void shutdown(@MBeanOperationParameter(name="when", description="delay (ms)")long delay);
/**
* Broker will be shutdown at the specified date and time.
*
* @param when the date and time to shutdown
*/
- public void shutdownAt(String when);
+ @MBeanOperation(name="shutdownAt", description="Shutdown at the specified date and time (yyyy/MM/dd HH:mm:ss)")
+ public void shutdownAt(@MBeanOperationParameter(name="when", description="shutdown date/time (yyyy/MM/dd HH:mm:ss)")String when);
}
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/bin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/broker/bin:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker/bin:805429-821809
/qpid/branches/qpid-2935/qpid/java/broker/bin:1061302-1072333
-/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/build.xml?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/build.xml (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/build.xml Thu Mar 1 12:51:40 2012
@@ -72,14 +72,26 @@
<fixcrlf srcdir="${module.release}/bin" fixlast="true" eol="dos" includes="*.bat"/>
</target>
- <target name="release-bin-other" description="copy broker-plugins into module release">
+ <target name="release-bin-other" depends="release-bin-other-bdbstore" description="copy broker-plugins into module release">
<copy todir="${module.release}/lib/plugins" failonerror="true">
<fileset dir="${build.lib}/plugins"/>
</copy>
- <!--copy optional bdbstore module if it exists -->
- <copy todir="${module.release}/lib/" failonerror="false">
+ </target>
+
+ <target name="release-bin-other-bdbstore" depends="check-bdbstore-requested" if="bdbstore-requested"
+ description="copy bdbstore items into module release">
+ <copy todir="${module.release}/lib/" failonerror="true">
<fileset file="${build.lib}/${project.name}-bdbstore-${project.version}.jar"/>
</copy>
+ <copy todir="${module.release}/bin" failonerror="true" flatten="true">
+ <fileset dir="${basedir}/../bdbstore/bin"/>
+ </copy>
+ </target>
+
+ <target name="check-bdbstore-requested">
+ <condition property="bdbstore-requested">
+ <contains string="${modules.opt}" substring="bdbstore"/>
+ </condition>
</target>
<target name="release-bin" depends="release-bin-tasks"/>
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/config.xml?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/config.xml (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/config.xml Thu Mar 1 12:51:40 2012
@@ -80,7 +80,12 @@
</principal-database>
</pd-auth-manager>
- <allow-all />
+ <!-- By default, all authenticated users have permissions to perform all actions -->
+
+ <!-- ACL Example
+ This example illustrates securing the both Management (JMX) and Messaging.
+ <acl>${conf}/broker_example.acl</acl>
+ -->
<msg-auth>false</msg-auth>
</security>
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/md5passwd
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/md5passwd?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/md5passwd (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/etc/md5passwd Thu Mar 1 12:51:40 2012
@@ -17,5 +17,6 @@
# under the License.
#
guest:CE4DQ6BIb/BVMN9scFyLtA==
+client:CE4DQ6BIb/BVMN9scFyLtA==
+server:CE4DQ6BIb/BVMN9scFyLtA==
admin:ISMvKXpXpadDiUoOSoAfww==
-user:aBzonUodYLhwSa8s9A10sA==
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Thu Mar 1 12:51:40 2012
@@ -20,8 +20,8 @@ package org.apache.qpid.server;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
+import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
@@ -60,6 +61,7 @@ public class AMQBrokerManagerMBean exten
private final QueueRegistry _queueRegistry;
private final ExchangeRegistry _exchangeRegistry;
private final ExchangeFactory _exchangeFactory;
+ private final Exchange _defaultExchange;
private final DurableConfigurationStore _durableConfig;
private final VirtualHostImpl.VirtualHostMBean _virtualHostMBean;
@@ -74,6 +76,7 @@ public class AMQBrokerManagerMBean exten
_queueRegistry = virtualHost.getQueueRegistry();
_exchangeRegistry = virtualHost.getExchangeRegistry();
+ _defaultExchange = _exchangeRegistry.getDefaultExchange();
_durableConfig = virtualHost.getDurableConfigurationStore();
_exchangeFactory = virtualHost.getExchangeFactory();
}
@@ -241,7 +244,13 @@ public class AMQBrokerManagerMBean exten
*/
public void createNewQueue(String queueName, String owner, boolean durable) throws JMException, MBeanException
{
- AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName));
+ createNewQueue(queueName, owner, durable, null);
+ }
+
+ public void createNewQueue(String queueName, String owner, boolean durable, Map<String,Object> arguments) throws JMException
+ {
+ final AMQShortString queueNameAsAMQShortString = new AMQShortString(queueName);
+ AMQQueue queue = _queueRegistry.getQueue(queueNameAsAMQShortString);
if (queue != null)
{
throw new JMException("The queue \"" + queueName + "\" already exists.");
@@ -256,13 +265,21 @@ public class AMQBrokerManagerMBean exten
ownerShortString = new AMQShortString(owner);
}
- queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, false, getVirtualHost(), null);
+ FieldTable args = null;
+ if(arguments != null)
+ {
+ args = FieldTable.convertToFieldTable(arguments);
+ }
+ final VirtualHost virtualHost = getVirtualHost();
+
+ queue = AMQQueueFactory.createAMQQueueImpl(queueNameAsAMQShortString, durable, ownerShortString,
+ false, false, getVirtualHost(), args);
if (queue.isDurable() && !queue.isAutoDelete())
{
- _durableConfig.createQueue(queue);
+ _durableConfig.createQueue(queue, args);
}
- _queueRegistry.registerQueue(queue);
+ virtualHost.getBindingFactory().addBinding(queueName, queue, _defaultExchange, null);
}
catch (AMQException ex)
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Mar 1 12:51:40 2012
@@ -22,7 +22,6 @@ package org.apache.qpid.server;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQMethodBody;
@@ -53,6 +52,7 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -63,6 +63,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.InboundMessageAdapter;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -693,6 +694,31 @@ public class AMQChannel implements Sessi
}
+ public boolean isMaxDeliveryCountEnabled(final long deliveryTag)
+ {
+ final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+ if (queueEntry != null)
+ {
+ final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+ return maximumDeliveryCount > 0;
+ }
+
+ return false;
+ }
+
+ public boolean isDeliveredTooManyTimes(final long deliveryTag)
+ {
+ final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+ if (queueEntry != null)
+ {
+ final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+ final int numDeliveries = queueEntry.getDeliveryCount();
+ return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount;
+ }
+
+ return false;
+ }
+
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*
@@ -740,9 +766,9 @@ public class AMQChannel implements Sessi
QueueEntry message = entry.getValue();
long deliveryTag = entry.getKey();
+ //Amend the delivery counter as the client hasn't seen these messages yet.
+ message.decrementDeliveryCount();
-
- ServerMessage msg = message.getMessage();
AMQQueue queue = message.getQueue();
// Our Java Client will always suspend the channel when resending!
@@ -800,6 +826,10 @@ public class AMQChannel implements Sessi
{
QueueEntry message = entry.getValue();
long deliveryTag = entry.getKey();
+
+ //Amend the delivery counter as the client hasn't seen these messages yet.
+ message.decrementDeliveryCount();
+
_unacknowledgedMessageMap.remove(deliveryTag);
message.setRedelivered();
@@ -1060,6 +1090,7 @@ public class AMQChannel implements Sessi
getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
deliveryTag,
((SubscriptionImpl)sub).getConsumerTag());
+ entry.incrementDeliveryCount();
}
};
@@ -1248,7 +1279,6 @@ public class AMQChannel implements Sessi
{
private final Collection<QueueEntry> _ackedMessages;
-
public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages)
{
_ackedMessages = ackedMessages;
@@ -1481,4 +1511,54 @@ public class AMQChannel implements Sessi
}
}
}
+
+ public void deadLetter(long deliveryTag) throws AMQException
+ {
+ final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
+ final QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag);
+
+ if (rejectedQueueEntry == null)
+ {
+ _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
+ return;
+ }
+ else
+ {
+ final ServerMessage msg = rejectedQueueEntry.getMessage();
+
+ final AMQQueue queue = rejectedQueueEntry.getQueue();
+
+ final Exchange altExchange = queue.getAlternateExchange();
+ unackedMap.remove(deliveryTag);
+
+ if (altExchange == null)
+ {
+ _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+ _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+ rejectedQueueEntry.discard();
+ return;
+ }
+
+ final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry);
+
+ final ArrayList<? extends BaseQueue> destinationQueues = altExchange.route(m);
+
+ if (destinationQueues == null || destinationQueues.isEmpty())
+ {
+ _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
+ _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+ rejectedQueueEntry.discard();
+ return;
+ }
+
+ rejectedQueueEntry.routeToAlternate();
+
+ //output operational logging for each delivery post commit
+ for (final BaseQueue destinationQueue : destinationQueues)
+ {
+ _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getNameShortString().asString()));
+ }
+
+ }
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java Thu Mar 1 12:51:40 2012
@@ -170,11 +170,17 @@ public class BindingFactory
{
arguments = Collections.emptyMap();
}
-
- //Perform ACLs
- if (!getVirtualHost().getSecurityManager().authoriseBind(exchange, queue, new AMQShortString(bindingKey)))
+
+ // The default exchange bindings must reflect the existence of queues, allow
+ // all operations on it to succeed. It is up to the broker to prevent illegal
+ // attempts at binding to this exchange, not the ACLs.
+ if(exchange != _defaultExchange)
{
- throw new AMQSecurityException("Permission denied: binding " + bindingKey);
+ //Perform ACLs
+ if (!getVirtualHost().getSecurityManager().authoriseBind(exchange, queue, new AMQShortString(bindingKey)))
+ {
+ throw new AMQSecurityException("Permission denied: binding " + bindingKey);
+ }
}
BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments);
@@ -238,10 +244,16 @@ public class BindingFactory
arguments = Collections.emptyMap();
}
- // Check access
- if (!getVirtualHost().getSecurityManager().authoriseUnbind(exchange, new AMQShortString(bindingKey), queue))
+ // The default exchange bindings must reflect the existence of queues, allow
+ // all operations on it to succeed. It is up to the broker to prevent illegal
+ // attempts at binding to this exchange, not the ACLs.
+ if(exchange != _defaultExchange)
{
- throw new AMQSecurityException("Permission denied: binding " + bindingKey);
+ // Check access
+ if (!getVirtualHost().getSecurityManager().authoriseUnbind(exchange, new AMQShortString(bindingKey), queue))
+ {
+ throw new AMQSecurityException("Permission denied: unbinding " + bindingKey);
+ }
}
BindingImpl b = _bindings.remove(new BindingImpl(bindingKey,queue,exchange,arguments));
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Thu Mar 1 12:51:40 2012
@@ -62,7 +62,10 @@ public class QueueConfiguration extends
"capacity",
"flowResumeCapacity",
"lvq",
- "lvqKey"
+ "lvqKey",
+ "sortKey",
+ "maximumDeliveryCount",
+ "deadLetterQueues"
};
}
@@ -167,11 +170,30 @@ public class QueueConfiguration extends
return getStringValue("lvqKey", null);
}
+
public boolean isTopic()
{
return getBooleanValue("topic");
}
+ public String getQueueSortKey()
+ {
+ return getStringValue("sortKey", null);
+ }
+
+ public int getMaxDeliveryCount()
+ {
+ return getIntValue("maximumDeliveryCount", _vHostConfig.getMaxDeliveryCount());
+ }
+
+ /**
+ * Check if dead letter queue delivery is enabled, deferring to the virtualhost configuration if not set.
+ */
+ public boolean isDeadLetterQueueEnabled()
+ {
+ return getBooleanValue("deadLetterQueues", _vHostConfig.isDeadLetterQueueEnabled());
+ }
+
public static class QueueConfig extends ConfigurationPlugin
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Thu Mar 1 12:51:40 2012
@@ -40,6 +40,8 @@ import org.apache.commons.configuration.
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.signal.SignalHandlerTask;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -805,4 +807,39 @@ public class ServerConfiguration extends
final List<String> disabledFeatures = getListValue("disabledFeatures", Collections.emptyList());
return disabledFeatures;
}
+
+ public boolean getManagementRightsInferAllAccess()
+ {
+ return getBooleanValue("management.managementRightsInferAllAccess", true);
+ }
+
+ public int getMaxDeliveryCount()
+ {
+ return getConfig().getInt("maximumDeliveryCount", 0);
+ }
+
+ /**
+ * Check if dead letter queue delivery is enabled, defaults to disabled if not set.
+ */
+ public boolean isDeadLetterQueueEnabled()
+ {
+ return getConfig().getBoolean("deadLetterQueues", false);
+ }
+
+ /**
+ * String to affix to end of queue name when generating an alternate exchange for DLQ purposes.
+ */
+ public String getDeadLetterExchangeSuffix()
+ {
+ return getConfig().getString("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ }
+
+ /**
+ * String to affix to end of queue name when generating a queue for DLQ purposes.
+ */
+ public String getDeadLetterQueueSuffix()
+ {
+ return getConfig().getString("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Thu Mar 1 12:51:40 2012
@@ -347,4 +347,18 @@ public class VirtualHostConfiguration ex
{
return getLongValue("transactionTimeout.idleClose", 0L);
}
+
+ public int getMaxDeliveryCount()
+ {
+ return getIntValue("queues.maximumDeliveryCount", ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount());
+ }
+
+ /**
+ * Check if dead letter queue delivery is enabled, deferring to the broker configuration if not set.
+ */
+ public boolean isDeadLetterQueueEnabled()
+ {
+ return getBooleanValue("queues.deadLetterQueues", ApplicationRegistry.getInstance().getConfiguration().isDeadLetterQueueEnabled());
+ }
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java Thu Mar 1 12:51:40 2012
@@ -87,6 +87,19 @@ public class SlowConsumerDetectionQueueC
@Override
public void validateConfiguration() throws ConfigurationException
{
+ PluginManager pluginManager;
+ try
+ {
+ pluginManager = ApplicationRegistry.getInstance().getPluginManager();
+ }
+ catch (IllegalStateException ise)
+ {
+ // We see this happen during shutdown due to asynchronous reconfig performed IO threads
+ // running at the same time as the shutdown handler.
+ _policyPlugin = null;
+ return;
+ }
+
if (!containsPositiveLong("messageAge") &&
!containsPositiveLong("depth") &&
!containsPositiveLong("messageCount"))
@@ -96,8 +109,6 @@ public class SlowConsumerDetectionQueueC
}
SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName());
-
- PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager();
Map<String, SlowConsumerPolicyPluginFactory> factories = pluginManager.getSlowConsumerPlugins();
if (policyConfig == null)
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Thu Mar 1 12:51:40 2012
@@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhos
public class DefaultExchangeFactory implements ExchangeFactory
{
private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
+ public static final String DEFAULT_DLE_NAME_SUFFIX = "_DLE";
private Map<AMQShortString, ExchangeType<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, ExchangeType<? extends Exchange>>();
private final VirtualHost _host;
@@ -122,7 +123,7 @@ public class DefaultExchangeFactory impl
if (exchangeType == null)
{
_logger.error("No such custom exchange class found: \""+String.valueOf(className)+"\"");
- return;
+ continue;
}
Class<? extends ExchangeType> exchangeTypeClass = exchangeType.getClass();
ExchangeType<? extends ExchangeType> type = exchangeTypeClass.newInstance();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Thu Mar 1 12:51:40 2012
@@ -27,7 +27,9 @@ import java.util.concurrent.ConcurrentMa
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -87,12 +89,22 @@ public class DefaultExchangeRegistry imp
public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException
{
- // Check access
- if (!_host.getSecurityManager().authoriseDelete(_exchangeMap.get(name)))
+ final Exchange exchange = _exchangeMap.get(name);
+ if (exchange == null)
+ {
+ throw new AMQException(AMQConstant.NOT_FOUND, "Unknown exchange " + name, null);
+ }
+
+ if (ExchangeDefaults.DEFAULT_EXCHANGE_NAME.equals(name))
+ {
+ throw new AMQException(AMQConstant.NOT_ALLOWED, "Cannot unregister the default exchange", null);
+ }
+
+ if (!_host.getSecurityManager().authoriseDelete(exchange))
{
throw new AMQSecurityException();
}
-
+
// TODO: check inUse argument
Exchange e = _exchangeMap.remove(name);
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Thu Mar 1 12:51:40 2012
@@ -117,7 +117,7 @@ public class FanoutExchange extends Abst
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
- return _queues.contains(queue);
+ return _queues.containsKey(queue);
}
public boolean isBound(AMQShortString routingKey)
@@ -129,7 +129,7 @@ public class FanoutExchange extends Abst
public boolean isBound(AMQQueue queue)
{
- return _queues.contains(queue);
+ return _queues.containsKey(queue);
}
public boolean hasBindings()
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeMBean.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeMBean.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeMBean.java Thu Mar 1 12:51:40 2012
@@ -20,12 +20,21 @@
*/
package org.apache.qpid.server.exchange;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
+import javax.management.MBeanException;
import javax.management.openmbean.*;
+
+import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
@@ -94,5 +103,48 @@ final class HeadersExchangeMBean extends
return bindingList;
}
+ @Override
+ public void createNewBinding(String queueName, String binding) throws JMException
+ {
+ VirtualHost vhost = getExchange().getVirtualHost();
+ AMQQueue queue = vhost.getQueueRegistry().getQueue(new AMQShortString(queueName));
+ if (queue == null)
+ {
+ throw new JMException("Queue \"" + queueName + "\" is not registered with the virtualhost.");
+ }
+
+ CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
+
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ final String[] bindings = binding.split(",");
+ for (int i = 0; i < bindings.length; i++)
+ {
+ final String[] keyAndValue = bindings[i].split("=");
+ if (keyAndValue == null || keyAndValue.length == 0 || keyAndValue.length > 2 || keyAndValue[0].length() == 0)
+ {
+ throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" ");
+ }
+
+ if(keyAndValue.length == 1)
+ {
+ //no value was given, only a key. Use an empty value to signal match on key presence alone
+ arguments.put(keyAndValue[0], "");
+ }
+ else
+ {
+ arguments.put(keyAndValue[0], keyAndValue[1]);
+ }
+ }
+ try
+ {
+ vhost.getBindingFactory().addBinding(binding,queue,getExchange(),arguments);
+ }
+ catch (AMQException ex)
+ {
+ JMException jme = new JMException(ex.toString());
+ throw new MBeanException(jme, "Error creating new binding " + binding);
+ }
+ CurrentActor.remove();
+ }
} // End of MBean class
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java Thu Mar 1 12:51:40 2012
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicLong;
@@ -59,9 +58,8 @@ public class BytesOnlyCreditManager exte
return _bytesCredit.get() > 0L;
}
- public boolean useCreditForMessage(ServerMessage msg)
+ public boolean useCreditForMessage(long msgSize)
{
- final long msgSize = msg.getSize();
if(hasCredit())
{
if(_bytesCredit.addAndGet(-msgSize) >= 0)
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java Thu Mar 1 12:51:40 2012
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
@@ -118,7 +117,7 @@ public class CreditCreditManager extends
return (_bytesCredit != 0L && _messageCredit != 0L);
}
- public synchronized boolean useCreditForMessage(final ServerMessage msg)
+ public synchronized boolean useCreditForMessage(long msgSize)
{
if(_messageCredit >= 0L)
{
@@ -130,10 +129,10 @@ public class CreditCreditManager extends
return true;
}
- else if(msg.getSize() <= _bytesCredit)
+ else if(msgSize <= _bytesCredit)
{
_messageCredit--;
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
@@ -151,9 +150,9 @@ public class CreditCreditManager extends
}
else if(_bytesCredit >= 0L)
{
- if(msg.getSize() <= _bytesCredit)
+ if(msgSize <= _bytesCredit)
{
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java Thu Mar 1 12:51:40 2012
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -41,6 +40,6 @@ public interface FlowCreditManager
public boolean hasCredit();
- public boolean useCreditForMessage(ServerMessage msg);
+ public boolean useCreditForMessage(long msgSize);
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java Thu Mar 1 12:51:40 2012
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -47,7 +46,7 @@ public class LimitlessCreditManager exte
return true;
}
- public boolean useCreditForMessage(ServerMessage msg)
+ public boolean useCreditForMessage(long msgSize)
{
return true;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java Thu Mar 1 12:51:40 2012
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -62,7 +61,7 @@ public class MessageAndBytesCreditManage
return (_messageCredit > 0L) && ( _bytesCredit > 0L );
}
- public synchronized boolean useCreditForMessage(ServerMessage msg)
+ public synchronized boolean useCreditForMessage(final long msgSize)
{
if(_messageCredit == 0L)
{
@@ -71,7 +70,6 @@ public class MessageAndBytesCreditManage
}
else
{
- final long msgSize = msg.getSize();
if(msgSize > _bytesCredit)
{
setSuspended(true);
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java Thu Mar 1 12:51:40 2012
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicLong;
@@ -61,7 +60,7 @@ public class MessageOnlyCreditManager ex
return _messageCredit.get() > 0L;
}
- public boolean useCreditForMessage(ServerMessage msg)
+ public boolean useCreditForMessage(long msgSize)
{
if(hasCredit())
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java Thu Mar 1 12:51:40 2012
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
{
@@ -133,7 +132,7 @@ public class Pre0_10CreditManager extend
&& (_messageCreditLimit == 0L || _messageCredit > 0);
}
- public synchronized boolean useCreditForMessage(final ServerMessage msg)
+ public synchronized boolean useCreditForMessage(final long msgSize)
{
if(_messageCreditLimit != 0L)
{
@@ -147,10 +146,10 @@ public class Pre0_10CreditManager extend
}
else
{
- if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit))
+ if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
{
_messageCredit--;
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
@@ -176,9 +175,9 @@ public class Pre0_10CreditManager extend
}
else
{
- if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit))
+ if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
{
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java Thu Mar 1 12:51:40 2012
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.log4j.Logger;
public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
+ private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class);
+
private volatile long _bytesCreditLimit;
private volatile long _messageCreditLimit;
@@ -43,6 +45,15 @@ public class WindowCreditManager extends
}
+ public long getBytesCreditLimit()
+ {
+ return _bytesCreditLimit;
+ }
+
+ public long getMessageCreditLimit()
+ {
+ return _messageCreditLimit;
+ }
public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
{
@@ -70,31 +81,30 @@ public class WindowCreditManager extends
public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
{
+ _messageUsed -= messageCredit;
+ if(_messageUsed < 0L)
+ {
+ LOGGER.error("Message credit used value was negative: "+ _messageUsed);
+ _messageUsed = 0;
+ }
+
boolean notifyIncrease = true;
+
if(_messageCreditLimit > 0L)
{
notifyIncrease = (_messageUsed != _messageCreditLimit);
- _messageUsed -= messageCredit;
-
- //TODO log warning
- if(_messageUsed < 0L)
- {
- _messageUsed = 0;
- }
}
-
+ _bytesUsed -= bytesCredit;
+ if(_bytesUsed < 0L)
+ {
+ LOGGER.error("Bytes credit used value was negative: "+ _messageUsed);
+ _bytesUsed = 0;
+ }
if(_bytesCreditLimit > 0L)
{
notifyIncrease = notifyIncrease && bytesCredit>0;
- _bytesUsed -= bytesCredit;
-
- //TODO log warning
- if(_bytesUsed < 0L)
- {
- _bytesUsed = 0;
- }
if(notifyIncrease)
{
@@ -102,10 +112,7 @@ public class WindowCreditManager extends
}
}
-
-
setSuspended(!hasCredit());
-
}
@@ -116,7 +123,7 @@ public class WindowCreditManager extends
&& (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed);
}
- public synchronized boolean useCreditForMessage(final ServerMessage msg)
+ public synchronized boolean useCreditForMessage(final long msgSize)
{
if(_messageCreditLimit >= 0L)
{
@@ -128,10 +135,10 @@ public class WindowCreditManager extends
return true;
}
- else if(_bytesUsed + msg.getSize() <= _bytesCreditLimit)
+ else if(_bytesUsed + msgSize <= _bytesCreditLimit)
{
_messageUsed++;
- _bytesUsed += msg.getSize();
+ _bytesUsed += msgSize;
return true;
}
@@ -149,9 +156,9 @@ public class WindowCreditManager extends
}
else if(_bytesCreditLimit >= 0L)
{
- if(_bytesUsed + msg.getSize() <= _bytesCreditLimit)
+ if(_bytesUsed + msgSize <= _bytesCreditLimit)
{
- _bytesUsed += msg.getSize();
+ _bytesUsed += msgSize;
return true;
}
@@ -169,18 +176,6 @@ public class WindowCreditManager extends
}
- public void stop()
- {
- if(_bytesCreditLimit > 0)
- {
- _bytesCreditLimit = 0;
- }
- if(_messageCreditLimit > 0)
- {
- _messageCreditLimit = 0;
- }
-
- }
public synchronized void addCredit(long count, long bytes)
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Thu Mar 1 12:51:40 2012
@@ -127,16 +127,15 @@ public class BasicGetMethodHandler imple
final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
{
- int _msg;
-
public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
throws AMQException
{
- singleMessageCredit.useCreditForMessage(entry.getMessage());
+ singleMessageCredit.useCreditForMessage(entry.getMessage().getSize());
if(entry.getMessage() instanceof AMQMessage)
{
session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
deliveryTag, queue.getMessageCount());
+ entry.incrementDeliveryCount();
}
else
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Thu Mar 1 12:51:40 2012
@@ -59,7 +59,6 @@ public class BasicRejectMethodHandler im
{
_logger.debug("Rejecting:" + body.getDeliveryTag() +
": Requeue:" + body.getRequeue() +
- //": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
}
@@ -70,26 +69,23 @@ public class BasicRejectMethodHandler im
if (message == null)
{
_logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
-// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
}
else
{
if (message.isQueueDeleted())
{
- _logger.warn("Message's Queue as already been purged, unable to Reject. " +
- "Dropping message should use Dead Letter Queue");
+ _logger.warn("Message's Queue has already been purged, dropping message");
message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
if(message != null)
{
message.discard();
}
- //sendtoDeadLetterQueue(msg)
return;
}
if (message.getMessage() == null)
{
- _logger.warn("Message as already been purged, unable to Reject.");
+ _logger.warn("Message has already been purged, unable to Reject.");
return;
}
@@ -98,27 +94,44 @@ public class BasicRejectMethodHandler im
{
_logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
": Requeue:" + body.getRequeue() +
- //": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
}
- // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
- //if (!evt.getMethod().resend)
- {
- message.reject();
- }
+ message.reject();
if (body.getRequeue())
{
channel.requeue(deliveryTag);
+
+ //this requeue represents a message rejected from the pre-dispatch queue
+ //therefore we need to amend the delivery counter.
+ message.decrementDeliveryCount();
}
else
{
- _logger.warn("Dropping message as requeue not required and there is no dead letter queue");
- message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
- //sendtoDeadLetterQueue(AMQMessage message)
-// message.queue = channel.getDefaultDeadLetterQueue();
-// channel.requeue(deliveryTag);
+ final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
+ _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
+ if (maxDeliveryCountEnabled)
+ {
+ final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
+ _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
+ if (deliveredTooManyTimes)
+ {
+ channel.deadLetter(body.getDeliveryTag());
+ }
+ else
+ {
+ //this requeue represents a message rejected because of a recover/rollback that we
+ //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+ //and therefore need to increment the delivery counter so we cancel out the effect
+ //of the AMQChannel#resend() decrement.
+ message.incrementDeliveryCount();
+ }
+ }
+ else
+ {
+ channel.deadLetter(body.getDeliveryTag());
+ }
}
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java Thu Mar 1 12:51:40 2012
@@ -25,8 +25,10 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.QueueUnbindBody;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
@@ -62,7 +64,7 @@ public class QueueUnbindHandler implemen
final AMQQueue queue;
- final AMQShortString routingKey;
+ final AMQShortString routingKey;
if (body.getQueue() == null)
{
@@ -114,10 +116,21 @@ public class QueueUnbindHandler implemen
_log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
}
- MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody();
+ final MethodRegistry registry = session.getMethodRegistry();
+ final AMQMethodBody responseBody;
+ if (registry instanceof MethodRegistry_0_9)
+ {
+ responseBody = ((MethodRegistry_0_9)registry).createQueueUnbindOkBody();
+ }
+ else if (registry instanceof MethodRegistry_0_91)
+ {
+ responseBody = ((MethodRegistry_0_91)registry).createQueueUnbindOkBody();
+ }
+ else
+ {
+ // 0-8 does not support QueueUnbind
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + session.getProtocolVersion(), null);
+ }
session.writeFrame(responseBody.generateFrame(channelId));
-
-
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Thu Mar 1 12:51:40 2012
@@ -72,7 +72,12 @@ public class TxRollbackHandler implement
};
channel.rollback(task);
-
+
+ //Now resend all the unacknowledged messages back to the original subscribers.
+ //(Must be done after the TxnRollback-ok response).
+ // Why, are we not allowed to send messages back to client before the ok method?
+ channel.resend(false);
+
}
catch (AMQException e)
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties Thu Mar 1 12:51:40 2012
@@ -32,3 +32,7 @@ FLOW_REMOVED = CHN-1006 : Flow Control R
# 0 - time in milliseconds
OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms
IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms
+
+DISCARDMSG_NOALTEXCH = CHN-1009 : Discarded message : {0,number} as no alternate exchange configured for queue : {1} routing key : {2}
+DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1}
+DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1}
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -4,4 +4,4 @@
/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/management:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/management:787599
/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/management:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java Thu Mar 1 12:51:40 2012
@@ -52,8 +52,6 @@ public abstract class AMQManagedObject e
*/
protected long _notificationSequenceNumber = 0;
- protected MBeanInfo _mbeanInfo;
-
protected LogActor _logActor;
protected AMQManagedObject(Class<?> managementInterface, String typeName)
@@ -63,27 +61,8 @@ public abstract class AMQManagedObject e
// CurrentActor will be defined as these objects are created during
// broker startup.
_logActor = new ManagementActor(CurrentActor.get().getRootMessageLogger());
- buildMBeanInfo();
- }
-
- @Override
- public MBeanInfo getMBeanInfo()
- {
- return _mbeanInfo;
- }
-
- private void buildMBeanInfo() throws NotCompliantMBeanException
- {
- _mbeanInfo = new MBeanInfo(this.getClass().getName(),
- MBeanIntrospector.getMBeanDescription(this.getClass()),
- MBeanIntrospector.getMBeanAttributesInfo(getManagementInterface()),
- MBeanIntrospector.getMBeanConstructorsInfo(this.getClass()),
- MBeanIntrospector.getMBeanOperationsInfo(getManagementInterface()),
- this.getNotificationInfo());
}
-
-
// notification broadcaster implementation
public void addNotificationListener(NotificationListener listener,
@@ -99,8 +78,5 @@ public abstract class AMQManagedObject e
_broadcaster.removeNotificationListener(listener);
}
- public MBeanNotificationInfo[] getNotificationInfo()
- {
- return null;
- }
+
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org