You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/08/14 19:15:08 UTC
svn commit: r1157566 [22/23] - in /qpid:
branches/rg-amqp-1-0-sandbox/qpid/java/
branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/
branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/
branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/...
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.xml?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.xml (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.xml Sun Aug 14 17:14:51 2011
@@ -26,7 +26,7 @@
<findSubProjects name="broker-plugins" dir="broker-plugins"/>
<findSubProjects name="management" dir="management" excludes="common,example,tools/qpid-cli"/>
- <property name="modules.core" value="junit-toolkit common management/common broker client tools"/>
+ <property name="modules.core" value="junit-toolkit common management/common amqp-1-0-common broker client amqp-1-0-client tools"/>
<property name="modules.examples" value="client/example management/example"/>
<property name="modules.tests" value="systests perftests integrationtests testkit"/>
<property name="modules.management" value="${management}"/>
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java Sun Aug 14 17:14:51 2011
@@ -233,4 +233,6 @@ public abstract class Method extends Str
return str.toString();
}
+
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Sun Aug 14 17:14:51 2011
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.subscription.Subscription;
@@ -65,7 +66,7 @@ public class SubscriptionTestHelper impl
public void setNoLocal(boolean noLocal)
{
-
+
}
public void send(QueueEntry msg)
@@ -168,11 +169,6 @@ public class SubscriptionTestHelper impl
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void confirmAutoClose()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
public void set(String key, Object value)
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -222,11 +218,6 @@ public class SubscriptionTestHelper impl
return true;
}
- public boolean isAutoClose()
- {
- return false;
- }
-
public Queue<QueueEntry> getPreDeliveryQueue()
{
return null;
@@ -286,9 +277,14 @@ public class SubscriptionTestHelper impl
{
return key.toString();
}
-
+
public boolean isSessionTransactional()
{
return false;
}
+
+ public void queueEmpty() throws AMQException
+ {
+ //TODO
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Sun Aug 14 17:14:51 2011
@@ -20,6 +20,18 @@
*/
package org.apache.qpid.server;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.logging.*;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
@@ -28,9 +40,28 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.Broker.InitException;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.xml.QpidLog4JConfigurator;
+import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
+import org.apache.qpid.server.information.management.ServerInformationMBean;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.logging.management.LoggingManagementMBean;
+import org.apache.qpid.server.logging.messages.BrokerMessages;
+import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
import org.apache.qpid.server.registry.ApplicationRegistry;
-
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.transport.QpidAcceptor;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
/**
* Main entry point for AMQPD.
@@ -38,41 +69,39 @@ import org.apache.qpid.server.registry.A
*/
public class Main
{
- private final Options options = new Options();
- private CommandLine commandLine;
+ private static Logger _logger;
- public static void main(String[] args)
+ private static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
+
+ public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
+ public static final String QPID_HOME = "QPID_HOME";
+ private static final int IPV4_ADDRESS_LENGTH = 4;
+
+ private static final char IPV4_LITERAL_SEPARATOR = '.';
+ private java.util.logging.Logger FRAME_LOGGER;
+ private java.util.logging.Logger RAW_LOGGER;
+
+ protected static class InitException extends Exception
{
- //if the -Dlog4j.configuration property has not been set, enable the init override
- //to stop Log4J wondering off and picking up the first log4j.xml/properties file it
- //finds from the classpath when we get the first Loggers
- if(System.getProperty("log4j.configuration") == null)
+ InitException(String msg, Throwable cause)
{
- System.setProperty("log4j.defaultInitOverride", "true");
+ super(msg, cause);
}
-
- new Main(args);
}
- public Main(final String[] args)
+ protected final Options options = new Options();
+ protected CommandLine commandLine;
+
+ protected Main(String[] args)
{
setOptions(options);
if (parseCommandline(args))
{
- try
- {
- execute();
- }
- catch(Exception e)
- {
- System.err.println("Exception during startup: " + e);
- e.printStackTrace();
- shutdown(1);
- }
+ execute();
}
}
- protected boolean parseCommandline(final String[] args)
+ protected boolean parseCommandline(String[] args)
{
try
{
@@ -90,7 +119,8 @@ public class Main
}
}
- protected void setOptions(final Options options)
+ @SuppressWarnings("static-access")
+ protected void setOptions(Options options)
{
Option help = new Option("h", "help", false, "print this message");
Option version = new Option("v", "version", false, "print the version information and exit");
@@ -134,21 +164,16 @@ public class Main
Option bind =
OptionBuilder.withArgName("bind").hasArg()
.withDescription("bind to the specified address. Overrides any value in the config file")
- .withLongOpt("bind").create(BrokerOptions.BIND);
+ .withLongOpt("bind").create("b");
Option logconfig =
OptionBuilder.withArgName("logconfig").hasArg()
.withDescription("use the specified log4j xml configuration file. By "
- + "default looks for a file named " + BrokerOptions.DEFAULT_LOG_CONFIG_FILE
- + " in the same directory as the configuration file").withLongOpt("logconfig").create(BrokerOptions.LOG_CONFIG);
+ + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME
+ + " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
Option logwatchconfig =
OptionBuilder.withArgName("logwatch").hasArg()
.withDescription("monitor the log file configuration file for changes. Units are seconds. "
- + "Zero means do not check for changes.").withLongOpt("logwatch").create(BrokerOptions.WATCH);
-
- Option sslport =
- OptionBuilder.withArgName("sslport").hasArg()
- .withDescription("SSL port. Overrides any value in the config file")
- .withLongOpt("sslport").create(BrokerOptions.SSL_PORTS);
+ + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
options.addOption(help);
options.addOption(version);
@@ -162,120 +187,472 @@ public class Main
options.addOption(exclude0_8);
options.addOption(mport);
options.addOption(bind);
- options.addOption(sslport);
}
- protected void execute() throws Exception
+ protected void execute()
{
- BrokerOptions options = new BrokerOptions();
- String configFile = commandLine.getOptionValue(BrokerOptions.CONFIG);
- if(configFile != null)
+ // note this understands either --help or -h. If an option only has a long name you can use that but if
+ // an option has a short name and a long name you must use the short name here.
+ if (commandLine.hasOption("h"))
{
- options.setConfigFile(configFile);
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("Qpid", options, true);
}
+ else if (commandLine.hasOption("v"))
+ {
+ String ver = QpidProperties.getVersionString();
+
+ StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: ");
+
+ boolean first = true;
+ for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions())
+ {
+ if (first)
+ {
+ first = false;
+ }
+ else
+ {
+ protocol.append(", ");
+ }
+
+ protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion());
+
+ }
- String logWatchConfig = commandLine.getOptionValue(BrokerOptions.WATCH);
- if(logWatchConfig != null)
+ System.out.println(ver + " (" + protocol + ")");
+ }
+ else
{
- options.setLogWatchFrequency(Integer.parseInt(logWatchConfig));
+ try
+ {
+ CurrentActor.set(new BrokerActor(new SystemOutMessageLogger()));
+ startup();
+ CurrentActor.remove();
+ }
+ catch (InitException e)
+ {
+ System.out.println("Initialisation Error : " + e.getMessage());
+ shutdown(1);
+ }
+ catch (Throwable e)
+ {
+ System.out.println("Error initialising message broker: " + e);
+ e.printStackTrace();
+ shutdown(1);
+ }
}
+ }
+
+ protected void shutdown(int status)
+ {
+ ApplicationRegistry.removeAll();
+ System.exit(status);
+ }
+
+ protected void startup() throws Exception
+ {
+
+ FRAME_LOGGER = updateLogger("FRM", "qpid-frame.log");
+ RAW_LOGGER = updateLogger("RAW", "qpid-raw.log");
+
+ final String QpidHome = System.getProperty(QPID_HOME);
+ final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
+ final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
+ if (!configFile.exists())
+ {
+ String error = "File " + configFile + " could not be found. Check the file exists and is readable.";
- String logConfig = commandLine.getOptionValue(BrokerOptions.LOG_CONFIG);
- if(logConfig != null)
+ if (QpidHome == null)
+ {
+ error = error + "\nNote: " + QPID_HOME + " is not set.";
+ }
+
+ throw new InitException(error, null);
+ }
+ else
{
- options.setLogConfigFile(logConfig);
+ CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath()));
}
- String jmxPort = commandLine.getOptionValue(BrokerOptions.MANAGEMENT);
- if(jmxPort != null)
+ String logConfig = commandLine.getOptionValue("l");
+ String logWatchConfig = commandLine.getOptionValue("w", "0");
+
+ int logWatchTime = 0;
+ try
+ {
+ logWatchTime = Integer.parseInt(logWatchConfig);
+ }
+ catch (NumberFormatException e)
{
- options.setJmxPort(Integer.parseInt(jmxPort));
+ System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
+ + "a non-negative integer. Using default of zero (no watching configured");
}
- String bindAddr = commandLine.getOptionValue(BrokerOptions.BIND);
- if (bindAddr != null)
+ File logConfigFile;
+ if (logConfig != null)
+ {
+ logConfigFile = new File(logConfig);
+ configureLogging(logConfigFile, logWatchTime);
+ }
+ else
{
- options.setBind(bindAddr);
+ File configFileDirectory = configFile.getParentFile();
+ logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME);
+ configureLogging(logConfigFile, logWatchTime);
}
- String[] portStr = commandLine.getOptionValues(BrokerOptions.PORTS);
- if(portStr != null)
+ ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile);
+ ServerConfiguration serverConfig = config.getConfiguration();
+ updateManagementPort(serverConfig, commandLine.getOptionValue("m"));
+
+ ApplicationRegistry.initialise(config);
+
+ // We have already loaded the BrokerMessages class by this point so we
+ // need to refresh the locale setting in case we had a different value in
+ // the configuration.
+ BrokerMessages.reload();
+
+ // AR.initialise() sets and removes its own actor so we now need to set the actor
+ // for the remainder of the startup, and the default actor if the stack is empty
+ CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger()));
+ CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger()));
+ GenericActor.setDefaultMessageLogger(config.getRootMessageLogger());
+
+
+ try
{
- parsePortArray(options, portStr, false);
- for(ProtocolExclusion pe : ProtocolExclusion.values())
+ configureLoggingManagementMBean(logConfigFile, logWatchTime);
+
+ ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean();
+ configMBean.register();
+
+ ServerInformationMBean sysInfoMBean =
+ new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion());
+ sysInfoMBean.register();
+
+
+ String[] portStr = commandLine.getOptionValues("p");
+
+ Set<Integer> ports = new HashSet<Integer>();
+ Set<Integer> exclude_0_10 = new HashSet<Integer>();
+ Set<Integer> exclude_0_9_1 = new HashSet<Integer>();
+ Set<Integer> exclude_0_9 = new HashSet<Integer>();
+ Set<Integer> exclude_0_8 = new HashSet<Integer>();
+
+ if(portStr == null || portStr.length == 0)
{
- parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe);
+
+ parsePortList(ports, serverConfig.getPorts());
+ parsePortList(exclude_0_10, serverConfig.getPortExclude010());
+ parsePortList(exclude_0_9_1, serverConfig.getPortExclude091());
+ parsePortList(exclude_0_9, serverConfig.getPortExclude09());
+ parsePortList(exclude_0_8, serverConfig.getPortExclude08());
+
}
- }
+ else
+ {
+ parsePortArray(ports, portStr);
+ parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10"));
+ parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1"));
+ parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9"));
+ parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8"));
- String[] sslPortStr = commandLine.getOptionValues(BrokerOptions.SSL_PORTS);
- if(sslPortStr != null)
- {
- parsePortArray(options, sslPortStr, true);
- for(ProtocolExclusion pe : ProtocolExclusion.values())
+ }
+
+
+
+
+ String bindAddr = commandLine.getOptionValue("b");
+ if (bindAddr == null)
+ {
+ bindAddr = serverConfig.getBind();
+ }
+ InetAddress bindAddress = null;
+
+
+
+ if (bindAddr.equals("wildcard"))
{
- parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe);
+ bindAddress = new InetSocketAddress(0).getAddress();
}
+ else
+ {
+ bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
+ }
+
+ String hostName = bindAddress.getCanonicalHostName();
+
+
+ String keystorePath = serverConfig.getKeystorePath();
+ String keystorePassword = serverConfig.getKeystorePassword();
+ String certType = serverConfig.getCertType();
+ SSLContextFactory sslFactory = null;
+
+ if (!serverConfig.getSSLOnly())
+ {
+
+ for(int port : ports)
+ {
+
+ NetworkDriver driver = new MINANetworkDriver();
+
+ Set<VERSION> supported = EnumSet.allOf(VERSION.class);
+
+ if(exclude_0_10.contains(port))
+ {
+ supported.remove(VERSION.v0_10);
+ }
+
+ if(exclude_0_9_1.contains(port))
+ {
+ supported.remove(VERSION.v0_9_1);
+ }
+ if(exclude_0_9.contains(port))
+ {
+ supported.remove(VERSION.v0_9);
+ }
+ if(exclude_0_8.contains(port))
+ {
+ supported.remove(VERSION.v0_8);
+ }
+
+ MultiVersionProtocolEngineFactory protocolEngineFactory =
+ new MultiVersionProtocolEngineFactory(hostName, supported);
+
+
+
+ driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory,
+ serverConfig.getNetworkConfiguration(), null);
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+ new QpidAcceptor(driver,"TCP"));
+ CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
+
+ }
+
+ }
+
+ if (serverConfig.getEnableSSL())
+ {
+ sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
+ NetworkDriver driver = new MINANetworkDriver();
+ driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress},
+ new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, serverConfig.getSSLPort()),
+ new QpidAcceptor(driver,"TCP"));
+ CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", serverConfig.getSSLPort()));
+ }
+
+ CurrentActor.get().message(BrokerMessages.READY());
+
}
-
- startBroker(options);
+ finally
+ {
+ // Startup is complete so remove the AR initialised Startup actor
+ CurrentActor.remove();
+ }
+
+
+
}
- protected void startBroker(final BrokerOptions options) throws Exception
+ private java.util.logging.Logger updateLogger(final String logType, String logFileName) throws IOException
{
- Broker broker = new Broker();
- broker.startup(options);
+ java.util.logging.Logger logger = java.util.logging.Logger.getLogger(logType);
+ logger.setLevel(Level.FINE);
+ Formatter formatter = new Formatter()
+ {
+ @Override
+ public String format(final LogRecord record)
+ {
+
+ return "[" + record.getMillis() + " "+ logType +"]\t" + record.getMessage() + "\n";
+ }
+ };
+ for(Handler handler : logger.getHandlers())
+ {
+ logger.removeHandler(handler);
+ }
+ Handler handler = new ConsoleHandler();
+
+ handler.setLevel(Level.FINE);
+ handler.setFormatter(formatter);
+
+ logger.addHandler(handler);
+
+
+ handler = new FileHandler(logFileName, true);
+ handler.setLevel(Level.FINE);
+ handler.setFormatter(formatter);
+
+ logger.addHandler(handler);
+ return logger;
}
- protected void shutdown(final int status)
+ private void parsePortArray(Set<Integer> ports, String[] portStr)
+ throws InitException
{
- ApplicationRegistry.remove();
- System.exit(status);
+ if(portStr != null)
+ {
+ for(int i = 0; i < portStr.length; i++)
+ {
+ try
+ {
+ ports.add(Integer.parseInt(portStr[i]));
+ }
+ catch (NumberFormatException e)
+ {
+ throw new InitException("Invalid port: " + portStr[i], e);
+ }
+ }
+ }
}
- private static void parsePortArray(final BrokerOptions options,final Object[] ports,
- final boolean ssl) throws InitException
+ private void parsePortList(Set<Integer> output, List input)
+ throws InitException
{
- if(ports != null)
+ if(input != null)
{
- for(int i = 0; i < ports.length; i++)
+ for(Object port : input)
{
try
{
- if(ssl)
- {
- options.addSSLPort(Integer.parseInt(String.valueOf(ports[i])));
- }
- else
- {
- options.addPort(Integer.parseInt(String.valueOf(ports[i])));
- }
+ output.add(Integer.parseInt(String.valueOf(port)));
}
catch (NumberFormatException e)
{
- throw new InitException("Invalid port: " + ports[i], e);
+ throw new InitException("Invalid port: " + port, e);
}
}
}
}
- private static void parsePortArray(final BrokerOptions options, final Object[] ports,
- final ProtocolExclusion excludedProtocol) throws InitException
+ /**
+ * Update the configuration data with the management port.
+ * @param configuration
+ * @param managementPort The string from the command line
+ */
+ private void updateManagementPort(ServerConfiguration configuration, String managementPort)
{
- if(ports != null)
+ if (managementPort != null)
{
- for(int i = 0; i < ports.length; i++)
+ try
+ {
+ configuration.setJMXManagementPort(Integer.parseInt(managementPort));
+ }
+ catch (NumberFormatException e)
{
+ _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e);
+ }
+ }
+ }
+
+ public static void main(String[] args)
+ {
+ //if the -Dlog4j.configuration property has not been set, enable the init override
+ //to stop Log4J wandering off and picking up the first log4j.xml/properties file it
+ //finds from the classpath when we get the first Loggers
+ if(System.getProperty("log4j.configuration") == null)
+ {
+ System.setProperty("log4j.defaultInitOverride", "true");
+ }
+
+ //now that the override status is known, we can instantiate the Loggers
+ _logger = Logger.getLogger(Main.class);
+
+ new Main(args);
+ }
+
+ private byte[] parseIP(String address) throws Exception
+ {
+ char[] literalBuffer = address.toCharArray();
+ int byteCount = 0;
+ int currByte = 0;
+ byte[] ip = new byte[IPV4_ADDRESS_LENGTH];
+ for (int i = 0; i < literalBuffer.length; i++)
+ {
+ char currChar = literalBuffer[i];
+ if ((currChar >= '0') && (currChar <= '9'))
+ {
+ currByte = (currByte * 10) + (Character.digit(currChar, 10) & 0xFF);
+ }
+
+ if (currChar == IPV4_LITERAL_SEPARATOR || (i + 1 == literalBuffer.length))
+ {
+ ip[byteCount++] = (byte) currByte;
+ currByte = 0;
+ }
+ }
+
+ if (byteCount != 4)
+ {
+ throw new Exception("Invalid IP address: " + address);
+ }
+ return ip;
+ }
+
+ private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException
+ {
+ if (logConfigFile.exists() && logConfigFile.canRead())
+ {
+ CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath()));
+
+ if (logWatchTime > 0)
+ {
+ System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every "
+ + logWatchTime + " seconds");
+ // log4j expects the watch interval in milliseconds
try
{
- options.addExcludedPort(excludedProtocol,
- Integer.parseInt(String.valueOf(ports[i])));
+ QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
}
- catch (NumberFormatException e)
+ catch (Exception e)
+ {
+ throw new InitException(e.getMessage(),e);
+ }
+ }
+ else
+ {
+ try
+ {
+ QpidLog4JConfigurator.configure(logConfigFile.getPath());
+ }
+ catch (Exception e)
{
- throw new InitException("Invalid port for exclusion: " + ports[i], e);
+ throw new InitException(e.getMessage(),e);
}
}
}
+ else
+ {
+ System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath());
+ System.err.println("Using the fallback internal log4j.properties configuration");
+
+ InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties");
+ if(propsFile == null)
+ {
+ throw new IOException("Unable to load the fallback internal log4j.properties configuration file");
+ }
+ else
+ {
+ try
+ {
+ Properties fallbackProps = new Properties();
+ fallbackProps.load(propsFile);
+ PropertyConfigurator.configure(fallbackProps);
+ }
+ finally
+ {
+ propsFile.close();
+ }
+ }
+ }
+ }
+
+ private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception
+ {
+ LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime);
+
+ blm.register();
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Sun Aug 14 17:14:51 2011
@@ -20,44 +20,56 @@
*/
package org.apache.qpid.server.protocol;
-import java.util.EnumSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.transport.network.NetworkConnection;
+
+import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
{
- private static final Set<AmqpProtocolVersion> ALL_VERSIONS = EnumSet.allOf(AmqpProtocolVersion.class);
- private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+ ;
+
+
+ public enum VERSION { v0_8, v0_9, v0_9_1, v0_10, v1_0_0 };
+
+ private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values()));
private final IApplicationRegistry _appRegistry;
private final String _fqdn;
- private final Set<AmqpProtocolVersion> _supported;
+ private final Set<VERSION> _supported;
+
public MultiVersionProtocolEngineFactory()
{
- this("localhost", ALL_VERSIONS);
+ this(1, "localhost", ALL_VERSIONS);
+ }
+
+ public MultiVersionProtocolEngineFactory(String fqdn, Set<VERSION> versions)
+ {
+ this(1, fqdn, versions);
}
+
public MultiVersionProtocolEngineFactory(String fqdn)
{
- this(fqdn, ALL_VERSIONS);
+ this(1, fqdn, ALL_VERSIONS);
}
- public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions)
+ public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<VERSION> supportedVersions)
{
- _appRegistry = ApplicationRegistry.getInstance();
+ _appRegistry = ApplicationRegistry.getInstance(instance);
_fqdn = fqdn;
_supported = supportedVersions;
}
- public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
+
+ public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
{
- return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
+ return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver);
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Sun Aug 14 17:14:51 2011
@@ -44,7 +44,7 @@ import org.apache.qpid.server.logging.su
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -83,7 +83,7 @@ public class SimpleAMQQueue implements A
/** null means shared */
private final AMQShortString _owner;
- private AuthorizationHolder _authorizationHolder;
+ private PrincipalHolder _prinicpalHolder;
private boolean _exclusive = false;
private AMQSessionModel _exclusiveOwner;
@@ -102,7 +102,9 @@ public class SimpleAMQQueue implements A
protected final QueueEntryList _entries;
- protected final SubscriptionList _subscriptionList = new SubscriptionList();
+ protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
+
+ private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
private volatile Subscription _exclusiveSubscriber;
@@ -186,7 +188,7 @@ public class SimpleAMQQueue implements A
//TODO : persist creation time
private long _createTime = System.currentTimeMillis();
private ConfigurationPlugin _queueConfiguration;
-
+ private final boolean _isTopic;
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
@@ -232,10 +234,12 @@ public class SimpleAMQQueue implements A
_exclusive = exclusive;
_virtualHost = virtualHost;
_entries = entryListFactory.createQueueEntryList(this);
- _arguments = arguments;
+ _arguments = arguments == null ? Collections.EMPTY_MAP : arguments;
_id = virtualHost.getConfigStore().createId();
+ _isTopic = arguments != null && arguments.containsKey("topic");
+
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
_logSubject = new QueueLogSubject(this);
@@ -327,7 +331,7 @@ public class SimpleAMQQueue implements A
{
return _exclusive;
}
-
+
public void setExclusive(boolean exclusive) throws AMQException
{
_exclusive = exclusive;
@@ -371,14 +375,14 @@ public class SimpleAMQQueue implements A
return _owner;
}
- public AuthorizationHolder getAuthorizationHolder()
+ public PrincipalHolder getPrincipalHolder()
{
- return _authorizationHolder;
+ return _prinicpalHolder;
}
- public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder)
+ public void setPrincipalHolder(PrincipalHolder prinicpalHolder)
{
- _authorizationHolder = authorizationHolder;
+ _prinicpalHolder = prinicpalHolder;
}
@@ -402,8 +406,8 @@ public class SimpleAMQQueue implements A
{
throw new AMQSecurityException("Permission denied");
}
-
-
+
+
if (hasExclusiveSubscriber())
{
throw new ExistingExclusiveSubscription();
@@ -433,14 +437,14 @@ public class SimpleAMQQueue implements A
subscription.setNoLocal(_nolocal);
}
_subscriptionList.add(subscription);
-
+
//Increment consumerCountHigh if necessary. (un)registerSubscription are both
//synchronized methods so we don't need additional synchronization here
if(_counsumerCountHigh.get() < getConsumerCount())
{
_counsumerCountHigh.incrementAndGet();
}
-
+
if (isDeleted())
{
subscription.queueDeleted(this);
@@ -486,6 +490,11 @@ public class SimpleAMQQueue implements A
// queue. This is because the delete method uses the subscription set which has just been cleared
subscription.queueDeleted(this);
}
+
+ if(_subscriptionList.size() == 0 && _isTopic)
+ {
+ clearQueue();
+ }
}
}
@@ -512,10 +521,10 @@ public class SimpleAMQQueue implements A
break;
}
}
-
+
reconfigure();
}
-
+
private void reconfigure()
{
//Reconfigure the queue for to reflect this new binding.
@@ -541,7 +550,7 @@ public class SimpleAMQQueue implements A
public void removeBinding(final Binding binding)
{
_bindings.remove(binding);
-
+
reconfigure();
}
@@ -568,101 +577,104 @@ public class SimpleAMQQueue implements A
public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
{
- incrementTxnEnqueueStats(message);
- incrementQueueCount();
- incrementQueueSize(message);
_totalMessagesReceived.incrementAndGet();
- QueueEntry entry;
Subscription exclusiveSub = _exclusiveSubscriber;
-
- if (exclusiveSub != null)
+ if(!_isTopic || _subscriptionList.size()!=0)
{
- exclusiveSub.getSendLock();
+ incrementTxnEnqueueStats(message);
+ incrementQueueCount();
+ incrementQueueSize(message);
- try
- {
- entry = _entries.add(message);
+ QueueEntry entry;
- deliverToSubscription(exclusiveSub, entry);
- }
- finally
+ if (exclusiveSub != null)
{
- exclusiveSub.releaseSendLock();
- }
- }
- else
- {
- entry = _entries.add(message);
- /*
-
- iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
+ exclusiveSub.getSendLock();
- */
- SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
- SubscriptionList.SubscriptionNode nextNode = node.findNext();
- if (nextNode == null)
- {
- nextNode = _subscriptionList.getHead().findNext();
- }
- while (nextNode != null)
- {
- if (_subscriptionList.updateMarkedNode(node, nextNode))
+ try
{
- break;
+ entry = _entries.add(message);
+
+ deliverToSubscription(exclusiveSub, entry);
}
- else
+ finally
{
- node = _subscriptionList.getMarkedNode();
- nextNode = node.findNext();
- if (nextNode == null)
- {
- nextNode = _subscriptionList.getHead().findNext();
- }
+ exclusiveSub.releaseSendLock();
}
}
+ else
+ {
+ entry = _entries.add(message);
+ /*
- // always do one extra loop after we believe we've finished
- // this catches the case where we *just* miss an update
- int loops = 2;
+ iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
- while (entry.isAvailable() && loops != 0)
- {
+ */
+ SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
+ SubscriptionList.SubscriptionNode nextNode = node.getNext();
if (nextNode == null)
{
- loops--;
- nextNode = _subscriptionList.getHead();
+ nextNode = _subscriptionList.getHead().getNext();
}
- else
+ while (nextNode != null)
{
- // if subscription at end, and active, offer
- Subscription sub = nextNode.getSubscription();
- deliverToSubscription(sub, entry);
+ if (_lastSubscriptionNode.compareAndSet(node, nextNode))
+ {
+ break;
+ }
+ else
+ {
+ node = _lastSubscriptionNode.get();
+ nextNode = node.getNext();
+ if (nextNode == null)
+ {
+ nextNode = _subscriptionList.getHead().getNext();
+ }
+ }
}
- nextNode = nextNode.findNext();
+ // always do one extra loop after we believe we've finished
+ // this catches the case where we *just* miss an update
+ int loops = 2;
+
+ while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
+ {
+ if (nextNode == null)
+ {
+ loops--;
+ nextNode = _subscriptionList.getHead();
+ }
+ else
+ {
+ // if subscription at end, and active, offer
+ Subscription sub = nextNode.getSubscription();
+ deliverToSubscription(sub, entry);
+ }
+ nextNode = nextNode.getNext();
+
+ }
}
- }
- if (entry.isAvailable())
- {
- checkSubscriptionsNotAheadOfDelivery(entry);
+ if (!(entry.isAcquired() || entry.isDeleted()))
+ {
+ checkSubscriptionsNotAheadOfDelivery(entry);
- deliverAsync();
- }
+ deliverAsync();
+ }
- if(_managedObject != null)
- {
- _managedObject.checkForNotification(entry.getMessage());
- }
+ if(_managedObject != null)
+ {
+ _managedObject.checkForNotification(entry.getMessage());
+ }
- if(action != null)
- {
- action.onEnqueue(entry);
+ if(action != null)
+ {
+ action.onEnqueue(entry);
+ }
}
-
}
private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
@@ -718,20 +730,20 @@ public class SimpleAMQQueue implements A
{
getAtomicQueueCount().incrementAndGet();
}
-
+
private void incrementTxnEnqueueStats(final ServerMessage message)
{
SessionConfig session = message.getSessionConfig();
-
+
if(session !=null && session.isTransactional())
{
_msgTxnEnqueues.incrementAndGet();
_byteTxnEnqueues.addAndGet(message.getSize());
}
}
-
+
private void incrementTxnDequeueStats(QueueEntry entry)
- {
+ {
_msgTxnDequeues.incrementAndGet();
_byteTxnDequeues.addAndGet(entry.getSize());
}
@@ -745,6 +757,40 @@ public class SimpleAMQQueue implements A
incrementUnackedMsgCount();
sub.send(entry);
+
+ if(_isTopic)
+ {
+ if(allSubscriptionsAhead(entry) && entry.acquire())
+ {
+ entry.discard();
+ }
+ }
+ }
+
+ private boolean allSubscriptionsAhead(final QueueEntry entry)
+ {
+ SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
+ while(subIter.advance() && !entry.isAcquired())
+ {
+ final Subscription subscription = subIter.getNode().getSubscription();
+ if(!subscription.isClosed())
+ {
+ QueueContext context = (QueueContext) subscription.getQueueContext();
+ if(context != null)
+ {
+ QueueEntry subnode = context._lastSeenEntry;
+ if(subnode.compareTo(entry)<0)
+ {
+ return false;
+ }
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ return true;
}
private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
@@ -803,6 +849,24 @@ public class SimpleAMQQueue implements A
}
+ public void requeue(QueueEntryImpl entry, Subscription subscription)
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+ while (subscriberIter.advance())
+ {
+ Subscription sub = subscriberIter.getNode().getSubscription();
+
+ // we don't make browsers send the same stuff twice
+ if (sub.seesRequeues() && (!sub.acquires() && sub == subscription))
+ {
+ updateSubRequeueEntry(sub, entry);
+ }
+ }
+
+ deliverAsync();
+ }
+
public void dequeue(QueueEntry entry, Subscription sub)
{
decrementQueueCount();
@@ -811,7 +875,7 @@ public class SimpleAMQQueue implements A
{
_deliveredMessages.decrementAndGet();
}
-
+
if(sub != null && sub.isSessionTransactional())
{
incrementTxnDequeueStats(entry);
@@ -868,7 +932,7 @@ public class SimpleAMQQueue implements A
{
return _subscriptionList.size();
}
-
+
public int getConsumerCountHigh()
{
return _counsumerCountHigh.get();
@@ -940,7 +1004,7 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (node != null && !node.isDispensed())
+ if (node != null && !node.isDeleted())
{
entryList.add(node);
}
@@ -1044,7 +1108,7 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance() && !filter.filterComplete())
{
QueueEntry node = queueListIterator.getNode();
- if (!node.isDispensed() && filter.accept(node))
+ if (!node.isDeleted() && filter.accept(node))
{
entryList.add(node);
}
@@ -1238,6 +1302,7 @@ public class SimpleAMQQueue implements A
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId)
+ && !node.isDeleted()
&& node.acquire())
{
dequeueEntry(node);
@@ -1267,7 +1332,7 @@ public class SimpleAMQQueue implements A
while (noDeletes && queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (node.acquire())
+ if (!node.isDeleted() && node.acquire())
{
dequeueEntry(node);
noDeletes = false;
@@ -1277,7 +1342,7 @@ public class SimpleAMQQueue implements A
}
public long clearQueue() throws AMQException
- {
+ {
return clear(0l);
}
@@ -1288,7 +1353,7 @@ public class SimpleAMQQueue implements A
{
throw new AMQSecurityException("Permission denied: queue " + getName());
}
-
+
QueueEntryIterator queueListIterator = _entries.iterator();
long count = 0;
@@ -1297,7 +1362,7 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (node.acquire())
+ if (!node.isDeleted() && node.acquire())
{
dequeueEntry(node, txn);
if(++count == request)
@@ -1355,7 +1420,7 @@ public class SimpleAMQQueue implements A
{
throw new AMQSecurityException("Permission denied: " + getName());
}
-
+
if (!_deleted.getAndSet(true))
{
@@ -1564,7 +1629,7 @@ public class SimpleAMQQueue implements A
public void deliverAsync()
{
- QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
+ Runner runner = new Runner(_stateChangeCount.incrementAndGet());
if (_asynchronousRunner.compareAndSet(null, runner))
{
@@ -1583,6 +1648,52 @@ public class SimpleAMQQueue implements A
_asyncDelivery.execute(flusher);
}
+
+ private class Runner implements ReadWriteRunnable
+ {
+ String _name;
+ public Runner(long count)
+ {
+ _name = "QueueRunner-" + count + "-" + _logActor;
+ }
+
+ public void run()
+ {
+ String originalName = Thread.currentThread().getName();
+ try
+ {
+ Thread.currentThread().setName(_name);
+ CurrentActor.set(_logActor);
+
+ processQueue(this);
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ Thread.currentThread().setName(originalName);
+ }
+ }
+
+ public boolean isRead()
+ {
+ return false;
+ }
+
+ public boolean isWrite()
+ {
+ return true;
+ }
+
+ public String toString()
+ {
+ return _name;
+ }
+ }
+
public void flushSubscription(Subscription sub) throws AMQException
{
// Access control
@@ -1603,12 +1714,9 @@ public class SimpleAMQQueue implements A
{
sub.getSendLock();
atTail = attemptDelivery(sub);
- if (atTail && sub.isAutoClose())
+ if (atTail && getNextAvailableEntry(sub) == null)
{
- unregisterSubscription(sub);
-
- sub.confirmAutoClose();
-
+ sub.queueEmpty();
}
else if (!atTail)
{
@@ -1629,6 +1737,7 @@ public class SimpleAMQQueue implements A
{
advanceAllSubscriptions();
}
+
return atTail;
}
@@ -1651,7 +1760,7 @@ public class SimpleAMQQueue implements A
QueueEntry node = getNextAvailableEntry(sub);
- if (node != null && node.isAvailable())
+ if (node != null && !(node.isAcquired() || node.isDeleted()))
{
if (sub.hasInterest(node))
{
@@ -1712,7 +1821,7 @@ public class SimpleAMQQueue implements A
QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
boolean expired = false;
- while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node)))
+ while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node)))
{
if (expired)
{
@@ -1741,40 +1850,14 @@ public class SimpleAMQQueue implements A
}
- /**
- * Used by queue Runners to asynchronously deliver messages to consumers.
- *
- * A queue Runner is started whenever a state change occurs, e.g when a new
- * message arrives on the queue and cannot be immediately delivered to a
- * subscription (i.e. asynchronous delivery is required). Unless there are
- * SubFlushRunners operating (due to subscriptions unsuspending) which are
- * capable of accepting/delivering all messages then these messages would
- * otherwise remain on the queue.
- *
- * processQueue should be running while there are messages on the queue AND
- * there are subscriptions that can deliver them. If there are no
- * subscriptions capable of delivering the remaining messages on the queue
- * then processQueue should stop to prevent spinning.
- *
- * Since processQueue is runs in a fixed size Executor, it should not run
- * indefinitely to prevent starving other tasks of CPU (e.g jobs to process
- * incoming messages may not be able to be scheduled in the thread pool
- * because all threads are working on clearing down large queues). To solve
- * this problem, after an arbitrary number of message deliveries the
- * processQueue job stops iterating, resubmits itself to the executor, and
- * ends the current instance
- *
- * @param runner the Runner to schedule
- * @throws AMQException
- */
- public void processQueue(QueueRunner runner) throws AMQException
+ private void processQueue(Runnable runner) throws AMQException
{
long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
boolean deliveryIncomplete = true;
- boolean lastLoop = false;
- int iterations = MAX_ASYNC_DELIVERIES;
+ int extraLoops = 1;
+ long iterations = MAX_ASYNC_DELIVERIES;
_asynchronousRunner.compareAndSet(runner, null);
@@ -1791,14 +1874,12 @@ public class SimpleAMQQueue implements A
if (previousStateChangeCount != stateChangeCount)
{
- //further asynchronous delivery is required since the
- //previous loop. keep going if iteration slicing allows.
- lastLoop = false;
+ extraLoops = 1;
}
previousStateChangeCount = stateChangeCount;
- boolean allSubscriptionsDone = true;
- boolean subscriptionDone;
+ deliveryIncomplete = _subscriptionList.size() != 0;
+ boolean done;
SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
//iterate over the subscribers and try to advance their pointer
@@ -1808,25 +1889,29 @@ public class SimpleAMQQueue implements A
sub.getSendLock();
try
{
- //attempt delivery. returns true if no further delivery currently possible to this sub
- subscriptionDone = attemptDelivery(sub);
- if (subscriptionDone)
+
+ done = attemptDelivery(sub);
+
+ if (done)
{
- //close autoClose subscriptions if we are not currently intent on continuing
- if (lastLoop && sub.isAutoClose())
+ if (extraLoops == 0)
{
- unregisterSubscription(sub);
+ if(getNextAvailableEntry(sub) == null)
+ {
+ sub.queueEmpty();
+ }
+ deliveryIncomplete = false;
- sub.confirmAutoClose();
+ }
+ else
+ {
+ extraLoops--;
}
}
else
{
- //this subscription can accept additional deliveries, so we must
- //keep going after this (if iteration slicing allows it)
- allSubscriptionsDone = false;
- lastLoop = false;
iterations--;
+ extraLoops = 1;
}
}
finally
@@ -1834,34 +1919,10 @@ public class SimpleAMQQueue implements A
sub.releaseSendLock();
}
}
-
- if(allSubscriptionsDone && lastLoop)
- {
- //We have done an extra loop already and there are again
- //again no further delivery attempts possible, only
- //keep going if state change demands it.
- deliveryIncomplete = false;
- }
- else if(allSubscriptionsDone)
- {
- //All subscriptions reported being done, but we have to do
- //an extra loop if the iterations are not exhausted and
- //there is still any work to be done
- deliveryIncomplete = _subscriptionList.size() != 0;
- lastLoop = true;
- }
- else
- {
- //some subscriptions can still accept more messages,
- //keep going if iteration count allows.
- lastLoop = false;
- deliveryIncomplete = true;
- }
-
_asynchronousRunner.set(null);
}
- // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
+ // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
{
@@ -1881,8 +1942,8 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- // Only process nodes that are not currently deleted and not dequeued
- if (!node.isDispensed())
+ // Only process nodes that are not currently deleted
+ if (!node.isDeleted())
{
// If the node has exired then aquire it
if (node.expired() && node.acquire())
@@ -2146,22 +2207,22 @@ public class SimpleAMQQueue implements A
{
return _dequeueSize.get();
}
-
+
public long getByteTxnEnqueues()
{
return _byteTxnEnqueues.get();
}
-
+
public long getByteTxnDequeues()
{
return _byteTxnDequeues.get();
}
-
+
public long getMsgTxnEnqueues()
{
return _msgTxnEnqueues.get();
}
-
+
public long getMsgTxnDequeues()
{
return _msgTxnDequeues.get();
@@ -2198,21 +2259,21 @@ public class SimpleAMQQueue implements A
{
return _unackedMsgCountHigh.get();
}
-
+
public long getUnackedMessageCount()
{
return _unackedMsgCount.get();
}
-
+
public void decrementUnackedMsgCount()
{
_unackedMsgCount.decrementAndGet();
}
-
+
private void incrementUnackedMsgCount()
{
long unackedMsgCount = _unackedMsgCount.incrementAndGet();
-
+
long unackedMsgCountHigh;
while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
{
@@ -2222,9 +2283,4 @@ public class SimpleAMQQueue implements A
}
}
}
-
- public LogActor getLogActor()
- {
- return _logActor;
- }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java Sun Aug 14 17:14:51 2011
@@ -20,73 +20,20 @@
*/
package org.apache.qpid.server.security.auth.manager;
-import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
+import org.apache.qpid.amqp_1_0.transport.CallbackHanderSource;
import org.apache.qpid.common.Closeable;
-import org.apache.qpid.server.plugins.Plugin;
import org.apache.qpid.server.security.auth.AuthenticationResult;
-/**
- * Implementations of the AuthenticationManager are responsible for determining
- * the authenticity of a user's credentials.
- *
- * If the authentication is successful, the manager is responsible for producing a populated
- * {@link Subject} containing the user's identity and zero or more principals representing
- * groups to which the user belongs.
- * <p>
- * The {@link #initialise()} method is responsible for registering SASL mechanisms required by
- * the manager. The {@link #close()} method must reverse this registration.
- *
- */
-public interface AuthenticationManager extends Closeable, Plugin
+public interface AuthenticationManager extends Closeable, CallbackHanderSource
{
- /** The name for the required SASL Server mechanisms */
- public static final String PROVIDER_NAME= "AMQSASLProvider-Server";
-
- /**
- * Initialise the authentication plugin.
- *
- */
- void initialise();
-
- /**
- * Gets the SASL mechanisms known to this manager.
- *
- * @return SASL mechanism names, space separated.
- */
String getMechanisms();
- /**
- * Creates a SASL server for the specified mechanism name for the given
- * fully qualified domain name.
- *
- * @param mechanism mechanism name
- * @param localFQDN domain name
- *
- * @return SASL server
- * @throws SaslException
- */
SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException;
- /**
- * Authenticates a user using SASL negotiation.
- *
- * @param server SASL server
- * @param response SASL response to process
- *
- * @return authentication result
- */
AuthenticationResult authenticate(SaslServer server, byte[] response);
- /**
- * Authenticates a user using their username and password.
- *
- * @param username username
- * @param password password
- *
- * @return authentication result
- */
- AuthenticationResult authenticate(String username, String password);
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Sun Aug 14 17:14:51 2011
@@ -20,65 +20,27 @@
*/
package org.apache.qpid.server.security.auth.manager;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.security.Security;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import javax.security.auth.Subject;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.login.AccountNotFoundException;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslServerFactory;
-
+import org.apache.log4j.Logger;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.configuration.PropertyException;
-import org.apache.qpid.configuration.PropertyUtils;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
-import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.server.security.auth.management.AMQUserManagementMBean;
-import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
import org.apache.qpid.server.security.auth.sasl.JCAProvider;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.SaslServerFactory;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.Sasl;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.TreeMap;
+import java.security.Security;
-/**
- * Concrete implementation of the AuthenticationManager that determines if supplied
- * user credentials match those appearing in a PrincipalDatabase. The implementation
- * of the PrincipalDatabase is determined from the configuration.
- *
- * This implementation also registers the JMX UserManagemement MBean.
- *
- * This plugin expects configuration such as:
- *
- * <pre>
- * <pd-auth-manager>
- * <principal-database>
- * <class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>
- * <attributes>
- * <attribute>
- * <name>passwordFile</name>
- * <value>${conf}/passwd</value>
- * </attribute>
- * </attributes>
- * </principal-database>
- * </pd-auth-manager>
- * </pre>
- */
public class PrincipalDatabaseAuthenticationManager implements AuthenticationManager
{
private static final Logger _logger = Logger.getLogger(PrincipalDatabaseAuthenticationManager.class);
@@ -87,109 +49,55 @@ public class PrincipalDatabaseAuthentica
private String _mechanisms;
/** Maps from the mechanism to the callback handler to use for handling those requests */
- private final Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
+ private Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
/**
* Maps from the mechanism to the properties used to initialise the server. See the method Sasl.createSaslServer for
* details of the use of these properties. This map is populated during initialisation of each provider.
*/
- private final Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
-
- protected PrincipalDatabase _principalDatabase = null;
+ private Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
- protected AMQUserManagementMBean _mbean = null;
+ private AuthenticationManager _default = null;
+ /** The name for the required SASL Server mechanisms */
+ public static final String PROVIDER_NAME= "AMQSASLProvider-Server";
- public static final AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager> FACTORY = new AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager>()
+ public PrincipalDatabaseAuthenticationManager(String name, VirtualHostConfiguration hostConfig) throws Exception
{
- public PrincipalDatabaseAuthenticationManager newInstance(final ConfigurationPlugin config) throws ConfigurationException
- {
- final PrincipalDatabaseAuthenticationManagerConfiguration configuration = config.getConfiguration(PrincipalDatabaseAuthenticationManagerConfiguration.class.getName());
+ _logger.info("Initialising " + (name == null ? "Default" : "'" + name + "'")
+ + " PrincipalDatabase authentication manager.");
- // If there is no configuration for this plugin then don't load it.
- if (configuration == null)
- {
- _logger.info("No authentication-manager configuration found for PrincipalDatabaseAuthenticationManager");
- return null;
- }
+ // Fixme This should be done per Vhost but allowing global hack isn't right but ...
+ // required as authentication is done before Vhost selection
- final PrincipalDatabaseAuthenticationManager pdam = new PrincipalDatabaseAuthenticationManager();
- pdam.configure(configuration);
- pdam.initialise();
- return pdam;
- }
+ Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
- public Class<PrincipalDatabaseAuthenticationManager> getPluginClass()
- {
- return PrincipalDatabaseAuthenticationManager.class;
- }
- public String getPluginName()
+ if (name == null || hostConfig == null)
{
- return PrincipalDatabaseAuthenticationManager.class.getName();
+ initialiseAuthenticationMechanisms(providerMap, ApplicationRegistry.getInstance().getDatabaseManager().getDatabases());
}
- };
-
- public static class PrincipalDatabaseAuthenticationManagerConfiguration extends ConfigurationPlugin {
-
- public static final ConfigurationPluginFactory FACTORY = new ConfigurationPluginFactory()
+ else
{
- public List<String> getParentPaths()
- {
- return Arrays.asList("security.pd-auth-manager");
- }
+ String databaseName = hostConfig.getAuthenticationDatabase();
- public ConfigurationPlugin newInstance(final String path, final Configuration config) throws ConfigurationException
+ if (databaseName == null)
{
- final ConfigurationPlugin instance = new PrincipalDatabaseAuthenticationManagerConfiguration();
-
- instance.setConfiguration(path, config);
- return instance;
- }
- };
-
- public String[] getElementsProcessed()
- {
- return new String[] {"principal-database.class",
- "principal-database.attributes.attribute.name",
- "principal-database.attributes.attribute.value"};
- }
-
- public void validateConfiguration() throws ConfigurationException
- {
- }
-
- public String getPrincipalDatabaseClass()
- {
- return _configuration.getString("principal-database.class");
- }
-
- public Map<String,String> getPdClassAttributeMap() throws ConfigurationException
- {
- final List<String> argumentNames = _configuration.getList("principal-database.attributes.attribute.name");
- final List<String> argumentValues = _configuration.getList("principal-database.attributes.attribute.value");
- final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size());
- for (int i = 0; i < argumentNames.size(); i++)
+ _default = ApplicationRegistry.getInstance().getAuthenticationManager();
+ return;
+ }
+ else
{
- final String argName = argumentNames.get(i);
- final String argValue = argumentValues.get(i);
+ PrincipalDatabase database = ApplicationRegistry.getInstance().getDatabaseManager().getDatabases().get(databaseName);
- attributes.put(argName, argValue);
- }
+ if (database == null)
+ {
+ throw new ConfigurationException("Requested database:" + databaseName + " was not found");
+ }
- return Collections.unmodifiableMap(attributes);
+ initialiseAuthenticationMechanisms(providerMap, database);
+ }
}
- }
-
- protected PrincipalDatabaseAuthenticationManager()
- {
- }
-
- public void initialise()
- {
- final Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
-
- initialiseAuthenticationMechanisms(providerMap, _principalDatabase);
if (providerMap.size() > 0)
{
@@ -202,16 +110,33 @@ public class PrincipalDatabaseAuthentica
{
_logger.info("Additional SASL providers successfully registered.");
}
+
}
else
{
_logger.warn("No additional SASL providers registered.");
}
- registerManagement();
}
- private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database)
+
+ private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, Map<String, PrincipalDatabase> databases) throws Exception
+ {
+ if (databases.size() > 1)
+ {
+ _logger.warn("More than one principle database provided currently authentication mechanism will override each other.");
+ }
+
+ for (Map.Entry<String, PrincipalDatabase> entry : databases.entrySet())
+ {
+ // fixme As the database now provide the mechanisms they support, they will ...
+ // overwrite each other in the map. There should only be one database per vhost.
+ // But currently we must have authentication before vhost definition.
+ initialiseAuthenticationMechanisms(providerMap, entry.getValue());
+ }
+ }
+
+ private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) throws Exception
{
if (database == null || database.getMechanisms().size() == 0)
{
@@ -227,6 +152,7 @@ public class PrincipalDatabaseAuthentica
private void initialiseAuthenticationMechanism(String mechanism, AuthenticationProviderInitialiser initialiser,
Map<String, Class<? extends SaslServerFactory>> providerMap)
+ throws Exception
{
if (_mechanisms == null)
{
@@ -247,217 +173,70 @@ public class PrincipalDatabaseAuthentica
_logger.info("Initialised " + mechanism + " SASL provider successfully");
}
- /**
- * @see org.apache.qpid.server.plugins.Plugin#configure(org.apache.qpid.server.configuration.plugins.ConfigurationPlugin)
- */
- public void configure(final ConfigurationPlugin config) throws ConfigurationException
- {
- final PrincipalDatabaseAuthenticationManagerConfiguration pdamConfig = (PrincipalDatabaseAuthenticationManagerConfiguration) config;
- final String pdClazz = pdamConfig.getPrincipalDatabaseClass();
-
- _logger.info("PrincipalDatabase concrete implementation : " + pdClazz);
-
- _principalDatabase = createPrincipalDatabaseImpl(pdClazz);
-
- configPrincipalDatabase(_principalDatabase, pdamConfig);
- }
-
public String getMechanisms()
{
- return _mechanisms;
- }
-
- public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException
- {
- return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism),
- _callbackHandlerMap.get(mechanism));
- }
-
- /**
- * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(SaslServer, byte[])
- */
- public AuthenticationResult authenticate(SaslServer server, byte[] response)
- {
- try
+ if (_default != null)
{
- // Process response from the client
- byte[] challenge = server.evaluateResponse(response != null ? response : new byte[0]);
-
- if (server.isComplete())
- {
- final Subject subject = new Subject();
- subject.getPrincipals().add(new UsernamePrincipal(server.getAuthorizationID()));
- return new AuthenticationResult(subject);
- }
- else
- {
- return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.CONTINUE);
- }
+ // Use the default AuthenticationManager if present
+ return _default.getMechanisms();
}
- catch (SaslException e)
+ else
{
- return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
+ return _mechanisms;
}
}
- /**
- * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(String, String)
- */
- public AuthenticationResult authenticate(final String username, final String password)
+ public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException
{
- try
+ if (_default != null)
{
- if (_principalDatabase.verifyPassword(username, password.toCharArray()))
- {
- final Subject subject = new Subject();
- subject.getPrincipals().add(new UsernamePrincipal(username));
- return new AuthenticationResult(subject);
- }
- else
- {
- return new AuthenticationResult(AuthenticationStatus.CONTINUE);
- }
+ // Use the default AuthenticationManager if present
+ return _default.createSaslServer(mechanism, localFQDN);
}
- catch (AccountNotFoundException e)
+ else
{
- return new AuthenticationResult(AuthenticationStatus.CONTINUE);
+ return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism),
+ _callbackHandlerMap.get(mechanism));
}
- }
-
- public void close()
- {
- _mechanisms = null;
- Security.removeProvider(PROVIDER_NAME);
- unregisterManagement();
}
- private PrincipalDatabase createPrincipalDatabaseImpl(final String pdClazz) throws ConfigurationException
+ public AuthenticationResult authenticate(SaslServer server, byte[] response)
{
- try
- {
- return (PrincipalDatabase) Class.forName(pdClazz).newInstance();
- }
- catch (InstantiationException ie)
- {
- throw new ConfigurationException("Cannot instantiate " + pdClazz, ie);
- }
- catch (IllegalAccessException iae)
- {
- throw new ConfigurationException("Cannot access " + pdClazz, iae);
- }
- catch (ClassNotFoundException cnfe)
+ // Use the default AuthenticationManager if present
+ if (_default != null)
{
- throw new ConfigurationException("Cannot load " + pdClazz + " implementation", cnfe);
+ return _default.authenticate(server, response);
}
- catch (ClassCastException cce)
- {
- throw new ConfigurationException("Expecting a " + PrincipalDatabase.class + " implementation", cce);
- }
- }
- private void configPrincipalDatabase(final PrincipalDatabase principalDatabase, final PrincipalDatabaseAuthenticationManagerConfiguration config)
- throws ConfigurationException
- {
-
- final Map<String,String> attributes = config.getPdClassAttributeMap();
- for (Iterator<Entry<String, String>> iterator = attributes.entrySet().iterator(); iterator.hasNext();)
+ try
{
- final Entry<String, String> nameValuePair = iterator.next();
- final String methodName = generateSetterName(nameValuePair.getKey());
- final Method method;
- try
- {
- method = principalDatabase.getClass().getMethod(methodName, String.class);
- }
- catch (Exception e)
- {
- throw new ConfigurationException("No method " + methodName + " found in class "
- + principalDatabase.getClass()
- + " hence unable to configure principal database. The method must be public and "
- + "have a single String argument with a void return type", e);
- }
- try
- {
- method.invoke(principalDatabase, PropertyUtils.replaceProperties(nameValuePair.getValue()));
- }
- catch (IllegalArgumentException e)
- {
- throw new ConfigurationException(e.getMessage(), e);
- }
- catch (PropertyException e)
- {
- throw new ConfigurationException(e.getMessage(), e);
- }
- catch (IllegalAccessException e)
+ // Process response from the client
+ byte[] challenge = server.evaluateResponse(response != null ? response : new byte[0]);
+
+ if (server.isComplete())
{
- throw new ConfigurationException(e.getMessage(), e);
+ return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.SUCCESS);
}
- catch (InvocationTargetException e)
+ else
{
- // QPID-1347.. InvocationTargetException wraps the checked exception thrown from the reflective
- // method call. Pull out the underlying message and cause to make these more apparent to the user.
- throw new ConfigurationException(e.getCause().getMessage(), e.getCause());
+ return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.CONTINUE);
}
}
- }
-
- private String generateSetterName(String argName) throws ConfigurationException
- {
- if ((argName == null) || (argName.length() == 0))
- {
- throw new ConfigurationException("Argument names must have length >= 1 character");
- }
-
- if (Character.isLowerCase(argName.charAt(0)))
+ catch (SaslException e)
{
- argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1);
+ return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
}
-
- final String methodName = "set" + argName;
- return methodName;
- }
-
- protected void setPrincipalDatabase(final PrincipalDatabase principalDatabase)
- {
- _principalDatabase = principalDatabase;
}
- protected void registerManagement()
+ public void close()
{
- try
- {
- _logger.info("Registering UserManagementMBean");
-
- _mbean = new AMQUserManagementMBean();
- _mbean.setPrincipalDatabase(_principalDatabase);
- _mbean.register();
- }
- catch (Exception e)
- {
- _logger.warn("User management disabled as unable to create MBean:", e);
- _mbean = null;
- }
+ Security.removeProvider(PROVIDER_NAME);
}
- protected void unregisterManagement()
+ public CallbackHandler getHandler(String mechanism)
{
- try
- {
- if (_mbean != null)
- {
- _logger.info("Unregistering UserManagementMBean");
- _mbean.unregister();
- }
- }
- catch (Exception e)
- {
- _logger.warn("Failed to unregister User management MBean:", e);
- }
- finally
- {
- _mbean = null;
- }
+ return _callbackHandlerMap.get(mechanism);
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java Sun Aug 14 17:14:51 2011
@@ -45,10 +45,9 @@ public class AmqPlainSaslServerFactory i
public String[] getMechanismNames(Map props)
{
- if (props != null &&
- (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
- props.containsKey(Sasl.POLICY_NODICTIONARY) ||
- props.containsKey(Sasl.POLICY_NOACTIVE)))
+ if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+ props.containsKey(Sasl.POLICY_NOACTIVE)))
{
// returned array must be non null according to interface documentation
return new String[0];
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org