You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2013/02/19 10:35:33 UTC
svn commit: r1447646 [7/16] - in /qpid/trunk/qpid/java: ./ bdbstore/
bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/
bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/serv...
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java?rev=1447646&r1=1447645&r2=1447646&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java Tue Feb 19 09:35:28 2013
@@ -21,89 +21,211 @@
package org.apache.qpid.server.model.adapter;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.UUID;
+
+import javax.net.ssl.KeyManagerFactory;
+
+import org.apache.log4j.Logger;
import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.logging.LogRecorder;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.GroupProvider;
+import org.apache.qpid.server.model.KeyStore;
import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Plugin;
import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
-import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.IAuthenticationManagerRegistry;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.security.group.FileGroupManager;
import org.apache.qpid.server.security.group.GroupManager;
-import org.apache.qpid.server.transport.QpidAcceptor;
+import org.apache.qpid.server.security.group.GroupPrincipalAccessor;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHostRegistry.RegistryChangeListener,
- IApplicationRegistry.PortBindingListener,
- IAuthenticationManagerRegistry.RegistryChangeListener,
- IApplicationRegistry.GroupManagerChangeListener
+public class BrokerAdapter extends AbstractAdapter implements Broker, ConfigurationChangeListener
{
+ private static final Logger LOGGER = Logger.getLogger(BrokerAdapter.class);
- private final IApplicationRegistry _applicationRegistry;
- private String _name;
- private final Map<org.apache.qpid.server.virtualhost.VirtualHost, VirtualHostAdapter> _vhostAdapters =
- new HashMap<org.apache.qpid.server.virtualhost.VirtualHost, VirtualHostAdapter>();
- private final StatisticsAdapter _statistics;
- private final Map<QpidAcceptor, PortAdapter> _portAdapters = new HashMap<QpidAcceptor, PortAdapter>();
- private Collection<HTTPPortAdapter> _httpManagementPorts;
-
- private final Map<AuthenticationManager, AuthenticationProviderAdapter> _authManagerAdapters =
- new HashMap<AuthenticationManager, AuthenticationProviderAdapter>();
- private final Map<GroupManager, GroupProviderAdapter> _groupManagerAdapters =
- new HashMap<GroupManager, GroupProviderAdapter>();
-
-
- public BrokerAdapter(final IApplicationRegistry instance)
- {
- super(UUIDGenerator.generateRandomUUID());
- _applicationRegistry = instance;
- _name = "Broker";
- _statistics = new StatisticsAdapter(instance);
-
- instance.getVirtualHostRegistry().addRegistryChangeListener(this);
- populateVhosts();
- instance.addPortBindingListener(this);
- populatePorts();
- instance.addAuthenticationManagerRegistryChangeListener(this);
- populateAuthenticationManagers();
- instance.addGroupManagerChangeListener(this);
- populateGroupManagers();
- }
-
- private void populateVhosts()
- {
- synchronized(_vhostAdapters)
- {
- Collection<org.apache.qpid.server.virtualhost.VirtualHost> actualVhosts =
- _applicationRegistry.getVirtualHostRegistry().getVirtualHosts();
- for(org.apache.qpid.server.virtualhost.VirtualHost vh : actualVhosts)
- {
- if(!_vhostAdapters.containsKey(vh))
- {
- _vhostAdapters.put(vh, new VirtualHostAdapter(this, vh));
- }
- }
-
+ @SuppressWarnings("serial")
+ public static final Map<String, Class<?>> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Class<?>>(){{
+ put(ALERT_THRESHOLD_MESSAGE_AGE, Long.class);
+ put(ALERT_THRESHOLD_MESSAGE_COUNT, Long.class);
+ put(ALERT_THRESHOLD_QUEUE_DEPTH, Long.class);
+ put(ALERT_THRESHOLD_MESSAGE_SIZE, Long.class);
+ put(ALERT_REPEAT_GAP, Long.class);
+ put(FLOW_CONTROL_SIZE_BYTES, Long.class);
+ put(FLOW_CONTROL_RESUME_SIZE_BYTES, Long.class);
+ put(HOUSEKEEPING_CHECK_PERIOD, Long.class);
+
+ put(DEAD_LETTER_QUEUE_ENABLED, Boolean.class);
+ put(STATISTICS_REPORTING_RESET_ENABLED, Boolean.class);
+
+ put(MAXIMUM_DELIVERY_ATTEMPTS, Integer.class);
+ put(SESSION_COUNT_LIMIT, Integer.class);
+ put(HEART_BEAT_DELAY, Integer.class);
+ put(STATISTICS_REPORTING_PERIOD, Integer.class);
+
+ put(ACL_FILE, String.class);
+ put(NAME, String.class);
+ put(DEFAULT_VIRTUAL_HOST, String.class);
+ put(DEFAULT_AUTHENTICATION_PROVIDER, String.class);
+
+ put(KEY_STORE_PATH, String.class);
+ put(KEY_STORE_PASSWORD, String.class);
+ put(KEY_STORE_CERT_ALIAS, String.class);
+ put(TRUST_STORE_PATH, String.class);
+ put(TRUST_STORE_PASSWORD, String.class);
+ put(GROUP_FILE, String.class);
+ }});
+
+    // Default attribute values; they are collected into the DEFAULTS map that is handed to the
+    // superclass constructor. Long literals use the upper-case 'L' suffix because a lower-case
+    // 'l' is easily misread as the digit 1.
+    public static final int DEFAULT_STATISTICS_REPORTING_PERIOD = 0;
+    public static final boolean DEFAULT_STATISTICS_REPORTING_RESET_ENABLED = false;
+    public static final long DEFAULT_ALERT_REPEAT_GAP = 30000L;
+    public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE = 0L;
+    public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT = 0L;
+    public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE = 0L;
+    public static final long DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH = 0L;
+    public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false;
+    public static final int DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS = 0;
+    public static final long DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES = 0L;
+    public static final long DEFAULT_FLOW_CONTROL_SIZE_BYTES = 0L;
+    public static final long DEFAULT_HOUSEKEEPING_CHECK_PERIOD = 30000L;
+    public static final int DEFAULT_HEART_BEAT_DELAY = 0;
+    public static final int DEFAULT_SESSION_COUNT_LIMIT = 256;
+    public static final String DEFAULT_NAME = "QpidBroker";
+
+    // Names of the children that are created from broker-level attributes.
+    private static final String DEFAULT_KEY_STORE_NAME = "defaultKeyStore";
+    private static final String DEFAULT_TRUST_STORE_NAME = "defaultTrustStore";
+    // NOTE(review): "PROFIDER" is a misspelling of "PROVIDER"; the identifier is kept unchanged
+    // here because it is referenced elsewhere in this class.
+    private static final String DEFAULT_GROUP_PROFIDER_NAME = "defaultGroupProvider";
+
+    // Value returned by getAttribute() in place of the key store and trust store passwords.
+    private static final String DUMMY_PASSWORD_MASK = "********";
+
+ @SuppressWarnings("serial")
+ private static final Map<String, Object> DEFAULTS = Collections.unmodifiableMap(new HashMap<String, Object>(){{
+ put(Broker.STATISTICS_REPORTING_PERIOD, DEFAULT_STATISTICS_REPORTING_PERIOD);
+ put(Broker.STATISTICS_REPORTING_RESET_ENABLED, DEFAULT_STATISTICS_REPORTING_RESET_ENABLED);
+ put(Broker.ALERT_REPEAT_GAP, DEFAULT_ALERT_REPEAT_GAP);
+ put(Broker.ALERT_THRESHOLD_MESSAGE_AGE, DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE);
+ put(Broker.ALERT_THRESHOLD_MESSAGE_COUNT, DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT);
+ put(Broker.ALERT_THRESHOLD_MESSAGE_SIZE, DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE);
+ put(Broker.ALERT_THRESHOLD_QUEUE_DEPTH, DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH);
+ put(Broker.DEAD_LETTER_QUEUE_ENABLED, DEFAULT_DEAD_LETTER_QUEUE_ENABLED);
+ put(Broker.MAXIMUM_DELIVERY_ATTEMPTS, DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS);
+ put(Broker.FLOW_CONTROL_RESUME_SIZE_BYTES, DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES);
+ put(Broker.FLOW_CONTROL_SIZE_BYTES, DEFAULT_FLOW_CONTROL_SIZE_BYTES);
+ put(Broker.HOUSEKEEPING_CHECK_PERIOD, DEFAULT_HOUSEKEEPING_CHECK_PERIOD);
+ put(Broker.HEART_BEAT_DELAY, DEFAULT_HEART_BEAT_DELAY);
+ put(Broker.SESSION_COUNT_LIMIT, DEFAULT_SESSION_COUNT_LIMIT);
+ put(Broker.NAME, DEFAULT_NAME);
+ }});
+
+
+
+
+ private final StatisticsGatherer _statisticsGatherer;
+ private final VirtualHostRegistry _virtualHostRegistry;
+ private final LogRecorder _logRecorder;
+ private final RootMessageLogger _rootMessageLogger;
+ private StatisticsAdapter _statistics;
+
+ private final Map<String, VirtualHost> _vhostAdapters = new HashMap<String, VirtualHost>();
+ private final Map<Integer, Port> _portAdapters = new HashMap<Integer, Port>();
+ private final Map<String, AuthenticationProvider> _authenticationProviders = new HashMap<String, AuthenticationProvider>();
+ private final Map<String, GroupProvider> _groupProviders = new HashMap<String, GroupProvider>();
+ private final Map<UUID, ConfiguredObject> _plugins = new HashMap<UUID, ConfiguredObject>();
+ private final Map<UUID, KeyStore> _keyStores = new HashMap<UUID, KeyStore>();
+ private final Map<UUID, TrustStore> _trustStores = new HashMap<UUID, TrustStore>();
+
+ private final AuthenticationProviderFactory _authenticationProviderFactory;
+ private AuthenticationProvider _defaultAuthenticationProvider;
+
+ private final PortFactory _portFactory;
+ private final SecurityManager _securityManager;
+ private final UUID _defaultKeyStoreId;
+ private final UUID _defaultTrustStoreId;
+
+ /**
+ * Creates the broker adapter.
+ * The supplied attributes are converted with MapValueConverter according to ATTRIBUTE_TYPES and
+ * passed to the superclass together with the DEFAULTS map and the task executor.
+ *
+ * @param id the id of this broker object
+ * @param attributes the broker attributes; converted using ATTRIBUTE_TYPES
+ * @param statisticsGatherer used to build the StatisticsAdapter and passed to created virtual hosts
+ * @param virtualHostRegistry returned by getVirtualHostRegistry()
+ * @param logRecorder returned by getLogRecorder()
+ * @param rootMessageLogger returned by getRootMessageLogger()
+ * @param authenticationProviderFactory used by createAuthenticationProvider()
+ * @param portFactory used by createPort()
+ * @param taskExecutor passed to the superclass constructor
+ */
+ public BrokerAdapter(UUID id, Map<String, Object> attributes, StatisticsGatherer statisticsGatherer, VirtualHostRegistry virtualHostRegistry,
+ LogRecorder logRecorder, RootMessageLogger rootMessageLogger, AuthenticationProviderFactory authenticationProviderFactory,
+ PortFactory portFactory, TaskExecutor taskExecutor)
+ {
+ super(id, DEFAULTS, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES), taskExecutor);
+ _statisticsGatherer = statisticsGatherer;
+ _virtualHostRegistry = virtualHostRegistry;
+ _logRecorder = logRecorder;
+ _rootMessageLogger = rootMessageLogger;
+ _statistics = new StatisticsAdapter(statisticsGatherer);
+ _authenticationProviderFactory = authenticationProviderFactory;
+ _portFactory = portFactory;
+ // the security manager is built from the ACL_FILE attribute (which may be unset)
+ _securityManager = new SecurityManager((String)getAttribute(ACL_FILE));
+
+ // ids of the default stores are derived from their fixed names, then the attribute-configured
+ // children (group provider, key store, trust store) are created
+ _defaultKeyStoreId = UUIDGenerator.generateBrokerChildUUID(KeyStore.class.getSimpleName(), DEFAULT_KEY_STORE_NAME);
+ _defaultTrustStoreId = UUIDGenerator.generateBrokerChildUUID(TrustStore.class.getSimpleName(), DEFAULT_TRUST_STORE_NAME);
+ createBrokerChildrenFromAttributes();
+ }
+
+ /*
+ * Temporary bridge: creates the broker children (group provider, key store and trust store)
+ * that can currently be configured only through broker-level attributes.
+ */
+ private void createBrokerChildrenFromAttributes()
+ {
+ String groupFilePath = (String) getAttribute(GROUP_FILE);
+ if (groupFilePath != null)
+ {
+ GroupManager fileGroupManager = new FileGroupManager(groupFilePath);
+ UUID providerId = UUIDGenerator.generateBrokerChildUUID(GroupProvider.class.getSimpleName(),
+ DEFAULT_GROUP_PROFIDER_NAME);
+ addGroupProvider(new GroupProviderAdapter(providerId, fileGroupManager, this));
+ }
+ // passwords are taken from the actual attributes because getAttribute() returns a mask for them
+ Map<String, Object> rawAttributes = getActualAttributes();
+ String keyStoreLocation = (String) getAttribute(KEY_STORE_PATH);
+ if (keyStoreLocation != null)
+ {
+ Map<String, Object> keyStoreSettings = new HashMap<String, Object>();
+ keyStoreSettings.put(KeyStore.NAME, DEFAULT_KEY_STORE_NAME);
+ keyStoreSettings.put(KeyStore.PATH, keyStoreLocation);
+ keyStoreSettings.put(KeyStore.PASSWORD, (String) rawAttributes.get(KEY_STORE_PASSWORD));
+ keyStoreSettings.put(KeyStore.TYPE, java.security.KeyStore.getDefaultType());
+ keyStoreSettings.put(KeyStore.CERTIFICATE_ALIAS, getAttribute(KEY_STORE_CERT_ALIAS));
+ keyStoreSettings.put(KeyStore.KEY_MANAGER_FACTORY_ALGORITHM, KeyManagerFactory.getDefaultAlgorithm());
+ addKeyStore(new KeyStoreAdapter(_defaultKeyStoreId, this, keyStoreSettings));
+ }
+ String trustStoreLocation = (String) getAttribute(TRUST_STORE_PATH);
+ if (trustStoreLocation != null)
+ {
+ Map<String, Object> trustStoreSettings = new HashMap<String, Object>();
+ trustStoreSettings.put(TrustStore.NAME, DEFAULT_TRUST_STORE_NAME);
+ trustStoreSettings.put(TrustStore.PATH, trustStoreLocation);
+ trustStoreSettings.put(TrustStore.PASSWORD, (String) rawAttributes.get(TRUST_STORE_PASSWORD));
+ trustStoreSettings.put(TrustStore.TYPE, java.security.KeyStore.getDefaultType());
+ trustStoreSettings.put(TrustStore.KEY_MANAGER_FACTORY_ALGORITHM, KeyManagerFactory.getDefaultAlgorithm());
+ addTrustStore(new TrustStoreAdapter(_defaultTrustStoreId, this, trustStoreSettings));
}
}
-
public Collection<VirtualHost> getVirtualHosts()
{
synchronized(_vhostAdapters)
@@ -112,107 +234,55 @@ public class BrokerAdapter extends Abstr
}
}
- private void populatePorts()
- {
- synchronized (_portAdapters)
- {
- Map<InetSocketAddress, QpidAcceptor> acceptors = _applicationRegistry.getAcceptors();
-
- for(Map.Entry<InetSocketAddress, QpidAcceptor> entry : acceptors.entrySet())
- {
- if(!_portAdapters.containsKey(entry.getValue()))
- {
- _portAdapters.put(entry.getValue(), new PortAdapter(this, entry.getValue(), entry.getKey()));
- }
- }
- if(_applicationRegistry.useHTTPManagement() || _applicationRegistry.useHTTPSManagement())
- {
- ArrayList<HTTPPortAdapter> httpPorts = new ArrayList<HTTPPortAdapter>();
- if (_applicationRegistry.useHTTPManagement())
- {
- httpPorts.add(new HTTPPortAdapter(this, _applicationRegistry.getHTTPManagementPort()));
- }
- if (_applicationRegistry.useHTTPSManagement())
- {
- httpPorts.add(new HTTPPortAdapter(this, _applicationRegistry.getHTTPSManagementPort(), Protocol.HTTPS, Transport.SSL));
- }
- _httpManagementPorts = Collections.unmodifiableCollection(httpPorts);
- }
- }
- }
public Collection<Port> getPorts()
{
synchronized (_portAdapters)
{
final ArrayList<Port> ports = new ArrayList<Port>(_portAdapters.values());
- if(_httpManagementPorts != null)
- {
- ports.addAll(_httpManagementPorts);
- }
return ports;
}
}
- private void populateAuthenticationManagers()
+ public Collection<AuthenticationProvider> getAuthenticationProviders()
{
- synchronized (_authManagerAdapters)
+ synchronized (_authenticationProviders)
{
- IAuthenticationManagerRegistry authenticationManagerRegistry =
- _applicationRegistry.getAuthenticationManagerRegistry();
- if(authenticationManagerRegistry != null)
- {
- Map<String, AuthenticationManager> authenticationManagers =
- authenticationManagerRegistry.getAvailableAuthenticationManagers();
-
- for(Map.Entry<String, AuthenticationManager> entry : authenticationManagers.entrySet())
- {
- if(!_authManagerAdapters.containsKey(entry.getValue()))
- {
- _authManagerAdapters.put(entry.getValue(),
- AuthenticationProviderAdapter.createAuthenticationProviderAdapter(this,
- entry.getValue()));
- }
- }
- }
+ return new ArrayList<AuthenticationProvider>(_authenticationProviders.values());
}
}
- private void populateGroupManagers()
+ public AuthenticationProvider getAuthenticationProviderByName(String authenticationProviderName)
{
- synchronized (_groupManagerAdapters)
+ Collection<AuthenticationProvider> providers = getAuthenticationProviders();
+ for (AuthenticationProvider authenticationProvider : providers)
{
- List<GroupManager> groupManagers = _applicationRegistry.getGroupManagers();
- if(groupManagers != null)
+ if (authenticationProvider.getName().equals(authenticationProviderName))
{
- for (GroupManager groupManager : groupManagers)
- {
- if(!_groupManagerAdapters.containsKey(groupManager))
- {
- _groupManagerAdapters.put(groupManager,
- GroupProviderAdapter.createGroupProviderAdapter(this, groupManager));
- }
- }
+ return authenticationProvider;
}
}
+ return null;
}
- public Collection<AuthenticationProvider> getAuthenticationProviders()
+ @Override
+ public AuthenticationProvider getDefaultAuthenticationProvider()
{
- synchronized (_authManagerAdapters)
- {
- final ArrayList<AuthenticationProvider> authManagers =
- new ArrayList<AuthenticationProvider>(_authManagerAdapters.values());
- return authManagers;
- }
+ return _defaultAuthenticationProvider;
}
+ public void setDefaultAuthenticationProvider(AuthenticationProvider provider)
+ {
+ _defaultAuthenticationProvider = provider;
+ }
+
+ @Override
public Collection<GroupProvider> getGroupProviders()
{
- synchronized (_groupManagerAdapters)
+ synchronized (_groupProviders)
{
final ArrayList<GroupProvider> groupManagers =
- new ArrayList<GroupProvider>(_groupManagerAdapters.values());
+ new ArrayList<GroupProvider>(_groupProviders.values());
return groupManagers;
}
}
@@ -228,22 +298,29 @@ public class BrokerAdapter extends Abstr
return null; //TODO
}
- public VirtualHost createVirtualHost(final Map<String, Object> attributes)
+ private VirtualHost createVirtualHost(final Map<String, Object> attributes)
throws AccessControlException, IllegalArgumentException
{
- return null; //TODO
+ final VirtualHostAdapter virtualHostAdapter = new VirtualHostAdapter(UUID.randomUUID(), attributes, this,
+ _statisticsGatherer, getTaskExecutor());
+ addVirtualHost(virtualHostAdapter);
+ virtualHostAdapter.setDesiredState(State.INITIALISING, State.ACTIVE);
+ return virtualHostAdapter;
}
- public void deleteVirtualHost(final VirtualHost vhost)
- throws AccessControlException, IllegalStateException
+ /**
+ * Unregisters the given virtual host from this broker and stops listening to its changes.
+ *
+ * @param vhost the virtual host being deleted
+ * @return true if the host was registered under its name and has been removed, false otherwise
+ */
+ private boolean deleteVirtualHost(final VirtualHost vhost) throws AccessControlException, IllegalStateException
{
- //TODO
- throw new UnsupportedOperationException("Not yet implemented");
+ final VirtualHost removedHost;
+ synchronized (_vhostAdapters)
+ {
+ // _vhostAdapters is keyed by virtual host name (see addVirtualHost); removing by the
+ // VirtualHost object itself never matches an entry and would leave the host registered
+ removedHost = _vhostAdapters.remove(vhost.getName());
+ }
+ vhost.removeChangeListener(this);
+ return removedHost != null;
}
public String getName()
{
- return _name;
+ return (String)getAttribute(NAME);
}
public String setName(final String currentName, final String desiredName)
@@ -297,6 +374,7 @@ public class BrokerAdapter extends Abstr
return _statistics;
}
+ @SuppressWarnings("unchecked")
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
@@ -316,12 +394,26 @@ public class BrokerAdapter extends Abstr
{
return (Collection<C>) getGroupProviders();
}
+ else if(clazz == KeyStore.class)
+ {
+ return (Collection<C>) getKeyStores();
+ }
+ else if(clazz == TrustStore.class)
+ {
+ return (Collection<C>) getTrustStores();
+ }
+ else if(clazz == Plugin.class)
+ {
+ return (Collection<C>) getPlugins();
+ }
return Collections.emptySet();
}
+ //TODO: ACL
+ @SuppressWarnings("unchecked")
@Override
- public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
{
if(childClass == VirtualHost.class)
{
@@ -341,111 +433,107 @@ public class BrokerAdapter extends Abstr
}
}
- private Port createPort(Map<String, Object> attributes)
+ private void addPort(Port port)
{
- // TODO
- return null;
+ synchronized (_portAdapters)
+ {
+ int portNumber = port.getPort();
+ if(_portAdapters.containsKey(portNumber))
+ {
+ throw new IllegalArgumentException("Cannot add port " + port + " because port number " + portNumber + " already configured");
+ }
+ _portAdapters.put(portNumber, port);
+ }
+ port.addChangeListener(this);
}
- private AuthenticationProvider createAuthenticationProvider(Map<String,Object> attributes)
+ private Port createPort(Map<String, Object> attributes)
{
- // TODO
- return null;
+ Port port = _portFactory.createPort(UUID.randomUUID(), this, attributes);
+ addPort(port);
+ return port;
}
+ private AuthenticationProvider createAuthenticationProvider(Map<String, Object> attributes)
+ {
+ // it's cheap to create the groupPrincipalAccessor on the fly
+ GroupPrincipalAccessor groupPrincipalAccessor = new GroupPrincipalAccessor(_groupProviders.values());
+
+ AuthenticationProvider authenticationProvider = _authenticationProviderFactory.create(UUID.randomUUID(), this, attributes, groupPrincipalAccessor);
+ addAuthenticationProvider(authenticationProvider);
+ return authenticationProvider;
+ }
- public void virtualHostRegistered(org.apache.qpid.server.virtualhost.VirtualHost virtualHost)
+ /**
+ * @throws IllegalConfigurationException if an AuthenticationProvider with the same name already exists
+ */
+ private void addAuthenticationProvider(AuthenticationProvider authenticationProvider)
{
- VirtualHostAdapter adapter = null;
- synchronized (_vhostAdapters)
+ String name = authenticationProvider.getName();
+ synchronized (_authenticationProviders)
{
- if(!_vhostAdapters.containsKey(virtualHost))
+ if(_authenticationProviders.containsKey(name))
{
- adapter = new VirtualHostAdapter(this, virtualHost);
- _vhostAdapters.put(virtualHost, adapter);
+ throw new IllegalConfigurationException("Cannot add AuthenticationProvider because one with name " + name + " already exists");
}
+ _authenticationProviders.put(name, authenticationProvider);
}
- if(adapter != null)
- {
- childAdded(adapter);
- }
+ authenticationProvider.addChangeListener(this);
}
- public void virtualHostUnregistered(org.apache.qpid.server.virtualhost.VirtualHost virtualHost)
+ private void addGroupProvider(GroupProvider groupProvider)
{
- VirtualHostAdapter adapter = null;
-
- synchronized (_vhostAdapters)
+ synchronized (_groupProviders)
{
- adapter = _vhostAdapters.remove(virtualHost);
- }
- if(adapter != null)
- {
- childRemoved(adapter);
+ String name = groupProvider.getName();
+ if(_groupProviders.containsKey(name))
+ {
+ throw new IllegalConfigurationException("Cannot add GroupProvider because one with name " + name + " already exists");
+ }
+ _groupProviders.put(name, groupProvider);
}
+ groupProvider.addChangeListener(this);
}
- @Override
- public void authenticationManagerRegistered(AuthenticationManager authenticationManager)
+ private boolean deleteGroupProvider(GroupProvider object)
+ {
+ throw new UnsupportedOperationException("Not implemented yet!");
+ }
+
+ private void addKeyStore(KeyStore keyStore)
{
- AuthenticationProviderAdapter adapter = null;
- synchronized (_authManagerAdapters)
+ synchronized (_keyStores)
{
- if(!_authManagerAdapters.containsKey(authenticationManager))
+ if(_keyStores.containsKey(keyStore.getId()))
{
- adapter =
- AuthenticationProviderAdapter.createAuthenticationProviderAdapter(this, authenticationManager);
- _authManagerAdapters.put(authenticationManager, adapter);
+ throw new IllegalConfigurationException("Cannot add KeyStore because one with id " + keyStore.getId() + " already exists");
}
+ _keyStores.put(keyStore.getId(), keyStore);
}
- if(adapter != null)
- {
- childAdded(adapter);
- }
+ keyStore.addChangeListener(this);
}
- @Override
- public void authenticationManagerUnregistered(AuthenticationManager authenticationManager)
+ private boolean deleteKeyStore(KeyStore object)
{
- AuthenticationProviderAdapter adapter;
- synchronized (_authManagerAdapters)
- {
- adapter = _authManagerAdapters.remove(authenticationManager);
- }
- if(adapter != null)
- {
- childRemoved(adapter);
- }
+ throw new UnsupportedOperationException("Not implemented yet!");
}
-
- @Override
- public void bound(QpidAcceptor acceptor, InetSocketAddress bindAddress)
+ private void addTrustStore(TrustStore trustStore)
{
- synchronized (_portAdapters)
+ synchronized (_trustStores)
{
- if(!_portAdapters.containsKey(acceptor))
+ if(_trustStores.containsKey(trustStore.getId()))
{
- PortAdapter adapter = new PortAdapter(this, acceptor, bindAddress);
- _portAdapters.put(acceptor, adapter);
- childAdded(adapter);
+ throw new IllegalConfigurationException("Cannot add TrustStore because one with id " + trustStore.getId() + " already exists");
}
+ _trustStores.put(trustStore.getId(), trustStore);
}
+ trustStore.addChangeListener(this);
}
- @Override
- public void unbound(QpidAcceptor acceptor)
+ private boolean deleteTrustStore(TrustStore object)
{
- PortAdapter adapter = null;
-
- synchronized (_portAdapters)
- {
- adapter = _portAdapters.remove(acceptor);
- }
- if(adapter != null)
- {
- childRemoved(adapter);
- }
+ throw new UnsupportedOperationException("Not implemented yet!");
}
@Override
@@ -461,10 +549,6 @@ public class BrokerAdapter extends Abstr
{
return getId();
}
- else if(NAME.equals(name))
- {
- return getName();
- }
else if(STATE.equals(name))
{
return State.ACTIVE;
@@ -520,46 +604,308 @@ public class BrokerAdapter extends Abstr
{
// TODO
}
+ else if (DEFAULT_AUTHENTICATION_PROVIDER.equals(name))
+ {
+ return _defaultAuthenticationProvider == null ? null : _defaultAuthenticationProvider.getName();
+ }
+ else if (KEY_STORE_PASSWORD.equals(name))
+ {
+ return DUMMY_PASSWORD_MASK;
+ }
+ else if (TRUST_STORE_PASSWORD.equals(name))
+ {
+ return DUMMY_PASSWORD_MASK;
+ }
+ return super.getAttribute(name);
+ }
+
+    /**
+     * Removes the entry registered under the port number of the given port.
+     *
+     * @return true if an entry was registered under that port number and has been removed
+     */
+    private boolean deletePort(Port portAdapter)
+    {
+        synchronized (_portAdapters)
+        {
+            return _portAdapters.remove(portAdapter.getPort()) != null;
+        }
+    }
+
+    /**
+     * Removes the entry registered under the name of the given authentication provider.
+     *
+     * @return true if an entry was registered under that name and has been removed
+     */
+    private boolean deleteAuthenticationProvider(AuthenticationProvider authenticationProvider)
+    {
+        synchronized (_authenticationProviders)
+        {
+            return _authenticationProviders.remove(authenticationProvider.getName()) != null;
+        }
+    }
+
+    /**
+     * Registers the virtual host under its name and subscribes this broker to its changes.
+     *
+     * @throws IllegalConfigurationException if a virtual host with the same name is already registered
+     */
+    private void addVirtualHost(VirtualHost virtualHost)
+    {
+        synchronized (_vhostAdapters)
+        {
+            final String hostName = virtualHost.getName();
+            final boolean alreadyRegistered = _vhostAdapters.containsKey(hostName);
+            if (alreadyRegistered)
+            {
+                throw new IllegalConfigurationException("Virtual host with name " + hostName + " is already specified!");
+            }
+            _vhostAdapters.put(hostName, virtualHost);
+        }
+        // the listener is attached outside the lock, only after a successful registration
+        virtualHost.addChangeListener(this);
+    }
+
+ /**
+ * Propagates a broker state change to the broker's children.
+ * Only State.ACTIVE and State.STOPPED are handled; the children are stopped in the reverse of
+ * the order in which they are activated.
+ *
+ * @param currentState the state passed on to each child as its current state
+ * @param desiredState the requested broker state
+ * @return true if desiredState is ACTIVE or STOPPED, false for any other state
+ */
+ @Override
+ public boolean setState(State currentState, State desiredState)
+ {
+ if (desiredState == State.ACTIVE)
+ {
+ // activation order: group providers, authentication providers, virtual hosts, ports, plugins;
+ // swallowException is false, so the first failing child aborts the activation
+ changeState(_groupProviders, currentState, State.ACTIVE, false);
+ changeState(_authenticationProviders, currentState, State.ACTIVE, false);
+
+ // a BrokerActor is installed as the current actor while the virtual hosts are activated
+ // and is always removed again afterwards
+ CurrentActor.set(new BrokerActor(getRootMessageLogger()));
+ try
+ {
+ changeState(_vhostAdapters, currentState, State.ACTIVE, false);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+
+ changeState(_portAdapters, currentState,State.ACTIVE, false);
+ changeState(_plugins, currentState,State.ACTIVE, false);
+ return true;
+ }
+ else if (desiredState == State.STOPPED)
+ {
+ // stop order is the reverse of activation; swallowException is true, so a failing child
+ // is logged and the remaining children are still stopped
+ changeState(_plugins, currentState,State.STOPPED, true);
+ changeState(_portAdapters, currentState, State.STOPPED, true);
+ changeState(_vhostAdapters,currentState, State.STOPPED, true);
+ changeState(_authenticationProviders, currentState, State.STOPPED, true);
+ changeState(_groupProviders, currentState, State.STOPPED, true);
+ return true;
+ }
+ return false;
+ }
+
+    /**
+     * Requests the desired state on every configured object held in the given map, while holding
+     * the map's monitor.
+     *
+     * @param configuredObjectMap the child map whose values are transitioned
+     * @param currentState the state passed to each child as its current state
+     * @param desiredState the state requested from each child
+     * @param swallowException if true a RuntimeException thrown by a child is logged and the
+     *                         remaining children are still processed; if false it is rethrown
+     */
+    private void changeState(Map<?, ? extends ConfiguredObject> configuredObjectMap, State currentState, State desiredState, boolean swallowException)
+    {
+        synchronized(configuredObjectMap)
+        {
+            for (ConfiguredObject child : configuredObjectMap.values())
+            {
+                try
+                {
+                    child.setDesiredState(currentState, desiredState);
+                }
+                catch(RuntimeException e)
+                {
+                    if (!swallowException)
+                    {
+                        throw e;
+                    }
+                    LOGGER.error("Failed to stop " + child, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Change-listener callback: when a child reports State.DELETED it is removed from the
+     * matching child registry and, if the removal succeeded, childRemoved(object) is invoked.
+     * Transitions to any other state are ignored.
+     */
+    @Override
+    public void stateChanged(ConfiguredObject object, State oldState, State newState)
+    {
+        if(newState != State.DELETED)
+        {
+            return;
+        }
+
+        final boolean removed;
+        if(object instanceof AuthenticationProvider)
+        {
+            removed = deleteAuthenticationProvider((AuthenticationProvider)object);
+        }
+        else if(object instanceof Port)
+        {
+            removed = deletePort((Port)object);
+        }
+        else if(object instanceof VirtualHost)
+        {
+            removed = deleteVirtualHost((VirtualHost)object);
+        }
+        else if(object instanceof GroupProvider)
+        {
+            removed = deleteGroupProvider((GroupProvider)object);
+        }
+        else if(object instanceof KeyStore)
+        {
+            removed = deleteKeyStore((KeyStore)object);
+        }
+        else if(object instanceof TrustStore)
+        {
+            removed = deleteTrustStore((TrustStore)object);
+        }
+        else
+        {
+            // any other kind of child is not tracked for deletion here
+            removed = false;
+        }
+
+        if(removed)
+        {
+            childRemoved(object);
+        }
+    }
- return super.getAttribute(name); //TODO - Implement.
+ @Override
+ public void childAdded(ConfiguredObject object, ConfiguredObject child)
+ {
+ // no-op
}
@Override
- public Object setAttribute(String name, Object expected, Object desired)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
+ public void childRemoved(ConfiguredObject object, ConfiguredObject child)
{
- return super.setAttribute(name, expected, desired); //TODO - Implement.
+ // no-op
}
@Override
- public void groupManagerRegistered(GroupManager groupManager)
+ public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue)
+ {
+ // no-op
+ }
+
+ private void addPlugin(ConfiguredObject plugin)
{
- GroupProviderAdapter adapter = null;
- synchronized (_groupManagerAdapters)
+ synchronized(_plugins)
{
- if(!_groupManagerAdapters.containsKey(groupManager))
+ if (_plugins.containsKey(plugin.getId()))
{
- adapter = GroupProviderAdapter.createGroupProviderAdapter(this, groupManager);
- _groupManagerAdapters.put(groupManager, adapter);
+ throw new IllegalConfigurationException("Plugin with id '" + plugin.getId() + "' is already registered!");
}
+ _plugins.put(plugin.getId(), plugin);
+ }
+ plugin.addChangeListener(this);
+ }
+
+
+    /**
+     * Returns the registered plugins.
+     * A copy is taken while holding the lock: an unmodifiable wrapper around the live
+     * values() view would be iterated by the caller outside the lock, so it could observe
+     * concurrent modifications of the map.
+     *
+     * @return an unmodifiable snapshot of the plugins registered at the time of the call
+     */
+    private Collection<ConfiguredObject> getPlugins()
+    {
+        synchronized(_plugins)
+        {
+            return Collections.unmodifiableCollection(new ArrayList<ConfiguredObject>(_plugins.values()));
+        }
+    }
+
+ /**
+ * Registers an already constructed child with this broker by dispatching on its type to the
+ * matching add method (authentication provider, port, virtual host, group provider, key store,
+ * trust store or plugin).
+ *
+ * @param object the child to register
+ * @throws IllegalArgumentException if the object is of none of the supported types
+ */
+ public void recoverChild(ConfiguredObject object)
+ {
+ if(object instanceof AuthenticationProvider)
+ {
+ addAuthenticationProvider((AuthenticationProvider)object);
+ }
+ else if(object instanceof Port)
+ {
+ addPort((Port)object);
+ }
+ else if(object instanceof VirtualHost)
+ {
+ addVirtualHost((VirtualHost)object);
+ }
+ else if(object instanceof GroupProvider)
+ {
+ addGroupProvider((GroupProvider)object);
+ }
+ else if(object instanceof KeyStore)
+ {
+ addKeyStore((KeyStore)object);
+ }
+ else if(object instanceof TrustStore)
+ {
+ addTrustStore((TrustStore)object);
+ }
+ else if(object instanceof Plugin)
+ {
+ addPlugin(object);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Attempted to recover unexpected type of configured object: " + object.getClass().getName());
+ }
- if(adapter != null)
+ }
+
+ @Override
+ public RootMessageLogger getRootMessageLogger()
+ {
+ return _rootMessageLogger;
+ }
+
+ @Override
+ public SecurityManager getSecurityManager()
+ {
+ return _securityManager;
+ }
+
+ @Override
+ public LogRecorder getLogRecorder()
+ {
+ return _logRecorder;
+ }
+
+    /**
+     * Looks up a registered virtual host by name.
+     * The lookup holds the same lock as every other access to the virtual host map, which is a
+     * plain HashMap and must not be read while another thread modifies it.
+     *
+     * @param name the virtual host name
+     * @return the virtual host registered under that name, or null if there is none
+     */
+    @Override
+    public VirtualHost findVirtualHostByName(String name)
+    {
+        synchronized (_vhostAdapters)
+        {
+            return _vhostAdapters.get(name);
+        }
+    }
+
+ /**
+ * Returns the SubjectCreator of the authentication provider responsible for the given local
+ * address. The provider of the first port whose number equals the address port is used; when
+ * no port matches, the default authentication provider is used.
+ * NOTE(review): the address is cast to InetSocketAddress unconditionally, and a missing default
+ * provider (or a matching port that returns no provider) would end in a NullPointerException
+ * on the final call - confirm that callers rule these cases out.
+ */
+ @Override
+ public SubjectCreator getSubjectCreator(SocketAddress localAddress)
+ {
+ InetSocketAddress inetSocketAddress = (InetSocketAddress)localAddress;
+ AuthenticationProvider provider = _defaultAuthenticationProvider;
+ Collection<Port> ports = getPorts();
+ for (Port p : ports)
{
- childAdded(adapter);
+ if (inetSocketAddress.getPort() == p.getPort())
+ {
+ provider = p.getAuthenticationProvider();
+ break;
+ }
}
+ return provider.getSubjectCreator();
}
@Override
- public void groupManagerUnregistered(GroupManager groupManager)
+ public Collection<KeyStore> getKeyStores() // read-only view of the registered key stores
{
- GroupProviderAdapter adapter;
- synchronized (_groupManagerAdapters)
+ synchronized(_keyStores) // lock the map actually being read (was _trustStores, which left the _keyStores read unguarded)
{
- adapter = _groupManagerAdapters.remove(groupManager);
+ return Collections.unmodifiableCollection(_keyStores.values()); // unmodifiable view over the key store map's values
}
- if(adapter != null)
+ }
+
+ @Override
+ public Collection<TrustStore> getTrustStores() // read-only view of the registered trust stores
+ {
+ synchronized(_trustStores) // same monitor that guards this read of the trust store map
{
- childRemoved(adapter);
+ return Collections.unmodifiableCollection(_trustStores.values()); // NOTE(review): live view, not a copy; it is iterated by callers after the lock is released - confirm that is safe
}
}
+
+ @Override
+ public VirtualHostRegistry getVirtualHostRegistry()
+ {
+ return _virtualHostRegistry; // plain accessor for the _virtualHostRegistry field
+ }
+
+ @Override
+ public KeyStore getDefaultKeyStore() // key store registered under _defaultKeyStoreId, or null when the map has no such entry
+ {
+ return _keyStores.get(_defaultKeyStoreId); // NOTE(review): unsynchronized read of _keyStores, whereas getKeyStores() takes a lock - confirm this is intentional
+ }
+
+ @Override
+ public TrustStore getDefaultTrustStore() // trust store registered under _defaultTrustStoreId, or null when the map has no such entry
+ {
+ return _trustStores.get(_defaultTrustStoreId); // NOTE(review): unsynchronized read of _trustStores, whereas getTrustStores() takes a lock - confirm this is intentional
+ }
+
+ @Override
+ public TaskExecutor getTaskExecutor() // pure delegation; presumably overridden only to expose the superclass method publicly - TODO confirm
+ {
+ return super.getTaskExecutor();
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1447646&r1=1447645&r2=1447646&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Tue Feb 19 09:35:28 2013
@@ -38,6 +38,7 @@ import org.apache.qpid.server.model.Sess
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.stats.StatisticsGatherer;
@@ -50,9 +51,9 @@ final class ConnectionAdapter extends Ab
new HashMap<AMQSessionModel, SessionAdapter>();
private final Statistics _statistics;
- public ConnectionAdapter(final AMQConnectionModel conn)
+ public ConnectionAdapter(final AMQConnectionModel conn, TaskExecutor taskExecutor)
{
- super(UUIDGenerator.generateRandomUUID());
+ super(UUIDGenerator.generateRandomUUID(), taskExecutor);
_connection = conn;
_statistics = new ConnectionStatisticsAdapter(conn);
}
@@ -74,7 +75,7 @@ final class ConnectionAdapter extends Ab
{
if(!_sessionAdapters.containsKey(session))
{
- _sessionAdapters.put(session, new SessionAdapter(session));
+ _sessionAdapters.put(session, new SessionAdapter(session, getTaskExecutor()));
}
}
return new ArrayList<Session>(_sessionAdapters.values());
@@ -199,52 +200,6 @@ final class ConnectionAdapter extends Ab
}
@Override
- public Object setAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- if(name.equals(CLIENT_ID))
- {
-
- }
- else if(name.equals(CLIENT_VERSION))
- {
-
- }
- else if(name.equals(INCOMING))
- {
-
- }
- else if(name.equals(LOCAL_ADDRESS))
- {
-
- }
- else if(name.equals(PRINCIPAL))
- {
-
- }
- else if(name.equals(PROPERTIES))
- {
-
- }
- else if(name.equals(REMOTE_ADDRESS))
- {
-
- }
- else if(name.equals(REMOTE_PROCESS_NAME))
- {
-
- }
- else if(name.equals(REMOTE_PROCESS_PID))
- {
-
- }
- else if(name.equals(SESSION_COUNT_LIMIT))
- {
-
- }
- return super.setAttribute(name, expected, desired);
- }
-
- @Override
public Collection<String> getAttributeNames()
{
final HashSet<String> attrNames = new HashSet<String>(super.getAttributeNames());
@@ -270,7 +225,8 @@ final class ConnectionAdapter extends Ab
}
}
- public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ @Override
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
{
if(childClass == Session.class)
{
@@ -310,4 +266,11 @@ final class ConnectionAdapter extends Ab
return super.getStatistic(name);
}
}
+
+ @Override
+ protected boolean setState(State currentState, State desiredState)
+ {
+ // TODO: add state management; until then every requested transition is answered with false
+ return false;
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java?rev=1447646&r1=1447645&r2=1447646&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java Tue Feb 19 09:35:28 2013
@@ -45,7 +45,7 @@ public class ConsumerAdapter extends Abs
queueAdapter.getName(),
subscription.getSessionModel().getConnectionModel().getRemoteAddressString(),
String.valueOf(subscription.getSessionModel().getChannelId()),
- subscription.getConsumerName()));
+ subscription.getConsumerName()), queueAdapter.getTaskExecutor());
_subscription = subscription;
_queue = queueAdapter;
_statistics = new ConsumerStatistics();
@@ -108,13 +108,6 @@ public class ConsumerAdapter extends Abs
}
@Override
- public Object setAttribute(final String name, final Object expected, final Object desired)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- return super.setAttribute(name, expected, desired); //TODO
- }
-
- @Override
public Object getAttribute(final String name)
{
if(ID.equals(name))
@@ -222,4 +215,11 @@ public class ConsumerAdapter extends Abs
return null; // TODO - Implement
}
}
+
+ @Override
+ protected boolean setState(State currentState, State desiredState)
+ {
+ // TODO: add state management; until then every requested transition is answered with false
+ return false;
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java?rev=1447646&r1=1447645&r2=1447646&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java Tue Feb 19 09:35:28 2013
@@ -35,14 +35,13 @@ import org.apache.qpid.server.binding.Bi
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Publisher;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
-import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apache.qpid.server.exchange.Exchange.BindingListener
@@ -57,7 +56,7 @@ final class ExchangeAdapter extends Abst
public ExchangeAdapter(final VirtualHostAdapter virtualHostAdapter,
final org.apache.qpid.server.exchange.Exchange exchange)
{
- super(exchange.getId());
+ super(exchange.getId(), virtualHostAdapter.getTaskExecutor());
_statistics = new ExchangeStatistics();
_vhost = virtualHostAdapter;
_exchange = exchange;
@@ -113,8 +112,8 @@ final class ExchangeAdapter extends Abst
throws AccessControlException, IllegalStateException
{
attributes = new HashMap<String, Object>(attributes);
- String bindingKey = getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, "");
- Map<String, Object> bindingArgs = getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.EMPTY_MAP);
+ String bindingKey = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, "");
+ Map<String, Object> bindingArgs = MapValueConverter.getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.<String,Object>emptyMap());
attributes.remove(org.apache.qpid.server.model.Binding.NAME);
attributes.remove(org.apache.qpid.server.model.Binding.ARGUMENTS);
@@ -257,7 +256,7 @@ final class ExchangeAdapter extends Abst
}
@Override
- public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
{
if(childClass == org.apache.qpid.server.model.Binding.class)
{
@@ -369,28 +368,20 @@ final class ExchangeAdapter extends Abst
}
@Override
- public Object setAttribute(String name, Object expected, Object desired)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- return super.setAttribute(name, expected, desired); //TODO - Implement
- }
-
- @Override
public Collection<String> getAttributeNames()
{
return AVAILABLE_ATTRIBUTES;
}
@Override
- public State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException,
- AccessControlException
+ protected boolean setState(State currentState, State desiredState) // only the DELETED transition is handled; currentState is not consulted
{
if (desiredState == State.DELETED)
{
delete();
- return State.DELETED;
+ return true; // reported after delete() has been called
}
- return super.setDesiredState(currentState, desiredState);
+ return false; // every other desired state is answered with false
}
private class ExchangeStatistics implements Statistics
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java?rev=1447646&r1=1447645&r2=1447646&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java Tue Feb 19 09:35:28 2013
@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -37,33 +38,27 @@ import org.apache.qpid.server.model.Life
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.group.GroupManager;
+import org.apache.qpid.server.security.SecurityManager;
public class GroupProviderAdapter extends AbstractAdapter implements
GroupProvider
{
private final GroupManager _groupManager;
-
- protected GroupProviderAdapter(GroupManager groupManager)
+ private final Broker _broker;
+ public GroupProviderAdapter(UUID id, GroupManager groupManager, Broker broker)
{
- super(UUIDGenerator.generateRandomUUID());
+ super(id, broker.getTaskExecutor());
if (groupManager == null)
{
throw new IllegalArgumentException("GroupManager must not be null");
}
_groupManager = groupManager;
- }
-
- public static GroupProviderAdapter createGroupProviderAdapter(
- BrokerAdapter brokerAdapter, GroupManager groupManager)
- {
- final GroupProviderAdapter groupProviderAdapter = new GroupProviderAdapter(
- groupManager);
- groupProviderAdapter.addParent(Broker.class, brokerAdapter);
- return groupProviderAdapter;
+ _broker = broker;
+ addParent(Broker.class, broker);
}
@Override
@@ -180,7 +175,7 @@ public class GroupProviderAdapter extend
}
@Override
- public <C extends ConfiguredObject> C createChild(Class<C> childClass,
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass,
Map<String, Object> attributes, ConfiguredObject... otherParents)
{
if (childClass == Group.class)
@@ -190,7 +185,7 @@ public class GroupProviderAdapter extend
if (getSecurityManager().authoriseGroupOperation(Operation.CREATE, groupName))
{
_groupManager.createGroup(groupName);
- return (C) new GroupAdapter(groupName);
+ return (C) new GroupAdapter(groupName, getTaskExecutor());
}
else
{
@@ -214,7 +209,7 @@ public class GroupProviderAdapter extend
Collection<Group> principals = new ArrayList<Group>(groups.size());
for (Principal group : groups)
{
- principals.add(new GroupAdapter(group.getName()));
+ principals.add(new GroupAdapter(group.getName(), getTaskExecutor()));
}
return (Collection<C>) Collections
.unmodifiableCollection(principals);
@@ -225,19 +220,18 @@ public class GroupProviderAdapter extend
}
}
- private org.apache.qpid.server.security.SecurityManager getSecurityManager()
+ private SecurityManager getSecurityManager() // security manager used for the group operation permission checks in this class
{
- return ApplicationRegistry.getInstance().getSecurityManager();
+ return _broker.getSecurityManager(); // obtained from the parent broker rather than the ApplicationRegistry singleton
}
private class GroupAdapter extends AbstractAdapter implements Group
{
private final String _group;
- public GroupAdapter(String group)
+ public GroupAdapter(String group, TaskExecutor taskExecutor)
{
- super(UUIDGenerator.generateGroupUUID(
- GroupProviderAdapter.this.getName(), group));
+ super(UUIDGenerator.generateGroupUUID(GroupProviderAdapter.this.getName(), group), taskExecutor);
_group = group;
}
@@ -319,7 +313,7 @@ public class GroupProviderAdapter extend
Collection<GroupMember> members = new ArrayList<GroupMember>();
for (Principal principal : usersInGroup)
{
- members.add(new GroupMemberAdapter(principal.getName()));
+ members.add(new GroupMemberAdapter(principal.getName(), getTaskExecutor()));
}
return (Collection<C>) Collections
.unmodifiableCollection(members);
@@ -332,7 +326,7 @@ public class GroupProviderAdapter extend
}
@Override
- public <C extends ConfiguredObject> C createChild(Class<C> childClass,
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass,
Map<String, Object> attributes,
ConfiguredObject... otherParents)
{
@@ -343,7 +337,7 @@ public class GroupProviderAdapter extend
if (getSecurityManager().authoriseGroupOperation(Operation.UPDATE, _group))
{
_groupManager.addUserToGroup(memberName, _group);
- return (C) new GroupMemberAdapter(memberName);
+ return (C) new GroupMemberAdapter(memberName, getTaskExecutor());
}
else
{
@@ -378,15 +372,7 @@ public class GroupProviderAdapter extend
}
@Override
- public Object setAttribute(String name, Object expected, Object desired)
- throws IllegalStateException, AccessControlException,
- IllegalArgumentException
- {
- return super.setAttribute(name, expected, desired);
- }
-
- @Override
- public State setDesiredState(State currentState, State desiredState)
+ protected boolean setState(State currentState, State desiredState)
throws IllegalStateTransitionException, AccessControlException
{
if (desiredState == State.DELETED)
@@ -394,16 +380,15 @@ public class GroupProviderAdapter extend
if (getSecurityManager().authoriseGroupOperation(Operation.DELETE, _group))
{
_groupManager.removeGroup(_group);
- return State.DELETED;
+ return true;
}
else
{
- throw new AccessControlException("Do not have permission" +
- " to delete group");
+ throw new AccessControlException("Do not have permission to delete group");
}
}
- return super.setDesiredState(currentState, desiredState);
+ return false;
}
private class GroupMemberAdapter extends AbstractAdapter implements
@@ -411,12 +396,9 @@ public class GroupProviderAdapter extend
{
private String _memberName;
- public GroupMemberAdapter(String memberName)
+ public GroupMemberAdapter(String memberName, TaskExecutor taskExecutor)
{
- super(UUIDGenerator
- .generateGroupMemberUUID(
- GroupProviderAdapter.this.getName(), _group,
- memberName));
+ super(UUIDGenerator.generateGroupMemberUUID(GroupProviderAdapter.this.getName(), _group, memberName), taskExecutor);
_memberName = memberName;
}
@@ -522,7 +504,7 @@ public class GroupProviderAdapter extend
}
@Override
- public State setDesiredState(State currentState, State desiredState)
+ protected boolean setState(State currentState, State desiredState)
throws IllegalStateTransitionException,
AccessControlException
{
@@ -531,18 +513,38 @@ public class GroupProviderAdapter extend
if (getSecurityManager().authoriseGroupOperation(Operation.UPDATE, _group))
{
_groupManager.removeUserFromGroup(_memberName, _group);
- return State.DELETED;
+ return true;
}
else
{
- throw new AccessControlException("Do not have permission" +
- " to remove group member");
+ throw new AccessControlException("Do not have permission to remove group member");
}
}
-
- return super.setDesiredState(currentState, desiredState);
+ return false;
}
}
}
+
+ @Override
+ protected boolean setState(State currentState, State desiredState)
+ {
+ if (desiredState == State.ACTIVE)
+ {
+ return true;
+ }
+ else if (desiredState == State.STOPPED)
+ {
+ return true;
+ }
+ // TODO: the DELETED state is ignored for now.
+ // If group providers ever need to be deleted, AuthenticationProvider must become a change listener of this
+ // object so that it can remove the deleted group provider from its group provider list.
+ return false;
+ }
+
+ public Set<Principal> getGroupPrincipalsForUser(String username) // straight delegation to the wrapped GroupManager
+ {
+ return _groupManager.getGroupPrincipalsForUser(username);
+ }
}
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java (from r1447519, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AnonymousAuthenticationManagerFactory.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AnonymousAuthenticationManagerFactory.java&r1=1447519&r2=1447646&rev=1447646&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AnonymousAuthenticationManagerFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java Tue Feb 19 09:35:28 2013
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,22 +18,31 @@
* under the License.
*
*/
-package org.apache.qpid.server.security.auth.manager;
+package org.apache.qpid.server.model.adapter;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
-import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.server.plugin.AuthenticationManagerFactory;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.KeyStore;
-public class AnonymousAuthenticationManagerFactory implements AuthenticationManagerFactory
+public class KeyStoreAdapter extends AbstractKeyStoreAdapter implements KeyStore // KeyStore model object; everything except the certificate alias is handled by AbstractKeyStoreAdapter
{
- @Override
- public AuthenticationManager createInstance(Configuration configuration)
+
+ public KeyStoreAdapter(UUID id, Broker broker, Map<String, Object> attributes) // NOTE(review): attributes is dereferenced below, so it must not be null
{
- if (configuration.subset("anonymous-auth-manager").isEmpty())
+ super(id, broker, attributes);
+ if (attributes.get(CERTIFICATE_ALIAS) != null) // the alias is optional and only applied when supplied
{
- return null;
+ changeAttribute(CERTIFICATE_ALIAS, null, attributes.get(CERTIFICATE_ALIAS)); // presumably (name, expected, desired), i.e. set from "unset" to the supplied alias - TODO confirm
}
+ }
- return new AnonymousAuthenticationManager();
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ return AVAILABLE_ATTRIBUTES; // returns the AVAILABLE_ATTRIBUTES constant as-is
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java?rev=1447646&r1=1447645&r2=1447646&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java Tue Feb 19 09:35:28 2013
@@ -21,7 +21,16 @@
package org.apache.qpid.server.model.adapter;
+import java.security.AccessControlException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -30,119 +39,82 @@ import org.apache.qpid.server.model.Prot
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostAlias;
-import org.apache.qpid.server.protocol.AmqpProtocolVersion;
-import org.apache.qpid.server.transport.QpidAcceptor;
-
-import java.net.InetSocketAddress;
-import java.security.AccessControlException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
public class PortAdapter extends AbstractAdapter implements Port
{
- private final BrokerAdapter _broker;
- private final QpidAcceptor _acceptor;
- private final InetSocketAddress _address;
- private final Collection<Protocol> _protocols;
-
- public PortAdapter(BrokerAdapter brokerAdapter, QpidAcceptor acceptor, InetSocketAddress address)
- {
- super(UUIDGenerator.generateRandomUUID());
- _broker = brokerAdapter;
- _acceptor = acceptor;
- _address = address;
-
- List<Protocol> protocols = new ArrayList<Protocol>();
-
- for(AmqpProtocolVersion pv : _acceptor.getSupported())
- {
- switch(pv)
- {
- case v0_8:
- protocols.add(Protocol.AMQP_0_8);
- break;
- case v0_9:
- protocols.add(Protocol.AMQP_0_9);
- break;
- case v0_9_1:
- protocols.add(Protocol.AMQP_0_9_1);
- break;
- case v0_10:
- protocols.add(Protocol.AMQP_0_10);
- break;
- case v1_0_0:
- protocols.add(Protocol.AMQP_1_0);
- break;
- }
- }
- _protocols = Collections.unmodifiableCollection(protocols);
+ private final Broker _broker;
+ private AuthenticationProvider _authenticationProvider;
+
+ /*
+ * TODO register PortAcceptor as a listener. To support multiple
+ * protocols on the same port we need to introduce a special entity like
+ * PortAcceptor which will be responsible for port binding/unbinding
+ */
+ public PortAdapter(UUID id, Broker broker, Map<String, Object> attributes, Map<String, Object> defaults, TaskExecutor taskExecutor)
+ {
+ super(id, defaults, attributes, taskExecutor);
+ _broker = broker;
+
+ addParent(Broker.class, broker);
}
@Override
public String getBindingAddress()
{
- return _address.getHostName();
+ return (String)getAttribute(BINDING_ADDRESS);
}
@Override
public int getPort()
{
- return _address.getPort();
+ return (Integer)getAttribute(PORT); // NOTE(review): auto-unboxing throws NullPointerException if the PORT attribute is unset (ClassCastException if it is not an Integer) - confirm it is always populated
}
+ @SuppressWarnings("unchecked")
@Override
public Collection<Transport> getTransports()
{
- switch (_acceptor.getTransport())
- {
- case TCP:
- return Collections.singleton(Transport.TCP);
- case SSL:
- return Collections.singleton(Transport.SSL);
- }
-
- return null; // TODO - Implement
+ return (Collection<Transport>)getAttribute(TRANSPORTS);
}
@Override
public void addTransport(Transport transport)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- throw new IllegalStateException(); // TODO - Implement
+ throw new IllegalStateException();
}
@Override
public Transport removeTransport(Transport transport)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- throw new IllegalStateException(); // TODO - Implement
+ throw new IllegalStateException();
}
+ @SuppressWarnings("unchecked")
@Override
public Collection<Protocol> getProtocols()
{
- return _protocols;
+ return (Collection<Protocol>)getAttribute(PROTOCOLS);
}
@Override
public void addProtocol(Protocol protocol)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- throw new IllegalStateException(); // TODO - Implement
+ throw new IllegalStateException();
}
@Override
public Protocol removeProtocol(Protocol protocol)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- throw new IllegalStateException(); // TODO - Implement
+ throw new IllegalStateException();
}
@Override
@@ -165,19 +137,19 @@ public class PortAdapter extends Abstrac
@Override
public Collection<Connection> getConnections()
{
- return null; // TODO - Implement
+ return Collections.<Connection>emptyList(); // TODO - Implement: connections are not tracked per port yet, so report none rather than returning null
}
@Override
public String getName()
{
- return getBindingAddress() + ":" + getPort(); // TODO - Implement
+ return (String)getAttribute(NAME);
}
@Override
public String setName(String currentName, String desiredName) throws IllegalStateException, AccessControlException
{
- throw new IllegalStateException(); // TODO - Implement
+ throw new IllegalStateException();
}
@Override
@@ -189,7 +161,7 @@ public class PortAdapter extends Abstrac
@Override
public boolean isDurable()
{
- return false; // TODO - Implement
+ return false;
}
@Override
@@ -209,20 +181,20 @@ public class PortAdapter extends Abstrac
public LifetimePolicy setLifetimePolicy(LifetimePolicy expected, LifetimePolicy desired)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- throw new IllegalStateException(); // TODO - Implement
+ throw new IllegalStateException();
}
@Override
public long getTimeToLive()
{
- return 0; // TODO - Implement
+ return 0;
}
@Override
public long setTimeToLive(long expected, long desired)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- throw new IllegalStateException(); // TODO - Implement
+ throw new IllegalStateException();
}
@Override
@@ -257,10 +229,6 @@ public class PortAdapter extends Abstrac
{
return getId();
}
- else if(NAME.equals(name))
- {
- return getName();
- }
else if(STATE.equals(name))
{
return getActualState();
@@ -285,36 +253,54 @@ public class PortAdapter extends Abstrac
{
}
- else if(BINDING_ADDRESS.equals(name))
- {
- return getBindingAddress();
- }
- else if(PORT.equals(name))
+ return super.getAttribute(name);
+ }
+
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ return AVAILABLE_ATTRIBUTES;
+ }
+
+ @Override
+ public boolean setState(State currentState, State desiredState) // currentState is not consulted; only desiredState drives the outcome
+ {
+ if (desiredState == State.DELETED) // NOTE(review): accepted without calling onStop() - confirm a deleted port is unbound elsewhere
{
- return getPort();
+ return true;
}
- else if(PROTOCOLS.equals(name))
+ else if (desiredState == State.ACTIVE)
{
- return getProtocols();
+ onActivate(); // hook invoked before the transition is reported
+ return true;
}
- else if(TRANSPORTS.equals(name))
+ else if (desiredState == State.STOPPED)
{
- return getTransports();
+ onStop(); // hook invoked before the transition is reported
+ return true;
}
+ return false; // any other desired state is answered with false
+ }
- return super.getAttribute(name); //TODO - Implement
+ protected void onActivate()
+ {
+ // no-op: expected to be overridden by subclass
}
- @Override
- public Collection<String> getAttributeNames()
+ protected void onStop()
{
- return AVAILABLE_ATTRIBUTES;
+ // no-op: expected to be overridden by subclass
}
@Override
- public Object setAttribute(String name, Object expected, Object desired)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
+ public AuthenticationProvider getAuthenticationProvider()
+ {
+ return _authenticationProvider;
+ }
+
+ public void setAuthenticationProvider(AuthenticationProvider authenticationProvider)
{
- return super.setAttribute(name, expected, desired); //TODO - Implement
+ _authenticationProvider = authenticationProvider;
}
+
}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java?rev=1447646&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java Tue Feb 19 09:35:28 2013
@@ -0,0 +1,215 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.adapter;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Protocol.ProtocolType;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.util.MapValueConverter;
+
+/**
+ * Builds {@link Port} model objects from a map of raw configuration attributes.
+ *
+ * Attribute values are first converted to their typed form, then either an
+ * {@link AmqpPortAdapter} (for AMQP protocols, or when no protocol is given) or a
+ * plain {@link PortAdapter} is created together with a map of default values.
+ */
+public class PortFactory
+{
+    public static final int DEFAULT_AMQP_SEND_BUFFER_SIZE = 262144;
+    public static final int DEFAULT_AMQP_RECEIVE_BUFFER_SIZE = 262144;
+    public static final boolean DEFAULT_AMQP_NEED_CLIENT_AUTH = false;
+    public static final boolean DEFAULT_AMQP_WANT_CLIENT_AUTH = false;
+    public static final boolean DEFAULT_AMQP_TCP_NO_DELAY = true;
+    public static final String DEFAULT_AMQP_BINDING = "*";
+    public static final Transport DEFAULT_TRANSPORT = Transport.TCP;
+
+    /** Unmodifiable view of the protocols applied to AMQP ports that do not specify any. */
+    private final Collection<Protocol> _defaultProtocols;
+
+    /**
+     * Computes the default AMQP protocol set: all AMQP versions listed below, minus the
+     * protocols named in the "excludes" system property, plus the protocols named in the
+     * "includes" system property (includes are applied last and therefore win).
+     *
+     * @throws IllegalArgumentException if either property names an unknown protocol
+     */
+    public PortFactory()
+    {
+        Set<Protocol> defaultProtocols = EnumSet.of(Protocol.AMQP_0_8, Protocol.AMQP_0_9, Protocol.AMQP_0_9_1,
+                                                    Protocol.AMQP_0_10, Protocol.AMQP_1_0);
+        defaultProtocols.removeAll(readProtocolList(BrokerProperties.PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_EXCLUDES));
+        defaultProtocols.addAll(readProtocolList(BrokerProperties.PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_INCLUDES));
+        _defaultProtocols = Collections.unmodifiableCollection(defaultProtocols);
+    }
+
+    /**
+     * Reads a comma separated list of protocol names from the given system property.
+     *
+     * Surrounding whitespace is stripped from every entry and empty entries are ignored, so
+     * values such as "AMQP_0_8, AMQP_0_9", "AMQP_0_8," or "" do not fail on the separator alone.
+     *
+     * @param propertyName name of the system property to read
+     * @return the protocols named by the property; empty if the property is not set
+     * @throws IllegalArgumentException if an entry is not the name of a {@link Protocol} constant
+     */
+    private static Set<Protocol> readProtocolList(String propertyName)
+    {
+        Set<Protocol> protocols = EnumSet.noneOf(Protocol.class);
+        String propertyValue = System.getProperty(propertyName);
+        if (propertyValue != null)
+        {
+            for (String entry : propertyValue.split(","))
+            {
+                String protocolName = entry.trim();
+                if (protocolName.length() != 0)
+                {
+                    protocols.add(Protocol.valueOf(protocolName));
+                }
+            }
+        }
+        return protocols;
+    }
+
+    /**
+     * Creates a port for the given broker.
+     *
+     * @param id identifier of the new port
+     * @param broker broker the port belongs to; also supplies the task executor
+     * @param objectAttributes raw attribute values; must contain {@link Port#PORT}
+     * @return an {@link AmqpPortAdapter} when the protocols are AMQP or unspecified,
+     *         otherwise a {@link PortAdapter}
+     * @throws IllegalConfigurationException if the port number is missing, if protocols of
+     *         different types are mixed, or if a non AMQP port lists more than one protocol
+     */
+    public Port createPort(UUID id, Broker broker, Map<String, Object> objectAttributes)
+    {
+        Map<String, Object> attributes = retrieveAttributes(objectAttributes);
+
+        final Port port;
+        Map<String, Object> defaults = new HashMap<String, Object>();
+        defaults.put(Port.TRANSPORTS, Collections.singleton(DEFAULT_TRANSPORT));
+        Object portValue = attributes.get(Port.PORT);
+        if (portValue == null)
+        {
+            throw new IllegalConfigurationException("Port attribute is not specified for port: " + attributes);
+        }
+        if (isAmqpProtocol(attributes))
+        {
+            Object binding = attributes.get(Port.BINDING_ADDRESS);
+            if (binding == null)
+            {
+                binding = DEFAULT_AMQP_BINDING;
+                defaults.put(Port.BINDING_ADDRESS, DEFAULT_AMQP_BINDING);
+            }
+            // default name is "<binding>:<port>"
+            defaults.put(Port.NAME, binding + ":" + portValue);
+            defaults.put(Port.PROTOCOLS, _defaultProtocols);
+            defaults.put(Port.TCP_NO_DELAY, DEFAULT_AMQP_TCP_NO_DELAY);
+            defaults.put(Port.WANT_CLIENT_AUTH, DEFAULT_AMQP_WANT_CLIENT_AUTH);
+            defaults.put(Port.NEED_CLIENT_AUTH, DEFAULT_AMQP_NEED_CLIENT_AUTH);
+            defaults.put(Port.RECEIVE_BUFFER_SIZE, DEFAULT_AMQP_RECEIVE_BUFFER_SIZE);
+            defaults.put(Port.SEND_BUFFER_SIZE, DEFAULT_AMQP_SEND_BUFFER_SIZE);
+            port = new AmqpPortAdapter(id, broker, attributes, defaults, broker.getTaskExecutor());
+        }
+        else
+        {
+            // isAmqpProtocol() returned false, so the protocol set is present and not empty here
+            @SuppressWarnings("unchecked")
+            Collection<Protocol> protocols = (Collection<Protocol>)attributes.get(Port.PROTOCOLS);
+            if (protocols.size() > 1)
+            {
+                throw new IllegalConfigurationException("Only one protocol can be used on non AMQP port");
+            }
+            Protocol protocol = protocols.iterator().next();
+            // default name is "<port>-<protocol>"
+            defaults.put(Port.NAME, portValue + "-" + protocol.name());
+            port = new PortAdapter(id, broker, attributes, defaults, broker.getTaskExecutor());
+        }
+        return port;
+    }
+
+    /**
+     * Copies the given attributes and replaces the values of the known port attributes with
+     * their converted, typed form. Attributes that are absent from the input stay absent.
+     *
+     * @param objectAttributes raw attribute values
+     * @return a new map; the input map is not modified
+     */
+    private Map<String, Object> retrieveAttributes(Map<String, Object> objectAttributes)
+    {
+        Map<String, Object> attributes = new HashMap<String, Object>(objectAttributes);
+
+        if (objectAttributes.containsKey(Port.PROTOCOLS))
+        {
+            final Set<Protocol> protocolSet = MapValueConverter.getEnumSetAttribute(Port.PROTOCOLS, objectAttributes, Protocol.class);
+            attributes.put(Port.PROTOCOLS, protocolSet);
+        }
+
+        if (objectAttributes.containsKey(Port.TRANSPORTS))
+        {
+            final Set<Transport> transportSet = MapValueConverter.getEnumSetAttribute(Port.TRANSPORTS, objectAttributes,
+                    Transport.class);
+            attributes.put(Port.TRANSPORTS, transportSet);
+        }
+
+        if (objectAttributes.containsKey(Port.PORT))
+        {
+            Integer port = MapValueConverter.getIntegerAttribute(Port.PORT, objectAttributes);
+            attributes.put(Port.PORT, port);
+        }
+
+        if (objectAttributes.containsKey(Port.TCP_NO_DELAY))
+        {
+            boolean tcpNoDelay = MapValueConverter.getBooleanAttribute(Port.TCP_NO_DELAY, objectAttributes);
+            attributes.put(Port.TCP_NO_DELAY, tcpNoDelay);
+        }
+
+        if (objectAttributes.containsKey(Port.RECEIVE_BUFFER_SIZE))
+        {
+            int receiveBufferSize = MapValueConverter.getIntegerAttribute(Port.RECEIVE_BUFFER_SIZE, objectAttributes);
+            attributes.put(Port.RECEIVE_BUFFER_SIZE, receiveBufferSize);
+        }
+
+        if (objectAttributes.containsKey(Port.SEND_BUFFER_SIZE))
+        {
+            int sendBufferSize = MapValueConverter.getIntegerAttribute(Port.SEND_BUFFER_SIZE, objectAttributes);
+            attributes.put(Port.SEND_BUFFER_SIZE, sendBufferSize);
+        }
+
+        if (objectAttributes.containsKey(Port.NEED_CLIENT_AUTH))
+        {
+            boolean needClientAuth = MapValueConverter.getBooleanAttribute(Port.NEED_CLIENT_AUTH, objectAttributes);
+            attributes.put(Port.NEED_CLIENT_AUTH, needClientAuth);
+        }
+
+        if (objectAttributes.containsKey(Port.WANT_CLIENT_AUTH))
+        {
+            boolean wantClientAuth = MapValueConverter.getBooleanAttribute(Port.WANT_CLIENT_AUTH, objectAttributes);
+            attributes.put(Port.WANT_CLIENT_AUTH, wantClientAuth);
+        }
+
+        if (objectAttributes.containsKey(Port.BINDING_ADDRESS))
+        {
+            String binding = MapValueConverter.getStringAttribute(Port.BINDING_ADDRESS, objectAttributes);
+            attributes.put(Port.BINDING_ADDRESS, binding);
+        }
+        return attributes;
+    }
+
+    /**
+     * Decides whether the port described by the (already converted) attributes is an AMQP port.
+     *
+     * @param portAttributes converted port attributes
+     * @return true if no protocols are specified or all of them are of type AMQP
+     * @throws IllegalConfigurationException if the protocols are of more than one type
+     */
+    private boolean isAmqpProtocol(Map<String, Object> portAttributes)
+    {
+        @SuppressWarnings("unchecked")
+        Set<Protocol> protocols = (Set<Protocol>) portAttributes.get(Port.PROTOCOLS);
+        if (protocols == null || protocols.isEmpty())
+        {
+            // defaulting to AMQP if protocol is not specified
+            return true;
+        }
+
+        Set<ProtocolType> protocolTypes = new HashSet<ProtocolType>();
+        for (Protocol protocolObject : protocols)
+        {
+            protocolTypes.add(protocolObject.getProtocolType());
+        }
+
+        if (protocolTypes.size() > 1)
+        {
+            throw new IllegalConfigurationException("Found different protocol types '" + protocolTypes
+                    + "' on port configuration: " + portAttributes);
+        }
+
+        return protocolTypes.contains(ProtocolType.AMQP);
+    }
+
+    /**
+     * @return the unmodifiable collection of protocols used for AMQP ports that specify none
+     */
+    public Collection<Protocol> getDefaultProtocols()
+    {
+        return _defaultProtocols;
+    }
+
+}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java?rev=1447646&r1=1447645&r2=1447646&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java Tue Feb 19 09:35:28 2013
@@ -43,6 +43,7 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.util.MapValueConverter;
final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.SubscriptionRegistrationListener, AMQQueue.NotificationListener
{
@@ -79,7 +80,7 @@ final class QueueAdapter extends Abstrac
public QueueAdapter(final VirtualHostAdapter virtualHostAdapter, final AMQQueue queue)
{
- super(queue.getId());
+ super(queue.getId(), virtualHostAdapter.getTaskExecutor());
_vhost = virtualHostAdapter;
addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter);
@@ -206,47 +207,47 @@ final class QueueAdapter extends Abstrac
}
@Override
- public Object setAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ public boolean changeAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
{
try
{
if(ALERT_REPEAT_GAP.equals(name))
{
_queue.setMinimumAlertRepeatGap((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name))
{
_queue.setMaximumMessageAge((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name))
{
_queue.setMaximumMessageSize((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name))
{
_queue.setMaximumQueueDepth((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name))
{
_queue.setMaximumMessageCount((Long)desired);
- return desired;
+ return true;
}
else if(ALTERNATE_EXCHANGE.equals(name))
{
// In future we may want to accept a UUID as an alternative way to identifying the exchange
ExchangeAdapter alternateExchange = (ExchangeAdapter) desired;
_queue.setAlternateExchange(alternateExchange == null ? null : alternateExchange.getExchange());
- return desired;
+ return true;
}
else if(EXCLUSIVE.equals(name))
{
Boolean exclusiveFlag = (Boolean) desired;
_queue.setExclusive(exclusiveFlag);
- return desired;
+ return true;
}
else if(MESSAGE_GROUP_KEY.equals(name))
{
@@ -267,7 +268,7 @@ final class QueueAdapter extends Abstrac
else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name))
{
_queue.setMaximumDeliveryCount((Integer)desired);
- return desired;
+ return true;
}
else if(NO_LOCAL.equals(name))
{
@@ -280,12 +281,12 @@ final class QueueAdapter extends Abstrac
else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name))
{
_queue.setCapacity((Long)desired);
- return desired;
+ return true;
}
else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name))
{
_queue.setFlowResumeCapacity((Long)desired);
- return desired;
+ return true;
}
else if(QUEUE_FLOW_STOPPED.equals(name))
{
@@ -302,10 +303,10 @@ final class QueueAdapter extends Abstrac
else if (DESCRIPTION.equals(name))
{
_queue.setDescription((String) desired);
- return desired;
+ return true;
}
- return super.setAttribute(name, expected, desired);
+ return super.changeAttribute(name, expected, desired);
}
finally
{
@@ -496,8 +497,8 @@ final class QueueAdapter extends Abstrac
throws AccessControlException, IllegalStateException
{
attributes = new HashMap<String, Object>(attributes);
- String bindingKey = getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, "");
- Map<String, Object> bindingArgs = getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.EMPTY_MAP);
+ String bindingKey = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Binding.NAME, attributes, "");
+ Map<String, Object> bindingArgs = MapValueConverter.getMapAttribute(org.apache.qpid.server.model.Binding.ARGUMENTS, attributes, Collections.<String,Object>emptyMap());
attributes.remove(org.apache.qpid.server.model.Binding.NAME);
attributes.remove(org.apache.qpid.server.model.Binding.ARGUMENTS);
@@ -509,7 +510,7 @@ final class QueueAdapter extends Abstrac
@Override
- public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
{
if(childClass == org.apache.qpid.server.model.Binding.class)
{
@@ -713,15 +714,15 @@ final class QueueAdapter extends Abstrac
}
@Override
- public State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException,
+ protected boolean setState(State currentState, State desiredState) throws IllegalStateTransitionException,
AccessControlException
{
if (desiredState == State.DELETED)
{
delete();
- return State.DELETED;
+ return true;
}
- return super.setDesiredState(currentState, desiredState);
+ return false;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org