You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC
svn commit: r1187150 [24/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Fri Oct 21 01:19:00 2011
@@ -20,65 +20,27 @@
*/
package org.apache.qpid.server.security.auth.manager;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.security.Security;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import javax.security.auth.Subject;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.login.AccountNotFoundException;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslServerFactory;
-
+import org.apache.log4j.Logger;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.configuration.PropertyException;
-import org.apache.qpid.configuration.PropertyUtils;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
-import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.server.security.auth.management.AMQUserManagementMBean;
-import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
import org.apache.qpid.server.security.auth.sasl.JCAProvider;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.SaslServerFactory;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.Sasl;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.TreeMap;
+import java.security.Security;
-/**
- * Concrete implementation of the AuthenticationManager that determines if supplied
- * user credentials match those appearing in a PrincipalDatabase. The implementation
- * of the PrincipalDatabase is determined from the configuration.
- *
- * This implementation also registers the JMX UserManagemement MBean.
- *
- * This plugin expects configuration such as:
- *
- * <pre>
- * <pd-auth-manager>
- * <principal-database>
- * <class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>
- * <attributes>
- * <attribute>
- * <name>passwordFile</name>
- * <value>${conf}/passwd</value>
- * </attribute>
- * </attributes>
- * </principal-database>
- * </pd-auth-manager>
- * </pre>
- */
public class PrincipalDatabaseAuthenticationManager implements AuthenticationManager
{
private static final Logger _logger = Logger.getLogger(PrincipalDatabaseAuthenticationManager.class);
@@ -87,109 +49,55 @@ public class PrincipalDatabaseAuthentica
private String _mechanisms;
/** Maps from the mechanism to the callback handler to use for handling those requests */
- private final Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
+ private Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
/**
* Maps from the mechanism to the properties used to initialise the server. See the method Sasl.createSaslServer for
* details of the use of these properties. This map is populated during initialisation of each provider.
*/
- private final Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
-
- protected PrincipalDatabase _principalDatabase = null;
+ private Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
- protected AMQUserManagementMBean _mbean = null;
+ private AuthenticationManager _default = null;
+ /** The name for the required SASL Server mechanisms */
+ public static final String PROVIDER_NAME= "AMQSASLProvider-Server";
- public static final AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager> FACTORY = new AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager>()
+ public PrincipalDatabaseAuthenticationManager(String name, VirtualHostConfiguration hostConfig) throws Exception
{
- public PrincipalDatabaseAuthenticationManager newInstance(final ConfigurationPlugin config) throws ConfigurationException
- {
- final PrincipalDatabaseAuthenticationManagerConfiguration configuration = config.getConfiguration(PrincipalDatabaseAuthenticationManagerConfiguration.class.getName());
+ _logger.info("Initialising " + (name == null ? "Default" : "'" + name + "'")
+ + " PrincipalDatabase authentication manager.");
- // If there is no configuration for this plugin then don't load it.
- if (configuration == null)
- {
- _logger.info("No authentication-manager configuration found for PrincipalDatabaseAuthenticationManager");
- return null;
- }
+ // FIXME: This should be done per vhost; allowing a global authentication manager is a hack, but it is
+ // required because authentication is done before vhost selection.
- final PrincipalDatabaseAuthenticationManager pdam = new PrincipalDatabaseAuthenticationManager();
- pdam.configure(configuration);
- pdam.initialise();
- return pdam;
- }
+ Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
- public Class<PrincipalDatabaseAuthenticationManager> getPluginClass()
- {
- return PrincipalDatabaseAuthenticationManager.class;
- }
- public String getPluginName()
+ if (name == null || hostConfig == null)
{
- return PrincipalDatabaseAuthenticationManager.class.getName();
+ initialiseAuthenticationMechanisms(providerMap, ApplicationRegistry.getInstance().getDatabaseManager().getDatabases());
}
- };
-
- public static class PrincipalDatabaseAuthenticationManagerConfiguration extends ConfigurationPlugin {
-
- public static final ConfigurationPluginFactory FACTORY = new ConfigurationPluginFactory()
+ else
{
- public List<String> getParentPaths()
- {
- return Arrays.asList("security.pd-auth-manager");
- }
+ String databaseName = hostConfig.getAuthenticationDatabase();
- public ConfigurationPlugin newInstance(final String path, final Configuration config) throws ConfigurationException
+ if (databaseName == null)
{
- final ConfigurationPlugin instance = new PrincipalDatabaseAuthenticationManagerConfiguration();
-
- instance.setConfiguration(path, config);
- return instance;
- }
- };
- public String[] getElementsProcessed()
- {
- return new String[] {"principal-database.class",
- "principal-database.attributes.attribute.name",
- "principal-database.attributes.attribute.value"};
- }
-
- public void validateConfiguration() throws ConfigurationException
- {
- }
-
- public String getPrincipalDatabaseClass()
- {
- return _configuration.getString("principal-database.class");
- }
-
- public Map<String,String> getPdClassAttributeMap() throws ConfigurationException
- {
- final List<String> argumentNames = _configuration.getList("principal-database.attributes.attribute.name");
- final List<String> argumentValues = _configuration.getList("principal-database.attributes.attribute.value");
- final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size());
-
- for (int i = 0; i < argumentNames.size(); i++)
+ _default = ApplicationRegistry.getInstance().getAuthenticationManager();
+ return;
+ }
+ else
{
- final String argName = argumentNames.get(i);
- final String argValue = argumentValues.get(i);
+ PrincipalDatabase database = ApplicationRegistry.getInstance().getDatabaseManager().getDatabases().get(databaseName);
- attributes.put(argName, argValue);
- }
+ if (database == null)
+ {
+ throw new ConfigurationException("Requested database:" + databaseName + " was not found");
+ }
- return Collections.unmodifiableMap(attributes);
+ initialiseAuthenticationMechanisms(providerMap, database);
+ }
}
- }
-
- protected PrincipalDatabaseAuthenticationManager()
- {
- }
-
- public void initialise()
- {
- final Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
-
- initialiseAuthenticationMechanisms(providerMap, _principalDatabase);
if (providerMap.size() > 0)
{
@@ -202,16 +110,33 @@ public class PrincipalDatabaseAuthentica
{
_logger.info("Additional SASL providers successfully registered.");
}
+
}
else
{
_logger.warn("No additional SASL providers registered.");
}
- registerManagement();
}
- private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database)
+
+ private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, Map<String, PrincipalDatabase> databases) throws Exception
+ {
+ if (databases.size() > 1)
+ {
+ _logger.warn("More than one principle database provided currently authentication mechanism will override each other.");
+ }
+
+ for (Map.Entry<String, PrincipalDatabase> entry : databases.entrySet())
+ {
+ // FIXME: As the databases now provide the mechanisms they support, they will
+ // overwrite each other in the map. There should only be one database per vhost,
+ // but currently we must have authentication before vhost definition.
+ initialiseAuthenticationMechanisms(providerMap, entry.getValue());
+ }
+ }
+
+ private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) throws Exception
{
if (database == null || database.getMechanisms().size() == 0)
{
@@ -227,6 +152,7 @@ public class PrincipalDatabaseAuthentica
private void initialiseAuthenticationMechanism(String mechanism, AuthenticationProviderInitialiser initialiser,
Map<String, Class<? extends SaslServerFactory>> providerMap)
+ throws Exception
{
if (_mechanisms == null)
{
@@ -247,217 +173,65 @@ public class PrincipalDatabaseAuthentica
_logger.info("Initialised " + mechanism + " SASL provider successfully");
}
- /**
- * @see org.apache.qpid.server.plugins.Plugin#configure(org.apache.qpid.server.configuration.plugins.ConfigurationPlugin)
- */
- public void configure(final ConfigurationPlugin config) throws ConfigurationException
- {
- final PrincipalDatabaseAuthenticationManagerConfiguration pdamConfig = (PrincipalDatabaseAuthenticationManagerConfiguration) config;
- final String pdClazz = pdamConfig.getPrincipalDatabaseClass();
-
- _logger.info("PrincipalDatabase concrete implementation : " + pdClazz);
-
- _principalDatabase = createPrincipalDatabaseImpl(pdClazz);
-
- configPrincipalDatabase(_principalDatabase, pdamConfig);
- }
-
public String getMechanisms()
{
- return _mechanisms;
- }
-
- public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException
- {
- return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism),
- _callbackHandlerMap.get(mechanism));
- }
-
- /**
- * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(SaslServer, byte[])
- */
- public AuthenticationResult authenticate(SaslServer server, byte[] response)
- {
- try
+ if (_default != null)
{
- // Process response from the client
- byte[] challenge = server.evaluateResponse(response != null ? response : new byte[0]);
-
- if (server.isComplete())
- {
- final Subject subject = new Subject();
- subject.getPrincipals().add(new UsernamePrincipal(server.getAuthorizationID()));
- return new AuthenticationResult(subject);
- }
- else
- {
- return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.CONTINUE);
- }
+ // Use the default AuthenticationManager if present
+ return _default.getMechanisms();
}
- catch (SaslException e)
+ else
{
- return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
+ return _mechanisms;
}
}
- /**
- * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(String, String)
- */
- public AuthenticationResult authenticate(final String username, final String password)
+ public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException
{
- try
+ if (_default != null)
{
- if (_principalDatabase.verifyPassword(username, password.toCharArray()))
- {
- final Subject subject = new Subject();
- subject.getPrincipals().add(new UsernamePrincipal(username));
- return new AuthenticationResult(subject);
- }
- else
- {
- return new AuthenticationResult(AuthenticationStatus.CONTINUE);
- }
+ // Use the default AuthenticationManager if present
+ return _default.createSaslServer(mechanism, localFQDN);
}
- catch (AccountNotFoundException e)
+ else
{
- return new AuthenticationResult(AuthenticationStatus.CONTINUE);
+ return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism),
+ _callbackHandlerMap.get(mechanism));
}
- }
- public void close()
- {
- _mechanisms = null;
- Security.removeProvider(PROVIDER_NAME);
-
- unregisterManagement();
}
- private PrincipalDatabase createPrincipalDatabaseImpl(final String pdClazz) throws ConfigurationException
+ public AuthenticationResult authenticate(SaslServer server, byte[] response)
{
- try
- {
- return (PrincipalDatabase) Class.forName(pdClazz).newInstance();
- }
- catch (InstantiationException ie)
- {
- throw new ConfigurationException("Cannot instantiate " + pdClazz, ie);
- }
- catch (IllegalAccessException iae)
- {
- throw new ConfigurationException("Cannot access " + pdClazz, iae);
- }
- catch (ClassNotFoundException cnfe)
- {
- throw new ConfigurationException("Cannot load " + pdClazz + " implementation", cnfe);
- }
- catch (ClassCastException cce)
+ // Use the default AuthenticationManager if present
+ if (_default != null)
{
- throw new ConfigurationException("Expecting a " + PrincipalDatabase.class + " implementation", cce);
+ return _default.authenticate(server, response);
}
- }
-
- private void configPrincipalDatabase(final PrincipalDatabase principalDatabase, final PrincipalDatabaseAuthenticationManagerConfiguration config)
- throws ConfigurationException
- {
- final Map<String,String> attributes = config.getPdClassAttributeMap();
- for (Iterator<Entry<String, String>> iterator = attributes.entrySet().iterator(); iterator.hasNext();)
+ try
{
- final Entry<String, String> nameValuePair = iterator.next();
- final String methodName = generateSetterName(nameValuePair.getKey());
- final Method method;
- try
- {
- method = principalDatabase.getClass().getMethod(methodName, String.class);
- }
- catch (Exception e)
- {
- throw new ConfigurationException("No method " + methodName + " found in class "
- + principalDatabase.getClass()
- + " hence unable to configure principal database. The method must be public and "
- + "have a single String argument with a void return type", e);
- }
- try
- {
- method.invoke(principalDatabase, PropertyUtils.replaceProperties(nameValuePair.getValue()));
- }
- catch (IllegalArgumentException e)
- {
- throw new ConfigurationException(e.getMessage(), e);
- }
- catch (PropertyException e)
- {
- throw new ConfigurationException(e.getMessage(), e);
- }
- catch (IllegalAccessException e)
+ // Process response from the client
+ byte[] challenge = server.evaluateResponse(response != null ? response : new byte[0]);
+
+ if (server.isComplete())
{
- throw new ConfigurationException(e.getMessage(), e);
+ return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.SUCCESS);
}
- catch (InvocationTargetException e)
+ else
{
- // QPID-1347.. InvocationTargetException wraps the checked exception thrown from the reflective
- // method call. Pull out the underlying message and cause to make these more apparent to the user.
- throw new ConfigurationException(e.getCause().getMessage(), e.getCause());
+ return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.CONTINUE);
}
}
- }
-
- private String generateSetterName(String argName) throws ConfigurationException
- {
- if ((argName == null) || (argName.length() == 0))
- {
- throw new ConfigurationException("Argument names must have length >= 1 character");
- }
-
- if (Character.isLowerCase(argName.charAt(0)))
- {
- argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1);
- }
-
- final String methodName = "set" + argName;
- return methodName;
- }
-
- protected void setPrincipalDatabase(final PrincipalDatabase principalDatabase)
- {
- _principalDatabase = principalDatabase;
- }
-
- protected void registerManagement()
- {
- try
- {
- _logger.info("Registering UserManagementMBean");
-
- _mbean = new AMQUserManagementMBean();
- _mbean.setPrincipalDatabase(_principalDatabase);
- _mbean.register();
- }
- catch (Exception e)
+ catch (SaslException e)
{
- _logger.warn("User management disabled as unable to create MBean:", e);
- _mbean = null;
+ return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
}
}
- protected void unregisterManagement()
+ public void close()
{
- try
- {
- if (_mbean != null)
- {
- _logger.info("Unregistering UserManagementMBean");
- _mbean.unregister();
- }
- }
- catch (Exception e)
- {
- _logger.warn("Failed to unregister User management MBean:", e);
- }
- finally
- {
- _mbean = null;
- }
+ Security.removeProvider(PROVIDER_NAME);
}
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java Fri Oct 21 01:19:00 2011
@@ -20,13 +20,14 @@
*/
package org.apache.qpid.server.security.auth.rmi;
+import java.util.Collections;
+
import javax.management.remote.JMXAuthenticator;
import javax.management.remote.JMXPrincipal;
import javax.security.auth.Subject;
+import javax.security.auth.login.AccountNotFoundException;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
-import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
public class RMIPasswordAuthenticator implements JMXAuthenticator
{
@@ -38,15 +39,15 @@ public class RMIPasswordAuthenticator im
static final String CREDENTIALS_REQUIRED = "User details are required. " +
"Please ensure you are using an up to date management console to connect.";
- private AuthenticationManager _authenticationManager = null;
+ private PrincipalDatabase _db = null;
public RMIPasswordAuthenticator()
{
}
-
- public void setAuthenticationManager(final AuthenticationManager authenticationManager)
+
+ public void setPrincipalDatabase(PrincipalDatabase pd)
{
- _authenticationManager = authenticationManager;
+ this._db = pd;
}
public Subject authenticate(Object credentials) throws SecurityException
@@ -64,39 +65,50 @@ public class RMIPasswordAuthenticator im
}
}
- // Verify that required number of credentials.
+ // Verify that the required number of credentials has been supplied.
final String[] userCredentials = (String[]) credentials;
if (userCredentials.length != 2)
{
throw new SecurityException(SHOULD_HAVE_2_ELEMENTS);
}
- final String username = (String) userCredentials[0];
- final String password = (String) userCredentials[1];
+ String username = (String) userCredentials[0];
+ String password = (String) userCredentials[1];
- // Verify that all required credentials are actually present.
+ // Verify that all required credentials are actually present.
if (username == null || password == null)
{
throw new SecurityException(SHOULD_BE_NON_NULL);
}
- // Verify that an AuthenticationManager has been set.
- if (_authenticationManager == null)
+ // Verify that a PD has been set.
+ if (_db == null)
{
throw new SecurityException(UNABLE_TO_LOOKUP);
}
- final AuthenticationResult result = _authenticationManager.authenticate(username, password);
+
+ boolean authenticated = false;
- if (AuthenticationStatus.ERROR.equals(result.getStatus()))
+ // Perform authentication
+ try
{
- throw new SecurityException("Authentication manager failed", result.getCause());
+ if (_db.verifyPassword(username, password.toCharArray()))
+ {
+ authenticated = true;
+ }
+ }
+ catch (AccountNotFoundException e)
+ {
+ throw new SecurityException(INVALID_CREDENTIALS); // XXX
}
- else if (AuthenticationStatus.SUCCESS.equals(result.getStatus()))
+
+ if (authenticated)
{
- final Subject subject = result.getSubject();
- subject.getPrincipals().add(new JMXPrincipal(username));
- subject.setReadOnly();
- return subject;
+ // Credentials check out; return the appropriate JAAS Subject.
+ return new Subject(true,
+ Collections.singleton(new JMXPrincipal(username)),
+ Collections.EMPTY_SET,
+ Collections.EMPTY_SET);
}
else
{
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java Fri Oct 21 01:19:00 2011
@@ -25,6 +25,9 @@ import java.util.Map;
import javax.security.auth.callback.CallbackHandler;
import javax.security.sasl.SaslServerFactory;
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+
public interface AuthenticationProviderInitialiser
{
/**
@@ -34,6 +37,24 @@ public interface AuthenticationProviderI
String getMechanismName();
/**
+ * Initialise the authentication provider.
+ * @param baseConfigPath the path in the config file that points to any config options for this provider. Each
+ * provider can have its own set of configuration options
+ * @param configuration the Apache Commons Configuration instance used to configure this provider
+ * @param principalDatabases the set of principal databases that are available
+ * @throws Exception this needs refining; Exception is too broad.
+ */
+ void initialise(String baseConfigPath, Configuration configuration,
+ Map<String, PrincipalDatabase> principalDatabases) throws Exception;
+
+ /**
+ * Initialise the authentication provider.
+ * @param db The principal database to initialise with
+ */
+ void initialise(PrincipalDatabase db);
+
+
+ /**
* @return the callback handler that should be used to process authentication requests for this mechanism. This will
* be called after initialise and will be stored by the authentication manager. The callback handler <b>must</b> be
* fully threadsafe.
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java Fri Oct 21 01:19:00 2011
@@ -21,11 +21,12 @@
package org.apache.qpid.server.security.auth.sasl;
import java.security.Provider;
+import java.security.Security;
import java.util.Map;
import javax.security.sasl.SaslServerFactory;
-public class JCAProvider extends Provider
+public final class JCAProvider extends Provider
{
public JCAProvider(String name, Map<String, Class<? extends SaslServerFactory>> providerMap)
{
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java Fri Oct 21 01:19:00 2011
@@ -21,21 +21,14 @@
package org.apache.qpid.server.security.auth.sasl;
import java.security.Principal;
-import java.util.Set;
-
-import javax.security.auth.Subject;
/** A principal that is just a wrapper for a simple username. */
public class UsernamePrincipal implements Principal
{
- private final String _name;
+ private String _name;
public UsernamePrincipal(String name)
{
- if (name == null)
- {
- throw new IllegalArgumentException("name cannot be null");
- }
_name = name;
}
@@ -48,53 +41,4 @@ public class UsernamePrincipal implement
{
return _name;
}
-
- /**
- * @see java.lang.Object#hashCode()
- */
- @Override
- public int hashCode()
- {
- final int prime = 31;
- return prime * _name.hashCode();
- }
-
- /**
- * @see java.lang.Object#equals(java.lang.Object)
- */
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj)
- {
- return true;
- }
- else
- {
- if (obj instanceof UsernamePrincipal)
- {
- UsernamePrincipal other = (UsernamePrincipal) obj;
- return _name.equals(other._name);
- }
- else
- {
- return false;
- }
- }
- }
-
- public static UsernamePrincipal getUsernamePrincipalFromSubject(final Subject authSubject)
- {
- if (authSubject == null)
- {
- throw new IllegalArgumentException("No authenticated subject.");
- }
-
- final Set<UsernamePrincipal> principals = authSubject.getPrincipals(UsernamePrincipal.class);
- if (principals.size() != 1)
- {
- throw new IllegalArgumentException("Can't find single UsernamePrincipal in authenticated subject");
- }
- return principals.iterator().next();
- }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java Fri Oct 21 01:19:00 2011
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.security.auth.sasl.amqplain;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.IOException;
import javax.security.auth.callback.Callback;
@@ -33,6 +31,7 @@ import javax.security.sasl.AuthorizeCall
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
@@ -61,7 +60,7 @@ public class AmqPlainSaslServer implemen
{
try
{
- final FieldTable ft = FieldTableFactory.newFieldTable(new DataInputStream(new ByteArrayInputStream(response)), response.length);
+ final FieldTable ft = FieldTableFactory.newFieldTable(ByteBuffer.wrap(response), response.length);
String username = (String) ft.getString("LOGIN");
// we do not care about the prompt but it throws if null
NameCallback nameCb = new NameCallback("prompt", username);
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java Fri Oct 21 01:19:00 2011
@@ -45,10 +45,9 @@ public class AmqPlainSaslServerFactory i
public String[] getMechanismNames(Map props)
{
- if (props != null &&
- (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
- props.containsKey(Sasl.POLICY_NODICTIONARY) ||
- props.containsKey(Sasl.POLICY_NOACTIVE)))
+ if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+ props.containsKey(Sasl.POLICY_NOACTIVE))
{
// returned array must be non null according to interface documentation
return new String[0];
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,21 @@
*/
package org.apache.qpid.server.security.auth.sasl.anonymous;
+import java.io.IOException;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
public class AnonymousSaslServer implements SaslServer
{
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java Fri Oct 21 01:19:00 2011
@@ -47,11 +47,10 @@ public class AnonymousSaslServerFactory
public String[] getMechanismNames(Map props)
{
- if (props != null &&
- (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
- props.containsKey(Sasl.POLICY_NODICTIONARY) ||
- props.containsKey(Sasl.POLICY_NOACTIVE) ||
- props.containsKey(Sasl.POLICY_NOANONYMOUS)))
+ if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+ props.containsKey(Sasl.POLICY_NOACTIVE) ||
+ props.containsKey(Sasl.POLICY_NOANONYMOUS))
{
// returned array must be non null according to interface documentation
return new String[0];
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java Fri Oct 21 01:19:00 2011
@@ -70,7 +70,7 @@ public class CRAMMD5HexInitialiser exten
for (char c : password)
{
//toHexString does not prepend 0 so we have to
- if (((byte) c > -1) && (byte) c < 0x10 )
+ if (((byte) c > -1) && (byte) c < 10)
{
sb.append(0);
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java Fri Oct 21 01:19:00 2011
@@ -45,10 +45,9 @@ public class PlainSaslServerFactory impl
public String[] getMechanismNames(Map props)
{
- if (props != null &&
- (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
- props.containsKey(Sasl.POLICY_NODICTIONARY) ||
- props.containsKey(Sasl.POLICY_NOACTIVE)))
+ if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+ props.containsKey(Sasl.POLICY_NOACTIVE))
{
// returned array must be non null according to interface documentation
return new String[0];
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Fri Oct 21 01:19:00 2011
@@ -259,7 +259,7 @@ public class AMQStateManager implements
public AMQProtocolSession getProtocolSession()
{
- SecurityManager.setThreadSubject(_protocolSession.getAuthorizedSubject());
+ SecurityManager.setThreadPrincipal(_protocolSession.getPrincipal());
return _protocolSession;
}
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Fri Oct 21 01:19:00 2011
@@ -21,7 +21,6 @@
package org.apache.qpid.server.store;
import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
@@ -480,15 +479,9 @@ public class DerbyMessageStore implement
FieldTable arguments;
if(dataAsBytes.length > 0)
{
+ org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.wrap(dataAsBytes);
- try
- {
- arguments = new FieldTable(new DataInputStream(new ByteArrayInputStream(dataAsBytes)),dataAsBytes.length);
- }
- catch (IOException e)
- {
- throw new RuntimeException("IO Exception should not be thrown",e);
- }
+ arguments = new FieldTable(buffer,buffer.limit());
}
else
{
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java Fri Oct 21 01:19:00 2011
@@ -20,21 +20,13 @@
*/
package org.apache.qpid.server.subscription;
-import java.util.Map;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.flow.FlowCreditManager_0_10;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.transport.ServerSession;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageFlowMode;
/**
* Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory
@@ -64,23 +56,4 @@ public interface SubscriptionFactory
RecordDeliveryMethod recordMethod
)
throws AMQException;
-
-
- SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(AMQChannel channel,
- AMQProtocolSession session,
- AMQShortString consumerTag,
- FieldTable filters,
- boolean noLocal,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod) throws AMQException;
-
- Subscription_0_10 createSubscription(final ServerSession session,
- final String destination,
- final MessageAcceptMode acceptMode,
- final MessageAcquireMode acquireMode,
- final MessageFlowMode flowMode,
- final FlowCreditManager_0_10 creditManager,
- final FilterManager filterManager,
- final Map<String,Object> arguments);
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java Fri Oct 21 01:19:00 2011
@@ -20,28 +20,17 @@
*/
package org.apache.qpid.server.subscription;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.flow.FlowCreditManager_0_10;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.transport.ServerSession;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageFlowMode;
public class SubscriptionFactoryImpl implements SubscriptionFactory
{
- private static final AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
-
public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession,
AMQShortString consumerTag, boolean acks, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager) throws AMQException
@@ -89,47 +78,18 @@ public class SubscriptionFactoryImpl imp
if(isBrowser)
{
- return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
+ return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
}
else if(acks)
{
- return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
+ return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
}
else
{
- return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
+ return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
}
}
- public SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(final AMQChannel channel,
- final AMQProtocolSession session,
- final AMQShortString consumerTag,
- final FieldTable filters,
- final boolean noLocal,
- final FlowCreditManager creditManager,
- final ClientDeliveryMethod deliveryMethod,
- final RecordDeliveryMethod recordMethod) throws AMQException
- {
- return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod, getNextSubscriptionId());
- }
-
- public Subscription_0_10 createSubscription(final ServerSession session,
- final String destination,
- final MessageAcceptMode acceptMode,
- final MessageAcquireMode acquireMode,
- final MessageFlowMode flowMode,
- final FlowCreditManager_0_10 creditManager,
- final FilterManager filterManager,
- final Map<String,Object> arguments)
- {
- return new Subscription_0_10(session, destination, acceptMode, acquireMode,
- flowMode, creditManager, filterManager, arguments, getNextSubscriptionId());
- }
public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl();
-
- private static long getNextSubscriptionId()
- {
- return SUB_ID_GENERATOR.getAndIncrement();
- }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Fri Oct 21 01:19:00 2011
@@ -88,7 +88,9 @@ public abstract class SubscriptionImpl i
private final Lock _stateChangeLock;
- private final long _subscriptionID;
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+ // Create a simple ID that increments for ever new Subscription
+ private final long _subscriptionID = idGenerator.getAndIncrement();
private LogSubject _logSubject;
private LogActor _logActor;
private UUID _id;
@@ -102,11 +104,10 @@ public abstract class SubscriptionImpl i
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
@@ -150,11 +151,10 @@ public abstract class SubscriptionImpl i
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
@@ -211,45 +211,16 @@ public abstract class SubscriptionImpl i
}
- /**
- * NoAck Subscription for use with BasicGet method.
- */
- public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
- {
- public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
- throws AMQException
- {
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
- }
-
- public boolean isTransient()
- {
- return true;
- }
-
- public boolean wouldSuspend(QueueEntry msg)
- {
- return !getCreditManager().useCreditForMessage(msg.getMessage());
- }
-
- }
-
static final class AckSubscription extends SubscriptionImpl
{
public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
@@ -325,11 +296,10 @@ public abstract class SubscriptionImpl i
AMQShortString consumerTag, FieldTable arguments,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- _subscriptionID = subscriptionID;
+
_channel = channel;
_consumerTag = consumerTag;
@@ -475,7 +445,7 @@ public abstract class SubscriptionImpl i
//check that the message hasn't been rejected
- if (entry.isRejectedBy(getSubscriptionID()))
+ if (entry.isRejectedBy(this))
{
if (_logger.isDebugEnabled())
{
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java Fri Oct 21 01:19:00 2011
@@ -20,108 +20,121 @@
*/
package org.apache.qpid.server.subscription;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.subscription.Subscription;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.nio.ByteBuffer;
public class SubscriptionList
{
+
private final SubscriptionNode _head = new SubscriptionNode();
- private final AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head);
- private final AtomicReference<SubscriptionNode> _subNodeMarker = new AtomicReference<SubscriptionNode>(_head);
- private final AtomicInteger _size = new AtomicInteger();
+ private AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head);
+ private AtomicInteger _size = new AtomicInteger();
+
- public static final class SubscriptionNode
+ public final class SubscriptionNode
{
private final AtomicBoolean _deleted = new AtomicBoolean();
private final AtomicReference<SubscriptionNode> _next = new AtomicReference<SubscriptionNode>();
private final Subscription _sub;
+
public SubscriptionNode()
{
- //used for sentinel head and dummy node construction
+
_sub = null;
_deleted.set(true);
}
public SubscriptionNode(final Subscription sub)
{
- //used for regular node construction
_sub = sub;
}
- /**
- * Retrieves the first non-deleted node following the current node.
- * Any deleted non-tail nodes encountered during the search are unlinked.
- *
- * @return the next non-deleted node, or null if none was found.
- */
- public SubscriptionNode findNext()
+
+ public SubscriptionNode getNext()
{
+
SubscriptionNode next = nextNode();
while(next != null && next.isDeleted())
{
+
final SubscriptionNode newNext = next.nextNode();
if(newNext != null)
{
- //try to move our _next reference forward to the 'newNext'
- //node to unlink the deleted node
_next.compareAndSet(next, newNext);
next = nextNode();
}
else
{
- //'newNext' is null, meaning 'next' is the current tail. Can't unlink
- //the tail node for thread safety reasons, just use the null.
next = null;
}
- }
+ }
return next;
}
- /**
- * Gets the immediately next referenced node in the structure.
- *
- * @return the immediately next node in the structure, or null if at the tail.
- */
- protected SubscriptionNode nextNode()
+ private SubscriptionNode nextNode()
{
return _next.get();
}
- /**
- * Used to initialise the 'next' reference. Will only succeed if the reference was not previously set.
- *
- * @param node the SubscriptionNode to set as 'next'
- * @return whether the operation succeeded
- */
- private boolean setNext(final SubscriptionNode node)
- {
- return _next.compareAndSet(null, node);
- }
-
public boolean isDeleted()
{
return _deleted.get();
}
+
public boolean delete()
{
- return _deleted.compareAndSet(false,true);
+ if(_deleted.compareAndSet(false,true))
+ {
+ _size.decrementAndGet();
+ advanceHead();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
+
public Subscription getSubscription()
{
return _sub;
}
}
- private void insert(final SubscriptionNode node, final boolean count)
+
+ public SubscriptionList(AMQQueue queue)
{
+ }
+
+ private void advanceHead()
+ {
+ SubscriptionNode head = _head.nextNode();
+ while(head._next.get() != null && head.isDeleted())
+ {
+
+ final SubscriptionNode newhead = head.nextNode();
+ if(newhead != null)
+ {
+ _head._next.compareAndSet(head, newhead);
+ }
+ head = _head.nextNode();
+ }
+ }
+
+
+ public SubscriptionNode add(Subscription sub)
+ {
+ SubscriptionNode node = new SubscriptionNode(sub);
for (;;)
{
SubscriptionNode tail = _tail.get();
@@ -130,14 +143,11 @@ public class SubscriptionList
{
if (next == null)
{
- if (tail.setNext(node))
+ if (tail._next.compareAndSet(null, node))
{
_tail.compareAndSet(tail, node);
- if(count)
- {
- _size.incrementAndGet();
- }
- return;
+ _size.incrementAndGet();
+ return node;
}
}
else
@@ -146,101 +156,27 @@ public class SubscriptionList
}
}
}
- }
- public void add(final Subscription sub)
- {
- SubscriptionNode node = new SubscriptionNode(sub);
- insert(node, true);
}
- public boolean remove(final Subscription sub)
+ public boolean remove(Subscription sub)
{
- SubscriptionNode prevNode = _head;
- SubscriptionNode node = _head.nextNode();
-
+ SubscriptionNode node = _head.getNext();
while(node != null)
{
- if(sub.equals(node.getSubscription()) && node.delete())
+ if(sub.equals(node._sub) && node.delete())
{
- _size.decrementAndGet();
-
- SubscriptionNode tail = _tail.get();
- if(node == tail)
- {
- //we cant remove the last node from the structure for
- //correctness reasons, however we have just 'deleted'
- //the tail. Inserting an empty dummy node after it will
- //let us scavenge the node containing the Subscription.
- insert(new SubscriptionNode(), false);
- }
-
- //advance the next node reference in the 'prevNode' to scavange
- //the newly 'deleted' node for the Subscription.
- prevNode.findNext();
-
- nodeMarkerCleanup(node);
-
return true;
}
-
- prevNode = node;
- node = node.findNext();
+ node = node.getNext();
}
-
return false;
}
- private void nodeMarkerCleanup(final SubscriptionNode node)
- {
- SubscriptionNode markedNode = _subNodeMarker.get();
- if(node == markedNode)
- {
- //if the marked node is the one we are removing, then
- //replace it with a dummy pointing at the next node.
- //this is OK as the marked node is only used to index
- //into the list and find the next node to use.
- //Because we inserted a dummy if node was the
- //tail, markedNode.nextNode() can never be null.
- SubscriptionNode dummy = new SubscriptionNode();
- dummy.setNext(markedNode.nextNode());
-
- //if the CAS fails the marked node has changed, thus
- //we don't care about the dummy and just forget it
- _subNodeMarker.compareAndSet(markedNode, dummy);
- }
- else if(markedNode != null)
- {
- //if the marked node was already deleted then it could
- //hold subsequently removed nodes after it in the list
- //in memory. Scavenge it to ensure their actual removal.
- if(markedNode != _head && markedNode.isDeleted())
- {
- markedNode.findNext();
- }
- }
- }
-
- public boolean updateMarkedNode(final SubscriptionNode expected, final SubscriptionNode nextNode)
- {
- return _subNodeMarker.compareAndSet(expected, nextNode);
- }
-
- /**
- * Get the current marked SubscriptionNode. This should only be used only to index into the list and find the next node
- * after the mark, since if the previously marked node was subsequently deleted the item returned may be a dummy node
- * with reference to the next node.
- *
- * @return the previously marked node (or a dummy if it was subsequently deleted)
- */
- public SubscriptionNode getMarkedNode()
- {
- return _subNodeMarker.get();
- }
-
public static class SubscriptionNodeIterator
{
+
private SubscriptionNode _lastNode;
SubscriptionNodeIterator(SubscriptionNode startNode)
@@ -248,25 +184,49 @@ public class SubscriptionList
_lastNode = startNode;
}
+
+ public boolean atTail()
+ {
+ return _lastNode.nextNode() == null;
+ }
+
public SubscriptionNode getNode()
{
+
return _lastNode;
+
}
public boolean advance()
{
- SubscriptionNode nextNode = _lastNode.findNext();
- _lastNode = nextNode;
- return _lastNode != null;
+ if(!atTail())
+ {
+ SubscriptionNode nextNode = _lastNode.nextNode();
+ while(nextNode.isDeleted() && nextNode.nextNode() != null)
+ {
+ nextNode = nextNode.nextNode();
+ }
+ _lastNode = nextNode;
+ return true;
+
+ }
+ else
+ {
+ return false;
+ }
+
}
+
}
+
public SubscriptionNodeIterator iterator()
{
return new SubscriptionNodeIterator(_head);
}
+
public SubscriptionNode getHead()
{
return _head;
@@ -276,6 +236,9 @@ public class SubscriptionList
{
return _size.get();
}
+
+
+
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Fri Oct 21 01:19:00 2011
@@ -40,6 +40,7 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.SubscriptionActor;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.AMQMessage;
@@ -79,7 +80,10 @@ import java.nio.ByteBuffer;
public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
{
- private final long _subscriptionID;
+
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+ // Create a simple ID that increments for ever new Subscription
+ private final long _subscriptionID = idGenerator.getAndIncrement();
private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
@@ -93,6 +97,7 @@ public class Subscription_0_10 implement
private FlowCreditManager_0_10 _creditManager;
+
private StateListener _stateListener = new StateListener()
{
@@ -109,15 +114,16 @@ public class Subscription_0_10 implement
private final MessageAcquireMode _acquireMode;
private MessageFlowMode _flowMode;
private final ServerSession _session;
- private final AtomicBoolean _stopped = new AtomicBoolean(true);
+ private AtomicBoolean _stopped = new AtomicBoolean(true);
+ private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new ConcurrentHashMap<Integer, QueueEntry>();
private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
private LogActor _logActor;
- private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+ private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
private UUID _id;
private String _traceExclude;
private String _trace;
- private final long _createTime = System.currentTimeMillis();
+ private long _createTime = System.currentTimeMillis();
private final AtomicLong _deliveredCount = new AtomicLong(0);
private final Map<String, Object> _arguments;
@@ -126,9 +132,8 @@ public class Subscription_0_10 implement
MessageAcquireMode acquireMode,
MessageFlowMode flowMode,
FlowCreditManager_0_10 creditManager,
- FilterManager filters,Map<String, Object> arguments, long subscriptionId)
+ FilterManager filters,Map<String, Object> arguments)
{
- _subscriptionID = subscriptionId;
_session = session;
_destination = destination;
_acceptMode = acceptMode;
@@ -194,7 +199,7 @@ public class Subscription_0_10 implement
public boolean isSuspended()
{
- return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension
+ return !isActive() || _deleted.get(); // TODO check for Session suspension
}
public boolean hasInterest(QueueEntry entry)
@@ -203,7 +208,7 @@ public class Subscription_0_10 implement
//check that the message hasn't been rejected
- if (entry.isRejectedBy(getSubscriptionID()))
+ if (entry.isRejectedBy(this))
{
return false;
@@ -437,7 +442,7 @@ public class Subscription_0_10 implement
Struct[] headers = new Struct[] { deliveryProps, messageProps };
BasicContentHeaderProperties properties =
- (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
+ (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().properties;
final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
if(exchange != null)
{
@@ -727,22 +732,13 @@ public class Subscription_0_10 implement
public void stop()
{
- try
- {
- getSendLock();
-
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
- _stopped.set(true);
- FlowCreditManager_0_10 creditManager = getCreditManager();
- creditManager.clearCredit();
- }
- finally
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
{
- releaseSendLock();
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
}
+ _stopped.set(true);
+ FlowCreditManager_0_10 creditManager = getCreditManager();
+ creditManager.clearCredit();
}
public void addCredit(MessageCreditUnit unit, long value)
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java Fri Oct 21 01:19:00 2011
@@ -20,21 +20,21 @@
*/
package org.apache.qpid.server.transport;
-import org.apache.qpid.transport.network.NetworkTransport;
+import org.apache.qpid.transport.NetworkDriver;
public class QpidAcceptor
{
- NetworkTransport _transport;
+ NetworkDriver _driver;
String _protocol;
- public QpidAcceptor(NetworkTransport transport, String protocol)
+ public QpidAcceptor(NetworkDriver driver, String protocol)
{
- _transport = transport;
+ _driver = driver;
_protocol = protocol;
}
- public NetworkTransport getNetworkTransport()
+ public NetworkDriver getNetworkDriver()
{
- return _transport;
+ return _driver;
}
public String toString()
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Fri Oct 21 01:19:00 2011
@@ -20,19 +20,11 @@
*/
package org.apache.qpid.server.transport;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
-import java.security.Principal;
import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.security.auth.Subject;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.ConnectionConfig;
@@ -43,39 +35,23 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
-import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionCloseCode;
import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolEvent;
-import org.apache.qpid.transport.Session;
-public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
+public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject
{
private ConnectionConfig _config;
private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
private LogActor _actor = GenericActor.getInstance(this);
- private Subject _authorizedSubject = null;
- private Principal _authorizedPrincipal = null;
- private boolean _statisticsEnabled = false;
- private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
- private final long _connectionId;
-
- public ServerConnection(final long connectionId)
+ public ServerConnection()
{
- _connectionId = connectionId;
- }
- public UUID getId()
- {
- return _config.getId();
}
@Override
@@ -96,18 +72,8 @@ public class ServerConnection extends Co
_onOpenTask.run();
}
_actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", true, true));
-
- getVirtualHost().getConnectionRegistry().registerConnection(this);
}
-
- if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING)
- {
- if(_virtualHost != null)
- {
- _virtualHost.getConnectionRegistry().deregisterConnection(this);
- }
- }
-
+
if (state == State.CLOSED)
{
logClosed();
@@ -144,8 +110,6 @@ public class ServerConnection extends Co
public void setVirtualHost(VirtualHost virtualHost)
{
_virtualHost = virtualHost;
-
- initialiseStatistics();
}
public void setConnectionConfig(final ConnectionConfig config)
@@ -181,11 +145,6 @@ public class ServerConnection extends Co
((ServerSession)session).close();
}
-
- public LogSubject getLogSubject()
- {
- return (LogSubject) this;
- }
@Override
public void received(ProtocolEvent event)
@@ -220,9 +179,9 @@ public class ServerConnection extends Co
public String toLogString()
{
boolean hasVirtualHost = (null != this.getVirtualHost());
- boolean hasClientId = (null != getClientId());
+ boolean hasPrincipal = (null != getAuthorizationID());
- if (hasClientId && hasVirtualHost)
+ if (hasPrincipal && hasVirtualHost)
{
return "[" +
MessageFormat.format(CONNECTION_FORMAT,
@@ -232,7 +191,7 @@ public class ServerConnection extends Co
getVirtualHost().getName())
+ "] ";
}
- else if (hasClientId)
+ else if (hasPrincipal)
{
return "[" +
MessageFormat.format(USER_FORMAT,
@@ -256,147 +215,4 @@ public class ServerConnection extends Co
{
return _actor;
}
-
- public void close(AMQConstant cause, String message) throws AMQException
- {
- ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
- try
- {
- replyCode = ConnectionCloseCode.get(cause.getCode());
- }
- catch (IllegalArgumentException iae)
- {
- // Ignore
- }
- close(replyCode, message);
- }
-
- public List<AMQSessionModel> getSessionModels()
- {
- List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
- for (Session ssn : getChannels())
- {
- sessions.add((AMQSessionModel) ssn);
- }
- return sessions;
- }
-
- public void registerMessageDelivered(long messageSize)
- {
- if (isStatisticsEnabled())
- {
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
- }
- _virtualHost.registerMessageDelivered(messageSize);
- }
-
- public void registerMessageReceived(long messageSize, long timestamp)
- {
- if (isStatisticsEnabled())
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- }
- _virtualHost.registerMessageReceived(messageSize, timestamp);
- }
-
- public StatisticsCounter getMessageReceiptStatistics()
- {
- return _messagesReceived;
- }
-
- public StatisticsCounter getDataReceiptStatistics()
- {
- return _dataReceived;
- }
-
- public StatisticsCounter getMessageDeliveryStatistics()
- {
- return _messagesDelivered;
- }
-
- public StatisticsCounter getDataDeliveryStatistics()
- {
- return _dataDelivered;
- }
-
- public void resetStatistics()
- {
- _messagesDelivered.reset();
- _dataDelivered.reset();
- _messagesReceived.reset();
- _dataReceived.reset();
- }
-
- public void initialiseStatistics()
- {
- setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
- _virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled());
-
- _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
- _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
- _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
- _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
- }
-
- public boolean isStatisticsEnabled()
- {
- return _statisticsEnabled;
- }
-
- public void setStatisticsEnabled(boolean enabled)
- {
- _statisticsEnabled = enabled;
- }
-
- /**
- * @return authorizedSubject
- */
- public Subject getAuthorizedSubject()
- {
- return _authorizedSubject;
- }
-
- /**
- * Sets the authorized subject. It also extracts the UsernamePrincipal from the subject
- * and caches it for optimisation purposes.
- *
- * @param authorizedSubject
- */
- public void setAuthorizedSubject(final Subject authorizedSubject)
- {
- if (authorizedSubject == null)
- {
- _authorizedSubject = null;
- _authorizedPrincipal = null;
- }
- else
- {
- _authorizedSubject = authorizedSubject;
- _authorizedPrincipal = UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
- }
- }
-
- public Principal getAuthorizedPrincipal()
- {
- return _authorizedPrincipal;
- }
-
- public long getConnectionId()
- {
- return _connectionId;
- }
-
- @Override
- public boolean isSessionNameUnique(String name)
- {
- return !super.hasSessionWithName(name);
- }
-
- @Override
- public String getUserName()
- {
- return _authorizedPrincipal.getName();
- }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org