You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/09/19 17:13:38 UTC
svn commit: r1172657 [10/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/
cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/
cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/
cpp/bindings/qpid/dotnet/examples/csha...
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java Mon Sep 19 15:13:18 2011
@@ -91,15 +91,10 @@ public class AMQUserManagementMBean exte
public boolean setPassword(String username, String password)
{
- return setPassword(username, password.toCharArray());
- }
-
- public boolean setPassword(String username, char[] password)
- {
try
{
//delegate password changes to the Principal Database
- return _principalDatabase.updatePassword(new UsernamePrincipal(username), password);
+ return _principalDatabase.updatePassword(new UsernamePrincipal(username), password.toCharArray());
}
catch (AccountNotFoundException e)
{
@@ -108,11 +103,6 @@ public class AMQUserManagementMBean exte
}
}
- public boolean setRights(String username, boolean read, boolean write, boolean admin)
- {
- throw new UnsupportedOperationException("Support for setting access rights no longer supported.");
- }
-
public boolean createUser(String username, String password)
{
if (_principalDatabase.createPrincipal(new UsernamePrincipal(username), password.toCharArray()))
@@ -122,20 +112,6 @@ public class AMQUserManagementMBean exte
return false;
}
-
- public boolean createUser(String username, String password, boolean read, boolean write, boolean admin)
- {
- if (read || write || admin)
- {
- throw new UnsupportedOperationException("Support for setting access rights to true no longer supported.");
- }
- return createUser(username, password);
- }
-
- public boolean createUser(String username, char[] password, boolean read, boolean write, boolean admin)
- {
- return createUser(username, new String(password), read, write, admin);
- }
public boolean deleteUser(String username)
{
@@ -181,7 +157,6 @@ public class AMQUserManagementMBean exte
for (Principal user : users)
{
// Create header attributes list
-
// Read,Write,Admin items are depcreated and we return always false.
Object[] itemData = {user.getName(), false, false, false};
CompositeData messageData = new CompositeDataSupport(_userDataType, COMPOSITE_ITEM_NAMES.toArray(new String[COMPOSITE_ITEM_NAMES.size()]), itemData);
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java Mon Sep 19 15:13:18 2011
@@ -20,18 +20,36 @@
*/
package org.apache.qpid.server.security.auth.manager;
+import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.qpid.common.Closeable;
+import org.apache.qpid.server.plugins.Plugin;
import org.apache.qpid.server.security.auth.AuthenticationResult;
/**
- * The AuthenticationManager class is the entity responsible for
- * determining the authenticity of user credentials.
+ * Implementations of the AuthenticationManager are responsible for determining
+ * the authenticity of a user's credentials.
+ *
+ * If the authentication is successful, the manager is responsible for producing a populated
+ * {@link Subject} containing the user's identity and zero or more principals representing
+ * groups to which the user belongs.
+ * <p>
+ * The {@link #initialise()} method is responsible for registering SASL mechanisms required by
+ * the manager. The {@link #close()} method must reverse this registration.
+ *
*/
-public interface AuthenticationManager extends Closeable
+public interface AuthenticationManager extends Closeable, Plugin
{
+ /** The name for the required SASL Server mechanisms */
+ public static final String PROVIDER_NAME= "AMQSASLProvider-Server";
+
+ /**
+ * Initialise the authentication plugin.
+ *
+ */
+ void initialise();
/**
* Gets the SASL mechanisms known to this manager.
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Mon Sep 19 15:13:18 2011
@@ -20,32 +20,64 @@
*/
package org.apache.qpid.server.security.auth.manager;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.Security;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.login.AccountNotFoundException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.configuration.PropertyException;
+import org.apache.qpid.configuration.PropertyUtils;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.server.security.auth.sasl.JCAProvider;
+import org.apache.qpid.server.security.auth.management.AMQUserManagementMBean;
import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
+import org.apache.qpid.server.security.auth.sasl.JCAProvider;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
-import javax.security.auth.Subject;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.login.AccountNotFoundException;
-import javax.security.sasl.SaslServerFactory;
-import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.Sasl;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.TreeMap;
-import java.security.Security;
/**
* Concrete implementation of the AuthenticationManager that determines if supplied
- * user credentials match those appearing in a PrincipalDatabase.
+ * user credentials match those appearing in a PrincipalDatabase. The implementation
+ * of the PrincipalDatabase is determined from the configuration.
+ *
+ * This implementation also registers the JMX UserManagement MBean.
+ *
+ * This plugin expects configuration such as:
*
+ * <pre>
+ * <pd-auth-manager>
+ * <principal-database>
+ * <class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>
+ * <attributes>
+ * <attribute>
+ * <name>passwordFile</name>
+ * <value>${conf}/passwd</value>
+ * </attribute>
+ * </attributes>
+ * </principal-database>
+ * </pd-auth-manager>
+ * </pre>
*/
public class PrincipalDatabaseAuthenticationManager implements AuthenticationManager
{
@@ -55,25 +87,109 @@ public class PrincipalDatabaseAuthentica
private String _mechanisms;
/** Maps from the mechanism to the callback handler to use for handling those requests */
- private Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
+ private final Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
/**
* Maps from the mechanism to the properties used to initialise the server. See the method Sasl.createSaslServer for
* details of the use of these properties. This map is populated during initialisation of each provider.
*/
- private Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
+ private final Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
+
+ protected PrincipalDatabase _principalDatabase = null;
- /** The name for the required SASL Server mechanisms */
- public static final String PROVIDER_NAME= "AMQSASLProvider-Server";
+ protected AMQUserManagementMBean _mbean = null;
- public PrincipalDatabaseAuthenticationManager()
+ public static final AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager> FACTORY = new AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager>()
{
- _logger.info("Initialising PrincipalDatabase authentication manager.");
+ public PrincipalDatabaseAuthenticationManager newInstance(final ConfigurationPlugin config) throws ConfigurationException
+ {
+ final PrincipalDatabaseAuthenticationManagerConfiguration configuration = config.getConfiguration(PrincipalDatabaseAuthenticationManagerConfiguration.class.getName());
+
+ // If there is no configuration for this plugin then don't load it.
+ if (configuration == null)
+ {
+ _logger.info("No authentication-manager configuration found for PrincipalDatabaseAuthenticationManager");
+ return null;
+ }
+
+ final PrincipalDatabaseAuthenticationManager pdam = new PrincipalDatabaseAuthenticationManager();
+ pdam.configure(configuration);
+ pdam.initialise();
+ return pdam;
+ }
+
+ public Class<PrincipalDatabaseAuthenticationManager> getPluginClass()
+ {
+ return PrincipalDatabaseAuthenticationManager.class;
+ }
+
+ public String getPluginName()
+ {
+ return PrincipalDatabaseAuthenticationManager.class.getName();
+ }
+ };
+
+ public static class PrincipalDatabaseAuthenticationManagerConfiguration extends ConfigurationPlugin {
+
+ public static final ConfigurationPluginFactory FACTORY = new ConfigurationPluginFactory()
+ {
+ public List<String> getParentPaths()
+ {
+ return Arrays.asList("security.pd-auth-manager");
+ }
+
+ public ConfigurationPlugin newInstance(final String path, final Configuration config) throws ConfigurationException
+ {
+ final ConfigurationPlugin instance = new PrincipalDatabaseAuthenticationManagerConfiguration();
+
+ instance.setConfiguration(path, config);
+ return instance;
+ }
+ };
+
+ public String[] getElementsProcessed()
+ {
+ return new String[] {"principal-database.class",
+ "principal-database.attributes.attribute.name",
+ "principal-database.attributes.attribute.value"};
+ }
+
+ public void validateConfiguration() throws ConfigurationException
+ {
+ }
+
+ public String getPrincipalDatabaseClass()
+ {
+ return _configuration.getString("principal-database.class");
+ }
+
+ public Map<String,String> getPdClassAttributeMap() throws ConfigurationException
+ {
+ final List<String> argumentNames = _configuration.getList("principal-database.attributes.attribute.name");
+ final List<String> argumentValues = _configuration.getList("principal-database.attributes.attribute.value");
+ final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size());
+
+ for (int i = 0; i < argumentNames.size(); i++)
+ {
+ final String argName = argumentNames.get(i);
+ final String argValue = argumentValues.get(i);
+
+ attributes.put(argName, argValue);
+ }
+
+ return Collections.unmodifiableMap(attributes);
+ }
+ }
- Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
+ protected PrincipalDatabaseAuthenticationManager()
+ {
+ }
+ public void initialise()
+ {
+ final Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
- initialiseAuthenticationMechanisms(providerMap, ApplicationRegistry.getInstance().getDatabaseManager().getDatabases());
+ initialiseAuthenticationMechanisms(providerMap, _principalDatabase);
if (providerMap.size() > 0)
{
@@ -86,28 +202,13 @@ public class PrincipalDatabaseAuthentica
{
_logger.info("Additional SASL providers successfully registered.");
}
-
}
else
{
_logger.warn("No additional SASL providers registered.");
}
- }
- private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, Map<String, PrincipalDatabase> databases)
- {
- if (databases.size() > 1)
- {
- _logger.warn("More than one principle database provided currently authentication mechanism will override each other.");
- }
-
- for (Map.Entry<String, PrincipalDatabase> entry : databases.entrySet())
- {
- // fixme As the database now provide the mechanisms they support, they will ...
- // overwrite each other in the map. There should only be one database per vhost.
- // But currently we must have authentication before vhost definition.
- initialiseAuthenticationMechanisms(providerMap, entry.getValue());
- }
+ registerManagement();
}
private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database)
@@ -126,7 +227,6 @@ public class PrincipalDatabaseAuthentica
private void initialiseAuthenticationMechanism(String mechanism, AuthenticationProviderInitialiser initialiser,
Map<String, Class<? extends SaslServerFactory>> providerMap)
-
{
if (_mechanisms == null)
{
@@ -147,6 +247,21 @@ public class PrincipalDatabaseAuthentica
_logger.info("Initialised " + mechanism + " SASL provider successfully");
}
+ /**
+ * @see org.apache.qpid.server.plugins.Plugin#configure(org.apache.qpid.server.configuration.plugins.ConfigurationPlugin)
+ */
+ public void configure(final ConfigurationPlugin config) throws ConfigurationException
+ {
+ final PrincipalDatabaseAuthenticationManagerConfiguration pdamConfig = (PrincipalDatabaseAuthenticationManagerConfiguration) config;
+ final String pdClazz = pdamConfig.getPrincipalDatabaseClass();
+
+ _logger.info("PrincipalDatabase concrete implementation : " + pdClazz);
+
+ _principalDatabase = createPrincipalDatabaseImpl(pdClazz);
+
+ configPrincipalDatabase(_principalDatabase, pdamConfig);
+ }
+
public String getMechanisms()
{
return _mechanisms;
@@ -158,6 +273,9 @@ public class PrincipalDatabaseAuthentica
_callbackHandlerMap.get(mechanism));
}
+ /**
+ * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(SaslServer, byte[])
+ */
public AuthenticationResult authenticate(SaslServer server, byte[] response)
{
try
@@ -182,23 +300,14 @@ public class PrincipalDatabaseAuthentica
}
}
- public void close()
- {
- _mechanisms = null;
- Security.removeProvider(PROVIDER_NAME);
- }
-
/**
* @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(String, String)
*/
- @Override
public AuthenticationResult authenticate(final String username, final String password)
{
- final PrincipalDatabase db = ApplicationRegistry.getInstance().getDatabaseManager().getDatabases().values().iterator().next();
-
try
{
- if (db.verifyPassword(username, password.toCharArray()))
+ if (_principalDatabase.verifyPassword(username, password.toCharArray()))
{
final Subject subject = new Subject();
subject.getPrincipals().add(new UsernamePrincipal(username));
@@ -214,4 +323,141 @@ public class PrincipalDatabaseAuthentica
return new AuthenticationResult(AuthenticationStatus.CONTINUE);
}
}
+
+ public void close()
+ {
+ _mechanisms = null;
+ Security.removeProvider(PROVIDER_NAME);
+
+ unregisterManagement();
+ }
+
+ private PrincipalDatabase createPrincipalDatabaseImpl(final String pdClazz) throws ConfigurationException
+ {
+ try
+ {
+ return (PrincipalDatabase) Class.forName(pdClazz).newInstance();
+ }
+ catch (InstantiationException ie)
+ {
+ throw new ConfigurationException("Cannot instantiate " + pdClazz, ie);
+ }
+ catch (IllegalAccessException iae)
+ {
+ throw new ConfigurationException("Cannot access " + pdClazz, iae);
+ }
+ catch (ClassNotFoundException cnfe)
+ {
+ throw new ConfigurationException("Cannot load " + pdClazz + " implementation", cnfe);
+ }
+ catch (ClassCastException cce)
+ {
+ throw new ConfigurationException("Expecting a " + PrincipalDatabase.class + " implementation", cce);
+ }
+ }
+
+ private void configPrincipalDatabase(final PrincipalDatabase principalDatabase, final PrincipalDatabaseAuthenticationManagerConfiguration config)
+ throws ConfigurationException
+ {
+
+ final Map<String,String> attributes = config.getPdClassAttributeMap();
+
+ for (Iterator<Entry<String, String>> iterator = attributes.entrySet().iterator(); iterator.hasNext();)
+ {
+ final Entry<String, String> nameValuePair = iterator.next();
+ final String methodName = generateSetterName(nameValuePair.getKey());
+ final Method method;
+ try
+ {
+ method = principalDatabase.getClass().getMethod(methodName, String.class);
+ }
+ catch (Exception e)
+ {
+ throw new ConfigurationException("No method " + methodName + " found in class "
+ + principalDatabase.getClass()
+ + " hence unable to configure principal database. The method must be public and "
+ + "have a single String argument with a void return type", e);
+ }
+ try
+ {
+ method.invoke(principalDatabase, PropertyUtils.replaceProperties(nameValuePair.getValue()));
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new ConfigurationException(e.getMessage(), e);
+ }
+ catch (PropertyException e)
+ {
+ throw new ConfigurationException(e.getMessage(), e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new ConfigurationException(e.getMessage(), e);
+ }
+ catch (InvocationTargetException e)
+ {
+ // QPID-1347.. InvocationTargetException wraps the checked exception thrown from the reflective
+ // method call. Pull out the underlying message and cause to make these more apparent to the user.
+ throw new ConfigurationException(e.getCause().getMessage(), e.getCause());
+ }
+ }
+ }
+
+ private String generateSetterName(String argName) throws ConfigurationException
+ {
+ if ((argName == null) || (argName.length() == 0))
+ {
+ throw new ConfigurationException("Argument names must have length >= 1 character");
+ }
+
+ if (Character.isLowerCase(argName.charAt(0)))
+ {
+ argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1);
+ }
+
+ final String methodName = "set" + argName;
+ return methodName;
+ }
+
+ protected void setPrincipalDatabase(final PrincipalDatabase principalDatabase)
+ {
+ _principalDatabase = principalDatabase;
+ }
+
+ protected void registerManagement()
+ {
+ try
+ {
+ _logger.info("Registering UserManagementMBean");
+
+ _mbean = new AMQUserManagementMBean();
+ _mbean.setPrincipalDatabase(_principalDatabase);
+ _mbean.register();
+ }
+ catch (Exception e)
+ {
+ _logger.warn("User management disabled as unable to create MBean:", e);
+ _mbean = null;
+ }
+ }
+
+ protected void unregisterManagement()
+ {
+ try
+ {
+ if (_mbean != null)
+ {
+ _logger.info("Unregistering UserManagementMBean");
+ _mbean.unregister();
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Failed to unregister User management MBean:", e);
+ }
+ finally
+ {
+ _mbean = null;
+ }
+ }
}
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java Mon Sep 19 15:13:18 2011
@@ -25,9 +25,6 @@ import java.util.Map;
import javax.security.auth.callback.CallbackHandler;
import javax.security.sasl.SaslServerFactory;
-import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-
public interface AuthenticationProviderInitialiser
{
/**
@@ -37,24 +34,6 @@ public interface AuthenticationProviderI
String getMechanismName();
/**
- * Initialise the authentication provider.
- * @param baseConfigPath the path in the config file that points to any config options for this provider. Each
- * provider can have its own set of configuration options
- * @param configuration the Apache Commons Configuration instance used to configure this provider
- * @param principalDatabases the set of principal databases that are available
- * @throws Exception needs refined Exception is too broad.
- */
- void initialise(String baseConfigPath, Configuration configuration,
- Map<String, PrincipalDatabase> principalDatabases) throws Exception;
-
- /**
- * Initialise the authentication provider.
- * @param db The principal database to initialise with
- */
- void initialise(PrincipalDatabase db);
-
-
- /**
* @return the callback handler that should be used to process authentication requests for this mechanism. This will
* be called after initialise and will be stored by the authentication manager. The callback handler <b>must</b> be
* fully threadsafe.
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java Mon Sep 19 15:13:18 2011
@@ -21,12 +21,11 @@
package org.apache.qpid.server.security.auth.sasl;
import java.security.Provider;
-import java.security.Security;
import java.util.Map;
import javax.security.sasl.SaslServerFactory;
-public final class JCAProvider extends Provider
+public class JCAProvider extends Provider
{
public JCAProvider(String name, Map<String, Class<? extends SaslServerFactory>> providerMap)
{
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java Mon Sep 19 15:13:18 2011
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.security.auth.sasl.amqplain;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import javax.security.auth.callback.Callback;
@@ -31,7 +33,6 @@ import javax.security.sasl.AuthorizeCall
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
@@ -60,7 +61,7 @@ public class AmqPlainSaslServer implemen
{
try
{
- final FieldTable ft = FieldTableFactory.newFieldTable(ByteBuffer.wrap(response), response.length);
+ final FieldTable ft = FieldTableFactory.newFieldTable(new DataInputStream(new ByteArrayInputStream(response)), response.length);
String username = (String) ft.getString("LOGIN");
// we do not care about the prompt but it throws if null
NameCallback nameCb = new NameCallback("prompt", username);
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java Mon Sep 19 15:13:18 2011
@@ -20,21 +20,9 @@
*/
package org.apache.qpid.server.security.auth.sasl.anonymous;
-import java.io.IOException;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
public class AnonymousSaslServer implements SaslServer
{
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Mon Sep 19 15:13:18 2011
@@ -259,7 +259,7 @@ public class AMQStateManager implements
public AMQProtocolSession getProtocolSession()
{
- SecurityManager.setThreadPrincipal(_protocolSession.getPrincipal());
+ SecurityManager.setThreadSubject(_protocolSession.getAuthorizedSubject());
return _protocolSession;
}
}
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Mon Sep 19 15:13:18 2011
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store;
import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
@@ -479,9 +480,15 @@ public class DerbyMessageStore implement
FieldTable arguments;
if(dataAsBytes.length > 0)
{
- org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.wrap(dataAsBytes);
- arguments = new FieldTable(buffer,buffer.limit());
+ try
+ {
+ arguments = new FieldTable(new DataInputStream(new ByteArrayInputStream(dataAsBytes)),dataAsBytes.length);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("IO Exception should not be thrown",e);
+ }
}
else
{
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java Mon Sep 19 15:13:18 2011
@@ -20,13 +20,21 @@
*/
package org.apache.qpid.server.subscription;
+import java.util.Map;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager_0_10;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.transport.ServerSession;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageFlowMode;
/**
* Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory
@@ -56,4 +64,23 @@ public interface SubscriptionFactory
RecordDeliveryMethod recordMethod
)
throws AMQException;
+
+
+ SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(AMQChannel channel,
+ AMQProtocolSession session,
+ AMQShortString consumerTag,
+ FieldTable filters,
+ boolean noLocal,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod) throws AMQException;
+
+ Subscription_0_10 createSubscription(final ServerSession session,
+ final String destination,
+ final MessageAcceptMode acceptMode,
+ final MessageAcquireMode acquireMode,
+ final MessageFlowMode flowMode,
+ final FlowCreditManager_0_10 creditManager,
+ final FilterManager filterManager,
+ final Map<String,Object> arguments);
}
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java Mon Sep 19 15:13:18 2011
@@ -20,17 +20,28 @@
*/
package org.apache.qpid.server.subscription;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager_0_10;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageFlowMode;
public class SubscriptionFactoryImpl implements SubscriptionFactory
{
+ private static final AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
+
public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession,
AMQShortString consumerTag, boolean acks, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager) throws AMQException
@@ -78,18 +89,47 @@ public class SubscriptionFactoryImpl imp
if(isBrowser)
{
- return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
+ return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
}
else if(acks)
{
- return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
+ return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
}
else
{
- return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
+ return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
}
}
+ public SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(final AMQChannel channel,
+ final AMQProtocolSession session,
+ final AMQShortString consumerTag,
+ final FieldTable filters,
+ final boolean noLocal,
+ final FlowCreditManager creditManager,
+ final ClientDeliveryMethod deliveryMethod,
+ final RecordDeliveryMethod recordMethod) throws AMQException
+ {
+ return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod, getNextSubscriptionId());
+ }
+
+ public Subscription_0_10 createSubscription(final ServerSession session,
+ final String destination,
+ final MessageAcceptMode acceptMode,
+ final MessageAcquireMode acquireMode,
+ final MessageFlowMode flowMode,
+ final FlowCreditManager_0_10 creditManager,
+ final FilterManager filterManager,
+ final Map<String,Object> arguments)
+ {
+ return new Subscription_0_10(session, destination, acceptMode, acquireMode,
+ flowMode, creditManager, filterManager, arguments, getNextSubscriptionId());
+ }
public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl();
+
+ private static long getNextSubscriptionId()
+ {
+ return SUB_ID_GENERATOR.getAndIncrement();
+ }
}
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Mon Sep 19 15:13:18 2011
@@ -88,9 +88,7 @@ public abstract class SubscriptionImpl i
private final Lock _stateChangeLock;
- private static final AtomicLong idGenerator = new AtomicLong(0);
- // Create a simple ID that increments for ever new Subscription
- private final long _subscriptionID = idGenerator.getAndIncrement();
+ private final long _subscriptionID;
private LogSubject _logSubject;
private LogActor _logActor;
private UUID _id;
@@ -104,10 +102,11 @@ public abstract class SubscriptionImpl i
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ RecordDeliveryMethod recordMethod,
+ long subscriptionID)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
}
@@ -151,10 +150,11 @@ public abstract class SubscriptionImpl i
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ RecordDeliveryMethod recordMethod,
+ long subscriptionID)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
}
@@ -211,16 +211,45 @@ public abstract class SubscriptionImpl i
}
+ /**
+ * NoAck Subscription for use with BasicGet method.
+ */
+ public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
+ {
+ public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod,
+ long subscriptionID)
+ throws AMQException
+ {
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ }
+
+ public boolean isTransient()
+ {
+ return true;
+ }
+
+ public boolean wouldSuspend(QueueEntry msg)
+ {
+ return !getCreditManager().useCreditForMessage(msg.getMessage());
+ }
+
+ }
+
static final class AckSubscription extends SubscriptionImpl
{
public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ RecordDeliveryMethod recordMethod,
+ long subscriptionID)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
}
@@ -296,10 +325,11 @@ public abstract class SubscriptionImpl i
AMQShortString consumerTag, FieldTable arguments,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ RecordDeliveryMethod recordMethod,
+ long subscriptionID)
throws AMQException
{
-
+ _subscriptionID = subscriptionID;
_channel = channel;
_consumerTag = consumerTag;
@@ -445,7 +475,7 @@ public abstract class SubscriptionImpl i
//check that the message hasn't been rejected
- if (entry.isRejectedBy(this))
+ if (entry.isRejectedBy(getSubscriptionID()))
{
if (_logger.isDebugEnabled())
{
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java Mon Sep 19 15:13:18 2011
@@ -20,121 +20,108 @@
*/
package org.apache.qpid.server.subscription;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.subscription.Subscription;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.nio.ByteBuffer;
public class SubscriptionList
{
-
private final SubscriptionNode _head = new SubscriptionNode();
- private AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head);
- private AtomicInteger _size = new AtomicInteger();
-
+ private final AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head);
+ private final AtomicReference<SubscriptionNode> _subNodeMarker = new AtomicReference<SubscriptionNode>(_head);
+ private final AtomicInteger _size = new AtomicInteger();
- public final class SubscriptionNode
+ public static final class SubscriptionNode
{
private final AtomicBoolean _deleted = new AtomicBoolean();
private final AtomicReference<SubscriptionNode> _next = new AtomicReference<SubscriptionNode>();
private final Subscription _sub;
-
public SubscriptionNode()
{
-
+ //used for sentinel head and dummy node construction
_sub = null;
_deleted.set(true);
}
public SubscriptionNode(final Subscription sub)
{
+ //used for regular node construction
_sub = sub;
}
-
- public SubscriptionNode getNext()
+ /**
+ * Retrieves the first non-deleted node following the current node.
+ * Any deleted non-tail nodes encountered during the search are unlinked.
+ *
+ * @return the next non-deleted node, or null if none was found.
+ */
+ public SubscriptionNode findNext()
{
-
SubscriptionNode next = nextNode();
while(next != null && next.isDeleted())
{
-
final SubscriptionNode newNext = next.nextNode();
if(newNext != null)
{
+ //try to move our _next reference forward to the 'newNext'
+ //node to unlink the deleted node
_next.compareAndSet(next, newNext);
next = nextNode();
}
else
{
+ //'newNext' is null, meaning 'next' is the current tail. Can't unlink
+ //the tail node for thread safety reasons, just use the null.
next = null;
}
-
}
+
return next;
}
- private SubscriptionNode nextNode()
+ /**
+ * Gets the immediately next referenced node in the structure.
+ *
+ * @return the immediately next node in the structure, or null if at the tail.
+ */
+ protected SubscriptionNode nextNode()
{
return _next.get();
}
+ /**
+ * Used to initialise the 'next' reference. Will only succeed if the reference was not previously set.
+ *
+ * @param node the SubscriptionNode to set as 'next'
+ * @return whether the operation succeeded
+ */
+ private boolean setNext(final SubscriptionNode node)
+ {
+ return _next.compareAndSet(null, node);
+ }
+
public boolean isDeleted()
{
return _deleted.get();
}
-
public boolean delete()
{
- if(_deleted.compareAndSet(false,true))
- {
- _size.decrementAndGet();
- advanceHead();
- return true;
- }
- else
- {
- return false;
- }
+ return _deleted.compareAndSet(false,true);
}
-
public Subscription getSubscription()
{
return _sub;
}
}
-
- public SubscriptionList(AMQQueue queue)
+ private void insert(final SubscriptionNode node, final boolean count)
{
- }
-
- private void advanceHead()
- {
- SubscriptionNode head = _head.nextNode();
- while(head._next.get() != null && head.isDeleted())
- {
-
- final SubscriptionNode newhead = head.nextNode();
- if(newhead != null)
- {
- _head._next.compareAndSet(head, newhead);
- }
- head = _head.nextNode();
- }
- }
-
-
- public SubscriptionNode add(Subscription sub)
- {
- SubscriptionNode node = new SubscriptionNode(sub);
for (;;)
{
SubscriptionNode tail = _tail.get();
@@ -143,11 +130,14 @@ public class SubscriptionList
{
if (next == null)
{
- if (tail._next.compareAndSet(null, node))
+ if (tail.setNext(node))
{
_tail.compareAndSet(tail, node);
- _size.incrementAndGet();
- return node;
+ if(count)
+ {
+ _size.incrementAndGet();
+ }
+ return;
}
}
else
@@ -156,27 +146,101 @@ public class SubscriptionList
}
}
}
+ }
+ public void add(final Subscription sub)
+ {
+ SubscriptionNode node = new SubscriptionNode(sub);
+ insert(node, true);
}
- public boolean remove(Subscription sub)
+ public boolean remove(final Subscription sub)
{
- SubscriptionNode node = _head.getNext();
+ SubscriptionNode prevNode = _head;
+ SubscriptionNode node = _head.nextNode();
+
while(node != null)
{
- if(sub.equals(node._sub) && node.delete())
+ if(sub.equals(node.getSubscription()) && node.delete())
{
+ _size.decrementAndGet();
+
+ SubscriptionNode tail = _tail.get();
+ if(node == tail)
+ {
+ //we can't remove the last node from the structure for
+ //correctness reasons, however we have just 'deleted'
+ //the tail. Inserting an empty dummy node after it will
+ //let us scavenge the node containing the Subscription.
+ insert(new SubscriptionNode(), false);
+ }
+
+ //advance the next node reference in the 'prevNode' to scavenge
+ //the newly 'deleted' node for the Subscription.
+ prevNode.findNext();
+
+ nodeMarkerCleanup(node);
+
return true;
}
- node = node.getNext();
+
+ prevNode = node;
+ node = node.findNext();
}
+
return false;
}
+ private void nodeMarkerCleanup(final SubscriptionNode node)
+ {
+ SubscriptionNode markedNode = _subNodeMarker.get();
+ if(node == markedNode)
+ {
+ //if the marked node is the one we are removing, then
+ //replace it with a dummy pointing at the next node.
+ //this is OK as the marked node is only used to index
+ //into the list and find the next node to use.
+ //Because we inserted a dummy if node was the
+ //tail, markedNode.nextNode() can never be null.
+ SubscriptionNode dummy = new SubscriptionNode();
+ dummy.setNext(markedNode.nextNode());
+
+ //if the CAS fails the marked node has changed, thus
+ //we don't care about the dummy and just forget it
+ _subNodeMarker.compareAndSet(markedNode, dummy);
+ }
+ else if(markedNode != null)
+ {
+ //if the marked node was already deleted then it could
+ //hold subsequently removed nodes after it in the list
+ //in memory. Scavenge it to ensure their actual removal.
+ if(markedNode != _head && markedNode.isDeleted())
+ {
+ markedNode.findNext();
+ }
+ }
+ }
- public static class SubscriptionNodeIterator
+ public boolean updateMarkedNode(final SubscriptionNode expected, final SubscriptionNode nextNode)
+ {
+ return _subNodeMarker.compareAndSet(expected, nextNode);
+ }
+
+ /**
+ * Get the current marked SubscriptionNode. This should only be used to index into the list and find the next node
+ * after the mark, since if the previously marked node was subsequently deleted the item returned may be a dummy node
+ * with reference to the next node.
+ *
+ * @return the previously marked node (or a dummy if it was subsequently deleted)
+ */
+ public SubscriptionNode getMarkedNode()
{
+ return _subNodeMarker.get();
+ }
+
+ public static class SubscriptionNodeIterator
+ {
private SubscriptionNode _lastNode;
SubscriptionNodeIterator(SubscriptionNode startNode)
@@ -184,49 +248,25 @@ public class SubscriptionList
_lastNode = startNode;
}
-
- public boolean atTail()
- {
- return _lastNode.nextNode() == null;
- }
-
public SubscriptionNode getNode()
{
-
return _lastNode;
-
}
public boolean advance()
{
+ SubscriptionNode nextNode = _lastNode.findNext();
+ _lastNode = nextNode;
- if(!atTail())
- {
- SubscriptionNode nextNode = _lastNode.nextNode();
- while(nextNode.isDeleted() && nextNode.nextNode() != null)
- {
- nextNode = nextNode.nextNode();
- }
- _lastNode = nextNode;
- return true;
-
- }
- else
- {
- return false;
- }
-
+ return _lastNode != null;
}
-
}
-
public SubscriptionNodeIterator iterator()
{
return new SubscriptionNodeIterator(_head);
}
-
public SubscriptionNode getHead()
{
return _head;
@@ -236,9 +276,6 @@ public class SubscriptionList
{
return _size.get();
}
-
-
-
}
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Mon Sep 19 15:13:18 2011
@@ -40,7 +40,6 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.SubscriptionActor;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.AMQMessage;
@@ -80,10 +79,7 @@ import java.nio.ByteBuffer;
public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
{
-
- private static final AtomicLong idGenerator = new AtomicLong(0);
- // Create a simple ID that increments for ever new Subscription
- private final long _subscriptionID = idGenerator.getAndIncrement();
+ private final long _subscriptionID;
private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
@@ -113,16 +109,15 @@ public class Subscription_0_10 implement
private final MessageAcquireMode _acquireMode;
private MessageFlowMode _flowMode;
private final ServerSession _session;
- private AtomicBoolean _stopped = new AtomicBoolean(true);
- private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new ConcurrentHashMap<Integer, QueueEntry>();
+ private final AtomicBoolean _stopped = new AtomicBoolean(true);
private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
private LogActor _logActor;
- private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+ private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
private UUID _id;
private String _traceExclude;
private String _trace;
- private long _createTime = System.currentTimeMillis();
+ private final long _createTime = System.currentTimeMillis();
private final AtomicLong _deliveredCount = new AtomicLong(0);
private final Map<String, Object> _arguments;
@@ -131,8 +126,9 @@ public class Subscription_0_10 implement
MessageAcquireMode acquireMode,
MessageFlowMode flowMode,
FlowCreditManager_0_10 creditManager,
- FilterManager filters,Map<String, Object> arguments)
+ FilterManager filters,Map<String, Object> arguments, long subscriptionId)
{
+ _subscriptionID = subscriptionId;
_session = session;
_destination = destination;
_acceptMode = acceptMode;
@@ -198,7 +194,7 @@ public class Subscription_0_10 implement
public boolean isSuspended()
{
- return !isActive() || _deleted.get(); // TODO check for Session suspension
+ return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension
}
public boolean hasInterest(QueueEntry entry)
@@ -207,7 +203,7 @@ public class Subscription_0_10 implement
//check that the message hasn't been rejected
- if (entry.isRejectedBy(this))
+ if (entry.isRejectedBy(getSubscriptionID()))
{
return false;
@@ -731,13 +727,22 @@ public class Subscription_0_10 implement
public void stop()
{
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ try
{
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ getSendLock();
+
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
+ _stopped.set(true);
+ FlowCreditManager_0_10 creditManager = getCreditManager();
+ creditManager.clearCredit();
+ }
+ finally
+ {
+ releaseSendLock();
}
- _stopped.set(true);
- FlowCreditManager_0_10 creditManager = getCreditManager();
- creditManager.clearCredit();
}
public void addCredit(MessageCreditUnit unit, long value)
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Mon Sep 19 15:13:18 2011
@@ -20,13 +20,18 @@
*/
package org.apache.qpid.server.transport;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
+import java.security.Principal;
import java.text.MessageFormat;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -38,7 +43,8 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Connection;
@@ -49,20 +55,22 @@ import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.Session;
-public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject
+public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
{
private ConnectionConfig _config;
private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
private LogActor _actor = GenericActor.getInstance(this);
- private ApplicationRegistry _registry;
+ private Subject _authorizedSubject = null;
+ private Principal _authorizedPrincipal = null;
private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+ private final long _connectionId;
- public ServerConnection()
+ public ServerConnection(final long connectionId)
{
-
+ _connectionId = connectionId;
}
public UUID getId()
@@ -212,9 +220,9 @@ public class ServerConnection extends Co
public String toLogString()
{
boolean hasVirtualHost = (null != this.getVirtualHost());
- boolean hasPrincipal = (null != getAuthorizationID());
+ boolean hasClientId = (null != getClientId());
- if (hasPrincipal && hasVirtualHost)
+ if (hasClientId && hasVirtualHost)
{
return "[" +
MessageFormat.format(CONNECTION_FORMAT,
@@ -224,7 +232,7 @@ public class ServerConnection extends Co
getVirtualHost().getName())
+ "] ";
}
- else if (hasPrincipal)
+ else if (hasClientId)
{
return "[" +
MessageFormat.format(USER_FORMAT,
@@ -341,4 +349,54 @@ public class ServerConnection extends Co
{
_statisticsEnabled = enabled;
}
+
+ /**
+ * @return authorizedSubject
+ */
+ public Subject getAuthorizedSubject()
+ {
+ return _authorizedSubject;
+ }
+
+ /**
+ * Sets the authorized subject. It also extracts the UsernamePrincipal from the subject
+ * and caches it for optimisation purposes.
+ *
+ * @param authorizedSubject
+ */
+ public void setAuthorizedSubject(final Subject authorizedSubject)
+ {
+ if (authorizedSubject == null)
+ {
+ _authorizedSubject = null;
+ _authorizedPrincipal = null;
+ }
+ else
+ {
+ _authorizedSubject = authorizedSubject;
+ _authorizedPrincipal = UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
+ }
+ }
+
+ public Principal getAuthorizedPrincipal()
+ {
+ return _authorizedPrincipal;
+ }
+
+ public long getConnectionId()
+ {
+ return _connectionId;
+ }
+
+ @Override
+ public boolean isSessionNameUnique(String name)
+ {
+ return !super.hasSessionWithName(name);
+ }
+
+ @Override
+ public String getUserName()
+ {
+ return _authorizedPrincipal.getName();
+ }
}
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Mon Sep 19 15:13:18 2011
@@ -21,8 +21,10 @@
package org.apache.qpid.server.transport;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
@@ -31,11 +33,28 @@ import javax.security.sasl.SaslException
import javax.security.sasl.SaslServer;
import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
+import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionClose;
+import org.apache.qpid.transport.ConnectionCloseCode;
+import org.apache.qpid.transport.ConnectionOpen;
+import org.apache.qpid.transport.ConnectionOpenOk;
+import org.apache.qpid.transport.ConnectionTuneOk;
+import org.apache.qpid.transport.ServerDelegate;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionAttach;
+import org.apache.qpid.transport.SessionDelegate;
+import org.apache.qpid.transport.SessionDetach;
+import org.apache.qpid.transport.SessionDetachCode;
+import org.apache.qpid.transport.SessionDetached;
public class ServerConnectionDelegate extends ServerDelegate
{
@@ -70,24 +89,42 @@ public class ServerConnectionDelegate ex
return list;
}
- @Override
public ServerSession getSession(Connection conn, SessionAttach atc)
{
- SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry);
+ SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
ServerSession ssn = new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
return ssn;
}
- @Override
protected SaslServer createSaslServer(String mechanism) throws SaslException
{
return _appRegistry.getAuthenticationManager().createSaslServer(mechanism, _localFQDN);
}
- @Override
+ protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
+ {
+ final AuthenticationResult authResult = _appRegistry.getAuthenticationManager().authenticate(ss, response);
+ final ServerConnection sconn = (ServerConnection) conn;
+
+
+ if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus()))
+ {
+ tuneAuthorizedConnection(sconn);
+ sconn.setAuthorizedSubject(authResult.getSubject());
+ }
+ else if (AuthenticationStatus.CONTINUE.equals(authResult.getStatus()))
+ {
+ connectionAuthContinue(sconn, authResult.getChallenge());
+ }
+ else
+ {
+ connectionAuthFailed(sconn, authResult.getCause());
+ }
+ }
+
public void connectionClose(Connection conn, ConnectionClose close)
{
try
@@ -101,10 +138,9 @@ public class ServerConnectionDelegate ex
}
- @Override
public void connectionOpen(Connection conn, ConnectionOpen open)
{
- ServerConnection sconn = (ServerConnection) conn;
+ final ServerConnection sconn = (ServerConnection) conn;
VirtualHost vhost;
String vhostName;
@@ -118,7 +154,7 @@ public class ServerConnectionDelegate ex
}
vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
- SecurityManager.setThreadPrincipal(conn.getAuthorizationID());
+ SecurityManager.setThreadSubject(sconn.getAuthorizedSubject());
if(vhost != null)
{
@@ -142,6 +178,26 @@ public class ServerConnectionDelegate ex
}
}
+
+ @Override
+ public void connectionTuneOk(final Connection conn, final ConnectionTuneOk ok)
+ {
+ ServerConnection sconn = (ServerConnection) conn;
+ int okChannelMax = ok.getChannelMax();
+
+ if (okChannelMax > getChannelMax())
+ {
+ _logger.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
+ "client connectionTuneOk returned a channelMax (" + okChannelMax +
+ ") above the servers offered limit (" + getChannelMax() +")");
+
+ //Due to the error we must forcefully close the connection without negotiation
+ sconn.getSender().close();
+ return;
+ }
+
+ setConnectionTuneOkChannelMax(sconn, okChannelMax);
+ }
@Override
protected int getHeartbeatMax()
@@ -155,4 +211,59 @@ public class ServerConnectionDelegate ex
{
return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
}
+
+ @Override public void sessionDetach(Connection conn, SessionDetach dtc)
+ {
+ // To ensure a clean detach, we unregister any remaining subscriptions. Unregister ensures
+ // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the unregister
+ // completes.
+ unregisterAllSubscriptions(conn, dtc);
+ super.sessionDetach(conn, dtc);
+ }
+
+ private void unregisterAllSubscriptions(Connection conn, SessionDetach dtc)
+ {
+ final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
+ final Collection<Subscription_0_10> subs = ssn.getSubscriptions();
+ for (Subscription_0_10 subscription_0_10 : subs)
+ {
+ ssn.unregister(subscription_0_10);
+ }
+ }
+
+ @Override
+ public void sessionAttach(final Connection conn, final SessionAttach atc)
+ {
+ final String clientId = new String(atc.getName());
+ final Session ssn = getSession(conn, atc);
+
+ if(isSessionNameUnique(clientId,conn))
+ {
+ conn.registerSession(ssn);
+ super.sessionAttach(conn, atc);
+ }
+ else
+ {
+ ssn.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY));
+ ssn.closed();
+ }
+ }
+
+ private boolean isSessionNameUnique(final String name, final Connection conn)
+ {
+ final ServerConnection sconn = (ServerConnection) conn;
+ final String userId = sconn.getUserName();
+
+ final Iterator<AMQConnectionModel> connections =
+ ((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator();
+ while(connections.hasNext())
+ {
+ final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next();
+ if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
}
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Sep 19 15:13:18 2011
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.server.transport;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
-import static org.apache.qpid.util.Serial.*;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import static org.apache.qpid.util.Serial.gt;
import java.lang.ref.WeakReference;
import java.security.Principal;
@@ -38,6 +38,8 @@ import java.util.concurrent.ConcurrentSk
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
+import javax.security.auth.Subject;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.ProtocolEngine;
@@ -57,7 +59,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -75,9 +77,7 @@ import org.apache.qpid.transport.Session
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.sun.security.auth.UserPrincipal;
-
-public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject
+public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject
{
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
@@ -118,8 +118,6 @@ public class ServerSession extends Sessi
private final AtomicLong _txnCount = new AtomicLong(0);
private final AtomicLong _txnUpdateTime = new AtomicLong(0);
- private Principal _principal;
-
private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
@@ -131,27 +129,27 @@ public class ServerSession extends Sessi
this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
}
- protected void setState(State state)
- {
- super.setState(state);
-
- if (state == State.OPEN)
- {
- _actor.message(ChannelMessages.CREATE());
- }
- }
-
public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
{
super(connection, delegate, name, expiry);
_connectionConfig = connConfig;
_transaction = new AutoCommitTransaction(this.getMessageStore());
- _principal = new UserPrincipal(connection.getAuthorizationID());
+
_reference = new WeakReference<Session>(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
}
+    /**
+     * Delegates the state change to the superclass and, when the new state is
+     * State.OPEN, sends ChannelMessages.CREATE() through _actor.
+     *
+     * @param state the state being set
+     */
+    protected void setState(State state)
+    {
+        super.setState(state);
+
+        final boolean nowOpen = (state == State.OPEN);
+        if (nowOpen)
+        {
+            _actor.message(ChannelMessages.CREATE());
+        }
+    }
+
private ConfigStore getConfigStore()
{
return getConnectionConfig().getConfigStore();
@@ -202,8 +200,8 @@ public class ServerSession extends Sessi
public void sendMessage(MessageTransfer xfr,
Runnable postIdSettingAction)
{
- invoke(xfr, postIdSettingAction);
getConnectionModel().registerMessageDelivered(xfr.getBodySize());
+ invoke(xfr, postIdSettingAction);
}
public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
@@ -419,7 +417,7 @@ public class ServerSession extends Sessi
catch (AMQException e)
{
// TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ _logger.error("Failed to unregister subscription", e);
}
finally
{
@@ -516,9 +514,14 @@ public class ServerSession extends Sessi
return _txnCount.get();
}
- public Principal getPrincipal()
+ public Principal getAuthorizedPrincipal()
{
- return _principal;
+ return ((ServerConnection) getConnection()).getAuthorizedPrincipal();
+ }
+
+ public Subject getAuthorizedSubject()
+ {
+ return ((ServerConnection) getConnection()).getAuthorizedSubject();
}
public void addSessionCloseTask(Task task)
@@ -667,11 +670,25 @@ public class ServerSession extends Sessi
{
return "[" +
MessageFormat.format(CHANNEL_FORMAT,
- getConnection().getConnectionId(),
+ ((ServerConnection) getConnection()).getConnectionId(),
getClientID(),
((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(),
getVirtualHost().getName(),
getChannel())
+ "] ";
}
+
+    /**
+     * Unregisters all of this session's subscriptions and then closes the
+     * session through the superclass.
+     */
+    @Override
+    public void close()
+    {
+        // Unregister first so that no new messages are sent to subscriptions
+        // belonging to a session that is closing.
+        for (Subscription_0_10 sub : getSubscriptions())
+        {
+            unregister(sub);
+        }
+
+        super.close();
+    }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org