You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2012/03/13 16:56:54 UTC
svn commit: r1300204 [7/8] - in
/qpid/branches/java-config-and-management/qpid/java: ./
broker-plugins/experimental/shutdown/src/main/java/org/apache/qpid/shutdown/
broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker...
Added: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1300204&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Tue Mar 13 15:56:45 2012
@@ -0,0 +1,445 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.adapter;
+
+import java.security.AccessControlException;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.WeakHashMap;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostAlias;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.security.SecurityManager;
+
+/**
+ * Management-model view of a broker virtual host.
+ * <p>
+ * Wraps an {@code org.apache.qpid.server.virtualhost.VirtualHost} and keeps one adapter object per queue,
+ * exchange and connection found in its registries. The adapter maps are kept up to date through the
+ * registry change-listener callbacks implemented by this class. Each map is guarded by synchronizing on
+ * the map itself; child notifications ({@code childAdded} / {@code childRemoved}) are issued after the
+ * map lock has been released.
+ */
+final class VirtualHostAdapter extends AbstractAdapter
+        implements VirtualHost,
+                   ExchangeRegistry.RegistryChangeListener,
+                   QueueRegistry.RegistryChangeListener,
+                   IConnectionRegistry.RegistryChangeListener
+{
+
+    /** The broker-side virtual host this adapter exposes. */
+    private final org.apache.qpid.server.virtualhost.VirtualHost _virtualHost;
+
+    /** Connection adapters keyed by the underlying connection; guarded by itself. */
+    private final Map<AMQConnectionModel, ConnectionAdapter> _connectionAdapters =
+            new HashMap<AMQConnectionModel, ConnectionAdapter>();
+
+    /** Queue adapters keyed by the underlying queue; guarded by itself. */
+    private final Map<AMQQueue, QueueAdapter> _queueAdapters =
+            new HashMap<AMQQueue, QueueAdapter>();
+
+    /** Exchange adapters keyed by the underlying exchange; guarded by itself. */
+    private final Map<org.apache.qpid.server.exchange.Exchange, ExchangeAdapter> _exchangeAdapters =
+            new HashMap<org.apache.qpid.server.exchange.Exchange, ExchangeAdapter>();
+
+    /** Subscription adapters, weakly keyed by the underlying subscription; guarded by itself. */
+    private final Map<org.apache.qpid.server.subscription.Subscription, SubscriptionAdapter> _allSubscriptions =
+            new WeakHashMap<org.apache.qpid.server.subscription.Subscription, SubscriptionAdapter>();
+
+    private final StatisticsAdapter _statistics;
+
+
+    /**
+     * Creates the adapter and builds adapters for everything currently in the virtual host's registries.
+     * <p>
+     * For each registry the listener is added before the existing entries are copied. The populate
+     * methods and the listener callbacks both skip objects that already have an adapter, so an object
+     * registered between the two steps is not adapted twice.
+     *
+     * @param virtualHost the broker virtual host to wrap
+     */
+    public VirtualHostAdapter(final org.apache.qpid.server.virtualhost.VirtualHost virtualHost)
+    {
+        _virtualHost = virtualHost;
+        _statistics = new StatisticsAdapter(virtualHost);
+        virtualHost.getQueueRegistry().addRegistryChangeListener(this);
+        populateQueues();
+        virtualHost.getExchangeRegistry().addRegistryChangeListener(this);
+        populateExchanges();
+        virtualHost.getConnectionRegistry().addRegistryChangeListener(this);
+        populateConnections();
+    }
+
+
+    /** Adds an adapter for every exchange in the exchange registry that does not have one yet. */
+    private void populateExchanges()
+    {
+        Collection<org.apache.qpid.server.exchange.Exchange> actualExchanges =
+                _virtualHost.getExchangeRegistry().getExchanges();
+
+        synchronized (_exchangeAdapters)
+        {
+            for(org.apache.qpid.server.exchange.Exchange exchange : actualExchanges)
+            {
+                if(!_exchangeAdapters.containsKey(exchange))
+                {
+                    _exchangeAdapters.put(exchange, new ExchangeAdapter(this, exchange));
+                }
+            }
+        }
+    }
+
+
+    /** Adds an adapter for every queue in the queue registry that does not have one yet. */
+    private void populateQueues()
+    {
+        Collection<AMQQueue> actualQueues = _virtualHost.getQueueRegistry().getQueues();
+
+        synchronized (_queueAdapters)
+        {
+            for(AMQQueue queue : actualQueues)
+            {
+                if(!_queueAdapters.containsKey(queue))
+                {
+                    _queueAdapters.put(queue, new QueueAdapter(this, queue));
+                }
+            }
+        }
+    }
+
+    /** Adds an adapter for every connection in the connection registry that does not have one yet. */
+    private void populateConnections()
+    {
+        List<AMQConnectionModel> actualConnections = _virtualHost.getConnectionRegistry().getConnections();
+
+        synchronized (_connectionAdapters)
+        {
+            for(AMQConnectionModel conn : actualConnections)
+            {
+                if(!_connectionAdapters.containsKey(conn))
+                {
+                    _connectionAdapters.put(conn, new ConnectionAdapter(conn));
+                }
+            }
+        }
+    }
+
+    /** Not yet implemented; always returns {@code null}. */
+    public String getReplicationGroupName()
+    {
+        return null; //TODO
+    }
+
+    /**
+     * Aliases are not yet implemented.
+     *
+     * @return an empty collection (never {@code null}), so that callers can iterate the result safely
+     */
+    public Collection<VirtualHostAlias> getAliases()
+    {
+        return new ArrayList<VirtualHostAlias>(); //TODO
+    }
+
+    /** @return a snapshot copy of the current connection adapters */
+    public Collection<Connection> getConnections()
+    {
+        synchronized (_connectionAdapters)
+        {
+            return new ArrayList<Connection>(_connectionAdapters.values());
+        }
+    }
+
+    /** @return a snapshot copy of the current queue adapters */
+    public Collection<Queue> getQueues()
+    {
+        synchronized (_queueAdapters)
+        {
+            return new ArrayList<Queue>(_queueAdapters.values());
+        }
+    }
+
+    /** @return a snapshot copy of the current exchange adapters */
+    public Collection<Exchange> getExchanges()
+    {
+        synchronized (_exchangeAdapters)
+        {
+            return new ArrayList<Exchange>(_exchangeAdapters.values());
+        }
+    }
+
+    /**
+     * Creates an exchange through the virtual host's exchange factory, registers it and, when durable,
+     * records it in the durable configuration store.
+     * <p>
+     * {@code initialState}, {@code ttl} and {@code attributes} are currently not used by this method.
+     *
+     * @param lifetime the exchange is created as auto-delete iff this is {@link LifetimePolicy#AUTO_DELETE}
+     * @return the adapter found for the new exchange in the adapter map after registration; this is
+     *         {@code null} if the registry listener callback has not added one
+     * @throws IllegalArgumentException wrapping any {@link AMQException} raised during creation
+     */
+    public Exchange createExchange(final String name,
+                                   final State initialState,
+                                   final boolean durable,
+                                   final LifetimePolicy lifetime,
+                                   final long ttl,
+                                   final String type,
+                                   final Map<String, Object> attributes)
+            throws AccessControlException, IllegalArgumentException
+    {
+        try
+        {
+            org.apache.qpid.server.exchange.Exchange exchange =
+                    _virtualHost.getExchangeFactory().createExchange(name, type, durable,
+                                                                     lifetime == LifetimePolicy.AUTO_DELETE);
+            _virtualHost.getExchangeRegistry().registerExchange(exchange);
+            if(durable)
+            {
+                _virtualHost.getDurableConfigurationStore().createExchange(exchange);
+            }
+
+            synchronized (_exchangeAdapters)
+            {
+                return _exchangeAdapters.get(exchange);
+            }
+        }
+        catch(AMQException e)
+        {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /**
+     * Creates a queue through {@link AMQQueueFactory}, registers it and, when durable, records it in the
+     * durable configuration store.
+     * <p>
+     * {@code initialState} and {@code ttl} are currently not used by this method.
+     *
+     * @param lifetime the queue is created as auto-delete iff this is {@link LifetimePolicy#AUTO_DELETE}
+     * @return the adapter found for the new queue in the adapter map after registration; this is
+     *         {@code null} if the registry listener callback has not added one
+     * @throws IllegalArgumentException wrapping any {@link AMQException} raised during creation
+     */
+    public Queue createQueue(final String name,
+                             final State initialState,
+                             final boolean durable,
+                             final LifetimePolicy lifetime,
+                             final long ttl,
+                             final Map<String, Object> attributes)
+            throws AccessControlException, IllegalArgumentException
+    {
+        // NOTE(review): 'exclusive' is hard-coded to false, and the owner is taken from the calling
+        // principal only when the queue is NOT exclusive. That looks inverted (an owner normally
+        // accompanies an exclusive queue) - confirm the intent before changing it; behaviour is
+        // deliberately left as it was.
+        final boolean exclusive = false;
+        final String owner = exclusive ? null : currentPrincipalName();
+
+        try
+        {
+            AMQQueue queue =
+                    AMQQueueFactory.createAMQQueueImpl(name, durable, owner, lifetime == LifetimePolicy.AUTO_DELETE,
+                                                       exclusive,
+                                                       _virtualHost, attributes);
+            _virtualHost.getQueueRegistry().registerQueue(queue);
+            if(durable)
+            {
+                _virtualHost.getDurableConfigurationStore().createQueue(queue, FieldTable.convertToFieldTable(attributes));
+            }
+            synchronized (_queueAdapters)
+            {
+                return _queueAdapters.get(queue);
+            }
+        }
+        catch(AMQException e)
+        {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /**
+     * Returns the name of the first principal of the subject obtained from
+     * {@code SecurityManager.getThreadSubject()}.
+     *
+     * @return the principal name, or {@code null} when there is no subject or it has no principals
+     */
+    private static String currentPrincipalName()
+    {
+        // Guard against a missing subject: dereferencing it unconditionally would fail with a
+        // NullPointerException instead of simply creating an un-owned queue.
+        if(SecurityManager.getThreadSubject() == null)
+        {
+            return null;
+        }
+        final Set<Principal> principals = SecurityManager.getThreadSubject().getPrincipals();
+        if(principals == null || principals.isEmpty())
+        {
+            return null;
+        }
+        return principals.iterator().next().getName();
+    }
+
+    /** @return the name of the wrapped virtual host */
+    public String getName()
+    {
+        return _virtualHost.getName();
+    }
+
+    /** Renaming is not supported; always throws {@link IllegalStateException}. */
+    public String setName(final String currentName, final String desiredName)
+            throws IllegalStateException, AccessControlException
+    {
+        throw new IllegalStateException();
+    }
+
+    /** @return the desired state; no separate actual state is tracked here */
+    public State getActualState()
+    {
+        return getDesiredState();
+    }
+
+    /** @return always {@code true} */
+    public boolean isDurable()
+    {
+        return true;
+    }
+
+    /** Durability cannot be changed; always throws {@link IllegalStateException}. */
+    public void setDurable(final boolean durable)
+            throws IllegalStateException, AccessControlException, IllegalArgumentException
+    {
+        throw new IllegalStateException();
+    }
+
+    /** @return always {@link LifetimePolicy#PERMANENT} */
+    public LifetimePolicy getLifetimePolicy()
+    {
+        return LifetimePolicy.PERMANENT;
+    }
+
+    /** The lifetime policy cannot be changed; always throws {@link IllegalStateException}. */
+    public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired)
+            throws IllegalStateException, AccessControlException, IllegalArgumentException
+    {
+        throw new IllegalStateException();
+    }
+
+    /** @return always {@code 0} */
+    public long getTimeToLive()
+    {
+        return 0;
+    }
+
+    /** The time-to-live cannot be changed; always throws {@link IllegalStateException}. */
+    public long setTimeToLive(final long expected, final long desired)
+            throws IllegalStateException, AccessControlException, IllegalArgumentException
+    {
+        throw new IllegalStateException();
+    }
+
+    /** @return the statistics adapter created for the wrapped virtual host */
+    public Statistics getStatistics()
+    {
+        return _statistics;
+    }
+
+    /**
+     * Returns the adapter for the given subscription, creating and caching one on first use.
+     *
+     * @param subscription the broker subscription to adapt
+     * @return the cached or newly created adapter
+     */
+    public SubscriptionAdapter getOrCreateAdapter(final org.apache.qpid.server.subscription.Subscription subscription)
+    {
+        synchronized (_allSubscriptions)
+        {
+            SubscriptionAdapter adapter = _allSubscriptions.get(subscription);
+            if(adapter == null)
+            {
+                adapter = new SubscriptionAdapter(subscription);
+                _allSubscriptions.put(subscription, adapter);
+            }
+            return adapter;
+        }
+    }
+
+    /**
+     * Registry callback: creates an adapter for the exchange unless one exists already, then reports
+     * the new child once the map lock has been released.
+     */
+    public void exchangeRegistered(org.apache.qpid.server.exchange.Exchange exchange)
+    {
+        ExchangeAdapter adapter = null;
+        synchronized (_exchangeAdapters)
+        {
+            if(!_exchangeAdapters.containsKey(exchange))
+            {
+                adapter = new ExchangeAdapter(this, exchange);
+                _exchangeAdapters.put(exchange, adapter);
+            }
+        }
+        if(adapter != null)
+        {
+            childAdded(adapter);
+        }
+    }
+
+
+    /** Registry callback: drops the exchange's adapter and, if there was one, reports its removal. */
+    public void exchangeUnregistered(org.apache.qpid.server.exchange.Exchange exchange)
+    {
+        ExchangeAdapter adapter;
+        synchronized (_exchangeAdapters)
+        {
+            adapter = _exchangeAdapters.remove(exchange);
+        }
+
+        if(adapter != null)
+        {
+            childRemoved(adapter);
+        }
+    }
+
+    /**
+     * Registry callback: creates an adapter for the queue unless one exists already, then reports the
+     * new child once the map lock has been released.
+     */
+    public void queueRegistered(AMQQueue queue)
+    {
+        QueueAdapter adapter = null;
+        synchronized (_queueAdapters)
+        {
+            if(!_queueAdapters.containsKey(queue))
+            {
+                adapter = new QueueAdapter(this, queue);
+                _queueAdapters.put(queue, adapter);
+            }
+        }
+        if(adapter != null)
+        {
+            childAdded(adapter);
+        }
+    }
+
+    /** Registry callback: drops the queue's adapter and, if there was one, reports its removal. */
+    public void queueUnregistered(AMQQueue queue)
+    {
+        QueueAdapter adapter;
+        synchronized (_queueAdapters)
+        {
+            adapter = _queueAdapters.remove(queue);
+        }
+
+        if(adapter != null)
+        {
+            childRemoved(adapter);
+        }
+    }
+
+    /**
+     * Registry callback: creates an adapter for the connection unless one exists already, then reports
+     * the new child once the map lock has been released.
+     */
+    public void connectionRegistered(AMQConnectionModel connection)
+    {
+        ConnectionAdapter adapter = null;
+        synchronized (_connectionAdapters)
+        {
+            if(!_connectionAdapters.containsKey(connection))
+            {
+                adapter = new ConnectionAdapter(connection);
+                _connectionAdapters.put(connection, adapter);
+            }
+        }
+        if(adapter != null)
+        {
+            childAdded(adapter);
+        }
+    }
+
+    /** Registry callback: drops the connection's adapter and, if there was one, reports its removal. */
+    public void connectionUnregistered(AMQConnectionModel connection)
+    {
+        ConnectionAdapter adapter;
+        synchronized (_connectionAdapters)
+        {
+            adapter = _connectionAdapters.remove(connection);
+        }
+
+        if(adapter != null)
+        {
+            childRemoved(adapter);
+        }
+    }
+
+    /**
+     * @param queue the broker queue to look up
+     * @return the adapter currently held for the queue, or {@code null} if there is none
+     */
+    QueueAdapter getQueueAdapter(AMQQueue queue)
+    {
+        synchronized (_queueAdapters)
+        {
+            return _queueAdapters.get(queue);
+        }
+    }
+
+    /** Not yet implemented; always throws {@link UnsupportedOperationException}. */
+    public void deleteQueue(Queue queue)
+            throws AccessControlException, IllegalStateException
+    {
+        // TODO
+        throw new UnsupportedOperationException("Not Yet Implemented");
+    }
+}
Added: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/AbstractConfiguredObject.java?rev=1300204&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/AbstractConfiguredObject.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/AbstractConfiguredObject.java Tue Mar 13 15:56:45 2012
@@ -0,0 +1,288 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.impl;
+
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.State;
+
+import java.security.AccessControlException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Base class for the in-memory configured-object implementations.
+ * <p>
+ * Holds the common properties (id, name, desired state, durability, lifetime policy, time-to-live), a
+ * free-form attribute map, a map of parents keyed by parent class, and the registered change listeners.
+ * All mutators, and the accessors of the attribute, parent and listener collections, synchronize on the
+ * object returned by {@link #getLock()}; the simple property getters read their field without locking.
+ * <p>
+ * Listeners are invoked while the lock is held.
+ */
+abstract class AbstractConfiguredObject implements ConfiguredObject
+{
+    public static final Map<String, Object> EMPTY_ATTRIBUTE_MAP =
+            Collections.<String, Object>emptyMap();
+    public static final Map<Class<? extends ConfiguredObject>, ConfiguredObject> EMPTY_PARENT_MAP =
+            Collections.<Class<? extends ConfiguredObject>, ConfiguredObject>emptyMap();
+
+    private UUID _id;
+    private String _name;
+    private State _state;
+    private boolean _isDurable;
+    private LifetimePolicy _lifetimePolicy;
+    private long _timeToLive;
+
+
+    private final Map<String,Object> _attributes = new HashMap<String, Object>();
+    private final Map<Class<? extends ConfiguredObject>, ConfiguredObject> _parents =
+            new HashMap<Class<? extends ConfiguredObject>, ConfiguredObject>();
+
+    private final Collection<ConfigurationChangeListener> _changeListeners =
+            new ArrayList<ConfigurationChangeListener>();
+
+    /**
+     * Initialises the common properties and copies the supplied maps.
+     *
+     * @param attributes initial attributes, copied; {@code null} is treated as empty
+     * @param parents    initial parents keyed by parent class, copied; {@code null} is treated as empty
+     */
+    protected AbstractConfiguredObject(final UUID id,
+                                       final String name,
+                                       final State state,
+                                       final boolean durable,
+                                       final LifetimePolicy lifetimePolicy,
+                                       final long timeToLive,
+                                       final Map<String, Object> attributes,
+                                       final Map<Class<? extends ConfiguredObject>, ConfiguredObject> parents)
+    {
+        _id = id;
+        _name = name;
+        _state = state;
+        _isDurable = durable;
+        _lifetimePolicy = lifetimePolicy;
+        _timeToLive = timeToLive;
+        // putAll(null) throws NullPointerException; a missing map simply means "nothing to copy".
+        if(attributes != null)
+        {
+            _attributes.putAll(attributes);
+        }
+        if(parents != null)
+        {
+            _parents.putAll(parents);
+        }
+    }
+
+    public UUID getId()
+    {
+        return _id;
+    }
+
+    public String getName()
+    {
+        return _name;
+    }
+
+    /**
+     * Conditionally renames the object: the name is changed to {@code desiredName} only if the present
+     * name equals {@code currentName}.
+     *
+     * @return the name in effect after the call
+     * @throws NullPointerException if {@code desiredName} is {@code null}
+     */
+    public String setName(final String currentName, final String desiredName)
+            throws IllegalStateException, AccessControlException
+    {
+        if(desiredName == null)
+        {
+            throw new NullPointerException("The name may not be null");
+        }
+
+        synchronized (getLock())
+        {
+            // Null-safe comparison: the constructor does not reject a null name.
+            if(_name == null ? currentName == null : _name.equals(currentName))
+            {
+                _name = desiredName;
+            }
+            return _name;
+        }
+    }
+
+    public State getDesiredState()
+    {
+        return _state;
+    }
+
+    /**
+     * Conditionally changes the desired state. The change happens only if the present state is
+     * {@code currentState} and differs from {@code desiredState}; every registered listener is then
+     * told about the transition.
+     *
+     * @return the desired state in effect after the call
+     */
+    public State setDesiredState(final State currentState, final State desiredState)
+            throws IllegalStateTransitionException, AccessControlException
+    {
+        synchronized (getLock())
+        {
+            if(_state == currentState && currentState != desiredState)
+            {
+                _state = desiredState;
+                for(ConfigurationChangeListener listener : _changeListeners)
+                {
+                    listener.stateChanged(this, currentState, desiredState);
+                }
+            }
+            return _state;
+        }
+    }
+
+    /**
+     * Registers a listener; adding a listener that is already registered has no effect.
+     *
+     * @throws NullPointerException if {@code listener} is {@code null}
+     */
+    public void addChangeListener(final ConfigurationChangeListener listener)
+    {
+        if(listener == null)
+        {
+            throw new NullPointerException("Cannot add a null listener");
+        }
+        synchronized (getLock())
+        {
+            if(!_changeListeners.contains(listener))
+            {
+                _changeListeners.add(listener);
+            }
+        }
+    }
+
+    /**
+     * @return {@code true} if the listener was registered and has been removed
+     * @throws NullPointerException if {@code listener} is {@code null}
+     */
+    public boolean removeChangeListener(final ConfigurationChangeListener listener)
+    {
+        if(listener == null)
+        {
+            throw new NullPointerException("Cannot remove a null listener");
+        }
+        synchronized (getLock())
+        {
+            return _changeListeners.remove(listener);
+        }
+    }
+
+    public boolean isDurable()
+    {
+        return _isDurable;
+    }
+
+    public void setDurable(final boolean durable)
+            throws IllegalStateException, AccessControlException, IllegalArgumentException
+    {
+        synchronized (getLock())
+        {
+            _isDurable = durable;
+        }
+    }
+
+    public LifetimePolicy getLifetimePolicy()
+    {
+        return _lifetimePolicy;
+    }
+
+    /**
+     * Conditionally changes the lifetime policy: it is set to {@code desired} only if the present
+     * policy equals {@code expected} (two {@code null}s count as equal).
+     *
+     * @return the lifetime policy in effect after the call
+     */
+    public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired)
+            throws IllegalStateException, AccessControlException, IllegalArgumentException
+    {
+        synchronized (getLock())
+        {
+            if((_lifetimePolicy == null && expected == null)
+               || (_lifetimePolicy != null && _lifetimePolicy.equals(expected)))
+            {
+                _lifetimePolicy = desired;
+            }
+
+            return _lifetimePolicy;
+        }
+    }
+
+    public long getTimeToLive()
+    {
+        return _timeToLive;
+    }
+
+    /**
+     * Conditionally changes the time-to-live: it is set to {@code desired} only if the present value
+     * equals {@code expected}.
+     *
+     * @return the time-to-live in effect after the call
+     */
+    public long setTimeToLive(final long expected, final long desired)
+            throws IllegalStateException, AccessControlException, IllegalArgumentException
+    {
+        synchronized (getLock())
+        {
+            if(_timeToLive == expected)
+            {
+                _timeToLive = desired;
+            }
+            return _timeToLive;
+        }
+    }
+
+    /** @return a snapshot copy of the attribute names */
+    public Collection<String> getAttributeNames()
+    {
+        // Use the same lock as getAttribute/setAttribute. Locking the map itself gave no mutual
+        // exclusion with writers and could let the key set change while it was being copied.
+        synchronized (getLock())
+        {
+            return new ArrayList<String>(_attributes.keySet());
+        }
+    }
+
+    public Object getAttribute(final String name)
+    {
+        synchronized (getLock())
+        {
+            return _attributes.get(name);
+        }
+    }
+
+    /**
+     * Conditionally updates an attribute. The value is replaced by {@code desired} when either
+     * <ul>
+     * <li>there is no current value, {@code expected} is {@code null} and {@code desired} is not, or</li>
+     * <li>the current value equals {@code expected} and does not equal {@code desired}.</li>
+     * </ul>
+     *
+     * @return {@code desired} if the update was applied, otherwise the unchanged current value
+     */
+    public Object setAttribute(final String name, final Object expected, final Object desired)
+            throws IllegalStateException, AccessControlException, IllegalArgumentException
+    {
+        synchronized (getLock())
+        {
+            Object currentValue = _attributes.get(name);
+            if((currentValue == null && expected == null && desired != null)
+               || (currentValue != null && currentValue.equals(expected) && !currentValue.equals(desired)))
+            {
+                _attributes.put(name, desired);
+                return desired;
+            }
+            else
+            {
+                return currentValue;
+            }
+        }
+    }
+
+
+    /**
+     * @param clazz the parent category to look up
+     * @return the parent registered under {@code clazz}, or {@code null} if there is none
+     */
+    public <T extends ConfiguredObject> T getParent(final Class<T> clazz)
+    {
+        synchronized (getLock())
+        {
+            final ConfiguredObject parent = _parents.get(clazz);
+            // Checked cast instead of an unchecked (T) cast; the null test keeps a lookup that
+            // finds nothing (including a null key) returning null as before.
+            return parent == null ? null : clazz.cast(parent);
+        }
+    }
+
+    /** Registers (or replaces) the parent stored under {@code clazz}. */
+    protected <T extends ConfiguredObject> void addParent(Class<T> clazz, T parent)
+    {
+        synchronized (getLock())
+        {
+            _parents.put(clazz, parent);
+        }
+    }
+
+    /** Removes the parent stored under {@code clazz}, if any. */
+    protected <T extends ConfiguredObject> void removeParent(Class<T> clazz)
+    {
+        synchronized (getLock())
+        {
+            _parents.remove(clazz);
+        }
+    }
+
+    /** Tells every registered listener that {@code child} has been added to this object. */
+    protected void notifyChildAddedListener(ConfiguredObject child)
+    {
+        // Hold the lock while iterating, exactly as notifyChildRemovedListener does; otherwise a
+        // concurrent add/removeChangeListener can break the iteration. Intrinsic locks are
+        // re-entrant, so callers that already hold the lock are unaffected.
+        synchronized (getLock())
+        {
+            for(ConfigurationChangeListener listener : _changeListeners)
+            {
+                listener.childAdded(this, child);
+            }
+        }
+    }
+
+    /** Tells every registered listener that {@code child} has been removed from this object. */
+    protected void notifyChildRemovedListener(ConfiguredObject child)
+    {
+        synchronized (getLock())
+        {
+            for(ConfigurationChangeListener listener : _changeListeners)
+            {
+                listener.childRemoved(this, child);
+            }
+        }
+    }
+
+    /** @return the monitor that guards this object's mutable state */
+    abstract protected Object getLock();
+}
Added: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/BrokerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/BrokerImpl.java?rev=1300204&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/BrokerImpl.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/BrokerImpl.java Tue Mar 13 15:56:45 2012
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.impl;
+
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.model.VirtualHost;
+
+import java.security.AccessControlException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * In-memory {@link Broker} implementation holding its virtual hosts, ports and authentication
+ * providers. The broker has no parents and uses itself as the lock object.
+ */
+public class BrokerImpl extends AbstractConfiguredObject implements Broker
+{
+    private final Collection<VirtualHost> _virtualHosts = new ArrayList<VirtualHost>();
+    private final Collection<Port> _ports = new ArrayList<Port>();
+    private final Collection<AuthenticationProvider> _authenticationProviders = new ArrayList<AuthenticationProvider>();
+
+    public BrokerImpl(final UUID id,
+                      final String name,
+                      final State state,
+                      final boolean durable,
+                      final LifetimePolicy lifetimePolicy,
+                      final long timeToLive,
+                      final Map<String, Object> attributes)
+    {
+        // Typed empty map from the superclass rather than the raw Collections.EMPTY_MAP, which
+        // needed an unchecked conversion.
+        super(id, name, state, durable, lifetimePolicy, timeToLive, attributes, EMPTY_PARENT_MAP);
+    }
+
+    @Override
+    protected Object getLock()
+    {
+        return this;
+    }
+
+    /** @return a snapshot copy of the virtual hosts */
+    public Collection<VirtualHost> getVirtualHosts()
+    {
+        synchronized (getLock())
+        {
+            return new ArrayList<VirtualHost>(_virtualHosts);
+        }
+    }
+
+    /** @return a snapshot copy of the ports */
+    public Collection<Port> getPorts()
+    {
+        synchronized (getLock())
+        {
+            return new ArrayList<Port>(_ports);
+        }
+    }
+
+    /** @return a snapshot copy of the authentication providers */
+    public Collection<AuthenticationProvider> getAuthenticationProviders()
+    {
+        synchronized (getLock())
+        {
+            return new ArrayList<AuthenticationProvider>(_authenticationProviders);
+        }
+    }
+
+    /** @return the desired state; no separate actual state is tracked here */
+    public State getActualState()
+    {
+        return getDesiredState();
+    }
+
+    /** Not yet implemented; always returns {@code null}. */
+    public Statistics getStatistics()
+    {
+        // TODO
+        return null;
+    }
+
+    /**
+     * Creates a virtual host with a random id, adds it to this broker and notifies the change
+     * listeners.
+     *
+     * @return the newly created virtual host
+     * @throws IllegalArgumentException if {@code name} is {@code null} or a virtual host with that
+     *                                  name already exists
+     */
+    public VirtualHost createVirtualHost(String name, State initialState, boolean durable,
+                                         LifetimePolicy lifetime, long ttl, Map<String, Object> attributes)
+            throws AccessControlException, IllegalArgumentException
+    {
+        // TODO - check name is valid and not reserved
+        // TODO - check permissions
+
+        // A host with a null name would make the duplicate-name scan below throw a
+        // NullPointerException on every later call, so reject it up front.
+        if(name == null)
+        {
+            throw new IllegalArgumentException("The virtual host name may not be null");
+        }
+
+        synchronized (getLock())
+        {
+            for(VirtualHost virtualHost : _virtualHosts)
+            {
+                if(name.equals(virtualHost.getName()))
+                {
+                    throw new IllegalArgumentException("A virtual host with the name '"+name+"' already exists");
+                }
+            }
+            VirtualHostImpl vhost = new VirtualHostImpl(UUID.randomUUID(),
+                                                        name,
+                                                        initialState,
+                                                        durable,
+                                                        lifetime,
+                                                        ttl,
+                                                        attributes,
+                                                        this);
+            _virtualHosts.add(vhost);
+
+            // TODO - create a mapping for each port with "default" authentication provider and alias of the vhost name?
+
+            notifyChildAddedListener(vhost);
+            return vhost;
+        }
+    }
+
+    /**
+     * Removes the given virtual host from this broker and notifies the change listeners.
+     *
+     * @throws IllegalArgumentException if {@code virtualHost} is {@code null} or is not one of this
+     *                                  broker's virtual hosts
+     */
+    public void deleteVirtualHost(VirtualHost virtualHost)
+    {
+        // Previously a null argument failed with a NullPointerException while the "does not
+        // exist" message was being built.
+        if(virtualHost == null)
+        {
+            throw new IllegalArgumentException("The virtual host may not be null");
+        }
+
+        synchronized (getLock())
+        {
+            boolean found = _virtualHosts.remove(virtualHost);
+            if(!found)
+            {
+                throw new IllegalArgumentException("A virtual host with the name '" + virtualHost.getName() + "' does not exist");
+            }
+            notifyChildRemovedListener(virtualHost);
+        }
+    }
+
+}
Added: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java?rev=1300204&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/ExchangeImpl.java Tue Mar 13 15:56:45 2012
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.impl;
+
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Publisher;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.model.VirtualHost;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * In-memory {@link Exchange} implementation. Its only parent is the owning virtual host, whose lock it
+ * shares; the exchange type is kept as an ordinary attribute under {@code EXCHANGE_TYPE}.
+ */
+class ExchangeImpl extends AbstractConfiguredObject implements Exchange
+{
+
+    private final VirtualHostImpl _virtualHost;
+
+    /**
+     * @param exchangeType stored in the attribute map under {@code EXCHANGE_TYPE}
+     * @param attributes   additional attributes; {@code null} is treated as empty
+     * @param parent       the owning virtual host, registered as parent under {@code VirtualHost.class}
+     */
+    ExchangeImpl(final UUID id,
+                 final String name,
+                 final State state,
+                 final boolean durable,
+                 final LifetimePolicy lifetimePolicy,
+                 final long timeToLive,
+                 final String exchangeType,
+                 final Map<String, Object> attributes,
+                 final VirtualHostImpl parent)
+    {
+        // The raw (Map) cast adapts the singleton map to the parent-map type expected by the superclass.
+        super(id, name, state, durable, lifetimePolicy, timeToLive, fixAttributes(attributes, exchangeType),
+              (Map) Collections.singletonMap(VirtualHost.class, parent));
+        _virtualHost = parent;
+    }
+
+    /**
+     * Returns a copy of {@code attributes} with the exchange type added under {@code EXCHANGE_TYPE}
+     * (replacing any value the caller supplied for that key).
+     *
+     * @param attributes caller-supplied attributes; may be {@code null}
+     */
+    private static Map<String, Object> fixAttributes(final Map<String, Object> attributes, final String exchangeType)
+    {
+        Map<String,Object> fixedAttributes = new HashMap<String, Object>();
+        // new HashMap(null) throws NullPointerException; treat a missing map as "no extra attributes".
+        if(attributes != null)
+        {
+            fixedAttributes.putAll(attributes);
+        }
+        fixedAttributes.put(EXCHANGE_TYPE, exchangeType);
+        return fixedAttributes;
+    }
+
+    @Override
+    protected Object getLock()
+    {
+        return _virtualHost.getLock();
+    }
+
+    /** @return the value of the {@code EXCHANGE_TYPE} attribute */
+    public String getExchangeType()
+    {
+        return (String) getAttribute(EXCHANGE_TYPE);
+    }
+
+    /**
+     * Bindings are not yet implemented.
+     *
+     * @return an empty collection (never {@code null}), so that callers can iterate the result safely
+     */
+    public Collection<Binding> getBindings()
+    {
+        return Collections.<Binding>emptyList(); //TODO
+    }
+
+    /**
+     * Publishers are not yet implemented.
+     *
+     * @return an empty collection (never {@code null}), so that callers can iterate the result safely
+     */
+    public Collection<Publisher> getPublishers()
+    {
+        return Collections.<Publisher>emptyList(); //TODO
+    }
+
+    /**
+     * @return this exchange's desired state while the virtual host is {@link State#ACTIVE}; otherwise
+     *         the virtual host's actual state
+     */
+    public State getActualState()
+    {
+        State vhostState = _virtualHost.getActualState();
+        return vhostState == State.ACTIVE ? getDesiredState() : vhostState;
+    }
+
+    /** Not yet implemented; always returns {@code null}. */
+    public Statistics getStatistics()
+    {
+        return null; // TODO
+    }
+}
Added: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/QueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/QueueImpl.java?rev=1300204&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/QueueImpl.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/QueueImpl.java Tue Mar 13 15:56:45 2012
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.impl;
+
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.model.Subscription;
+import org.apache.qpid.server.model.VirtualHost;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * In-memory {@link Queue} implementation. Its only parent is the owning virtual host, whose lock it
+ * shares.
+ */
+class QueueImpl extends AbstractConfiguredObject implements Queue
+{
+
+    private final VirtualHostImpl _virtualHost;
+
+    /**
+     * @param parent the owning virtual host, registered as parent under {@code VirtualHost.class}
+     */
+    QueueImpl(final UUID id,
+              final String name,
+              final State state,
+              final boolean durable,
+              final LifetimePolicy lifetimePolicy,
+              final long timeToLive,
+              final Map<String, Object> attributes,
+              final VirtualHostImpl parent)
+    {
+        // The raw (Map) cast adapts the singleton map to the parent-map type expected by the superclass.
+        super(id, name, state, durable, lifetimePolicy, timeToLive, attributes,
+              (Map) Collections.singletonMap(VirtualHost.class, parent));
+
+        _virtualHost = parent;
+    }
+
+    @Override
+    protected Object getLock()
+    {
+        return _virtualHost.getLock();
+    }
+
+    /**
+     * Bindings are not yet implemented.
+     *
+     * @return an empty collection (never {@code null}), so that callers can iterate the result safely
+     */
+    public Collection<Binding> getBindings()
+    {
+        return Collections.<Binding>emptyList(); //TODO
+    }
+
+    /**
+     * Subscriptions are not yet implemented.
+     *
+     * @return an empty collection (never {@code null}), so that callers can iterate the result safely
+     */
+    public Collection<Subscription> getSubscriptions()
+    {
+        return Collections.<Subscription>emptyList(); //TODO
+    }
+
+    /**
+     * @return this queue's desired state while the virtual host is {@link State#ACTIVE}; otherwise the
+     *         virtual host's actual state
+     */
+    public State getActualState()
+    {
+        State vhostState = _virtualHost.getActualState();
+        return vhostState == State.ACTIVE ? getDesiredState() : vhostState;
+    }
+
+    /** Not yet implemented; always returns {@code null}. */
+    public Statistics getStatistics()
+    {
+        return null; //TODO
+    }
+}
Added: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/VirtualHostImpl.java?rev=1300204&view=auto
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/VirtualHostImpl.java (added)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/impl/VirtualHostImpl.java Tue Mar 13 15:56:45 2012
@@ -0,0 +1,233 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model.impl;
+
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostAlias;
+
+import java.security.AccessControlException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+class VirtualHostImpl extends AbstractConfiguredObject implements VirtualHost
+{
+ // attribute names
+ private static final String REPLICATION_GROUP_NAME = "replicationGroupName";
+
+ private final Collection<VirtualHostAlias> _aliases = new ArrayList<VirtualHostAlias>();
+ private final Collection<Connection> _connections = new ArrayList<Connection>();
+ private final Collection<Queue> _queues = new ArrayList<Queue>();
+ private final Collection<Exchange> _exchanges = new ArrayList<Exchange>();
+
+ private final Map<String, Queue> _queueMap = new ConcurrentHashMap<String, Queue>();
+ private final Map<String, Exchange> _exchangeMap= new ConcurrentHashMap<String, Exchange>();
+
+ private final BrokerImpl _broker;
+
+
+ VirtualHostImpl(final UUID id,
+ final String name,
+ final State state,
+ final boolean durable,
+ final LifetimePolicy lifetimePolicy,
+ final long timeToLive,
+ final Map<String, Object> attributes,
+ final BrokerImpl parent)
+ {
+ super(id, name, state, durable, lifetimePolicy, timeToLive, attributes,
+ (Map) Collections.singletonMap(Broker.class, parent));
+
+ _broker = parent;
+ }
+
+ @Override
+ protected Object getLock()
+ {
+ return this;
+ }
+
+ public String getReplicationGroupName()
+ {
+ return (String) getAttribute(REPLICATION_GROUP_NAME);
+ }
+
+ @Override
+ public Object setAttribute(final String name, final Object expected, final Object desired)
+ throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ synchronized(getLock())
+ {
+ if(REPLICATION_GROUP_NAME.equals(name))
+ {
+ if(getActualState() != State.STOPPED)
+ {
+ throw new IllegalStateException("A virtual host must be stopped before you can change the replication group");
+ }
+ if(!(desired instanceof String))
+ {
+ throw new IllegalArgumentException("The desired replication group MUST be a String");
+ }
+ }
+ return super.setAttribute(name, expected, desired);
+ }
+ }
+
+ public Statistics getStatistics()
+ {
+ return null; //TODO
+ }
+
+ public Collection<VirtualHostAlias> getAliases()
+ {
+ synchronized(getLock())
+ {
+ return new ArrayList<VirtualHostAlias>(_aliases);
+ }
+ }
+
+ public Collection<Connection> getConnections()
+ {
+ synchronized (getLock())
+ {
+ return new ArrayList<Connection>(_connections);
+ }
+ }
+
+ public Collection<Queue> getQueues()
+ {
+ synchronized (getLock())
+ {
+ return new ArrayList<Queue>(_queues);
+ }
+ }
+
+ public Collection<Exchange> getExchanges()
+ {
+ synchronized (getLock())
+ {
+ return new ArrayList<Exchange>(_exchanges);
+ }
+ }
+
+ public State getActualState()
+ {
+ final State brokerActualState = _broker.getActualState();
+ return brokerActualState == State.ACTIVE ? getDesiredState() : brokerActualState;
+ }
+
+
+ public Exchange createExchange(String name, State initialState,boolean durable,
+ LifetimePolicy lifetime, long ttl, String type, Map<String, Object> attributes)
+ throws AccessControlException, IllegalArgumentException
+ {
+ // TODO - check name is valid and not reserved
+ // TODO - check type
+ // TODO - check permissions
+
+ synchronized (getLock())
+ {
+ for(Exchange exchange : _exchanges)
+ {
+ if(exchange.getName().equals(name))
+ {
+ throw new IllegalArgumentException("A exchange with the name '"+name+"' already exists");
+ }
+ }
+ ExchangeImpl exchange = new ExchangeImpl(UUID.randomUUID(),
+ name,
+ initialState,
+ durable,
+ lifetime,
+ ttl,
+ type,
+ attributes,
+ this);
+ _exchanges.add(exchange);
+ _exchangeMap.put(name, exchange);
+
+ notifyChildAddedListener(exchange);
+ return exchange;
+ }
+ }
+
+
+ public Queue createQueue(String name, State initialState,boolean durable,
+ LifetimePolicy lifetime, long ttl, Map<String, Object> attributes)
+ throws AccessControlException, IllegalArgumentException
+ {
+ // TODO - check name is valid and not reserved
+ // TODO - check permissions
+
+ synchronized (getLock())
+ {
+ for(Queue queue : _queues)
+ {
+ if(queue.getName().equals(name))
+ {
+ throw new IllegalArgumentException("A queue with the name '"+name+"' already exists");
+ }
+ }
+ QueueImpl queue = new QueueImpl(UUID.randomUUID(),
+ name,
+ initialState,
+ durable,
+ lifetime,
+ ttl,
+ attributes,
+ this);
+
+ _queues.add(queue);
+ _queueMap.put(name, queue);
+
+ notifyChildAddedListener(queue);
+ // TODO - add binding to default exchange?, or make the default exchange work directly off the map held here
+
+ return queue;
+ }
+ }
+
+ @Override
+ public void deleteQueue(Queue queue)
+ {
+ synchronized (getLock())
+ {
+ boolean found = _queues.remove(queue);
+ if (!found)
+ {
+ throw new IllegalArgumentException("A queue with the name '" + queue.getName()+ "' does not exist");
+ }
+ _queueMap.remove(queue.getName());
+ notifyChildRemovedListener(queue);
+ }
+ }
+}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties?rev=1300204&r1=1300203&r2=1300204&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties Tue Mar 13 15:56:45 2012
@@ -31,6 +31,11 @@
javax.management.openmbean=1.0.0
javax.management=1.0.0
+javax.management.remote.rmi=1.0.0
+javax.management.remote=1.0.0
+
+javax.servlet=2
+javax.servlet.http=2
javax.security.auth=1.0.0
javax.security.auth.callback=1.0.0
@@ -46,6 +51,9 @@ org.osgi.service.startlevel=1.0.0
org.osgi.service.url=1.0.0
org.osgi.util.tracker=1.0.0
+org.apache.commons.codec=1.3.0
+org.apache.commons.codec.binary=1.3.0
+
org.apache.commons.configuration=1.0.0
org.apache.commons.lang=1.0.0
@@ -56,6 +64,12 @@ org.apache.log4j=1.2.12
org.slf4j=1.6.1
+org.mortbay.jetty=6.1.14
+org.mortbay.jetty.servlet=6.1.14
+
+org.codehaus.jackson=1.9.0
+org.codehaus.jackson.map=1.9.0
+
# For Qpid packages (org.apache.qpid), the version number is automatically overridden by QpidProperties#getReleaseVersion()
org.apache.qpid.junit.extensions.util=0.0.0
@@ -64,16 +78,22 @@ org.apache.qpid.common=0.0.0
org.apache.qpid.exchange=0.0.0
org.apache.qpid.framing=0.0.0
org.apache.qpid.management.common.mbeans.annotations=0.0.0
+org.apache.qpid.management.common.mbeans=0.0.0
org.apache.qpid.protocol=0.0.0
org.apache.qpid.transport=0.0.0
org.apache.qpid.transport.codec=0.0.0
org.apache.qpid.server.binding=0.0.0
+org.apache.qpid.server.model=0.0.0
+org.apache.qpid.server.model.adapter=0.0.0
+org.apache.qpid.server.model.impl=0.0.0
org.apache.qpid.server.configuration=0.0.0
org.apache.qpid.server.configuration.plugins=0.0.0
org.apache.qpid.server.configuration.management=0.0.0
+org.apache.qpid.server.connection=0.0.0
org.apache.qpid.server.exchange=0.0.0
org.apache.qpid.server.logging=0.0.0
org.apache.qpid.server.logging.actors=0.0.0
+org.apache.qpid.server.logging.messages=0.0.0
org.apache.qpid.server.logging.subjects=0.0.0
org.apache.qpid.server.message=0.0.0
org.apache.qpid.server.management=0.0.0
@@ -88,6 +108,8 @@ org.apache.qpid.server.security.access.p
org.apache.qpid.server.security.auth=0.0.0
org.apache.qpid.server.security.auth.sasl=0.0.0
org.apache.qpid.server.security.auth.manager=0.0.0
+org.apache.qpid.server.security.auth.rmi=0.0.0
+org.apache.qpid.server.stats=0.0.0
org.apache.qpid.server.virtualhost=0.0.0
org.apache.qpid.server.virtualhost.plugins=0.0.0
org.apache.qpid.util=0.0.0
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1300204&r1=1300203&r2=1300204&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Tue Mar 13 15:56:45 2012
@@ -46,6 +46,7 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.management.AMQProtocolSessionMBean;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -184,6 +185,7 @@ public class AMQProtocolEngine implement
_registry = virtualHostRegistry.getApplicationRegistry();
initialiseStatistics();
+
}
public void setNetworkConnection(NetworkConnection network)
@@ -238,7 +240,7 @@ public class AMQProtocolEngine implement
return new WriteDeliverMethod(channelId);
}
- public void received(final ByteBuffer msg)
+ public synchronized void received(final ByteBuffer msg)
{
final long arrivalTime = System.currentTimeMillis();
_lastReceivedTime = arrivalTime;
@@ -1342,12 +1344,7 @@ public class AMQProtocolEngine implement
public List<AMQSessionModel> getSessionModels()
{
- List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
- for (AMQChannel channel : getChannels())
- {
- sessions.add((AMQSessionModel) channel);
- }
- return sessions;
+ return new ArrayList<AMQSessionModel>(getChannels());
}
public LogSubject getLogSubject()
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1300204&r1=1300203&r2=1300204&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Mar 13 15:56:45 2012
@@ -36,6 +36,7 @@ import org.apache.qpid.server.store.Tran
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -79,6 +80,7 @@ public interface AMQQueue extends Managa
void unregisterSubscription(final Subscription subscription) throws AMQException;
+ Collection<Subscription> getConsumers();
int getConsumerCount();
@@ -150,9 +152,9 @@ public interface AMQQueue extends Managa
{
boolean visit(QueueEntry entry);
}
-
+
void visit(Visitor visitor);
-
+
long getMaximumMessageSize();
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=1300204&r1=1300203&r2=1300204&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Tue Mar 13 15:56:45 2012
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -32,6 +33,8 @@ public class DefaultQueueRegistry implem
private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private final VirtualHost _virtualHost;
+ private final Collection<RegistryChangeListener> _listeners =
+ new ArrayList<RegistryChangeListener>();
public DefaultQueueRegistry(VirtualHost virtualHost)
{
@@ -46,11 +49,28 @@ public class DefaultQueueRegistry implem
public void registerQueue(AMQQueue queue)
{
_queueMap.put(queue.getNameShortString(), queue);
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.queueRegistered(queue);
+ }
+ }
}
public void unregisterQueue(AMQShortString name)
{
- _queueMap.remove(name);
+ AMQQueue q = _queueMap.remove(name);
+ if(q != null)
+ {
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.queueUnregistered(q);
+ }
+ }
+ }
}
public AMQQueue getQueue(AMQShortString name)
@@ -72,4 +92,12 @@ public class DefaultQueueRegistry implem
{
return getQueue(new AMQShortString(queue));
}
+
+ /**
+  * Adds a listener to the collection that registerQueue and unregisterQueue iterate over.
+  * The listener is appended as-is; no duplicate or null check is performed here.
+  *
+  * @param listener the listener to add
+  */
+ public void addRegistryChangeListener(RegistryChangeListener listener)
+ {
+ // Same monitor as the notification loops, so the list is never modified while it is being iterated.
+ synchronized(_listeners)
+ {
+ _listeners.add(listener);
+ }
+ }
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=1300204&r1=1300203&r2=1300204&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Tue Mar 13 15:56:45 2012
@@ -27,7 +27,7 @@ public enum NotificationCheck
MESSAGE_COUNT_ALERT
{
- boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ public boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
@@ -41,7 +41,7 @@ public enum NotificationCheck
},
MESSAGE_SIZE_ALERT(true)
{
- boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ public boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
final long maximumMessageSize = queue.getMaximumMessageSize();
if(maximumMessageSize != 0)
@@ -63,7 +63,7 @@ public enum NotificationCheck
},
QUEUE_DEPTH_ALERT
{
- boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ public boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
// Check for threshold queue depth in bytes
final long maximumQueueDepth = queue.getMaximumQueueDepth();
@@ -84,7 +84,7 @@ public enum NotificationCheck
},
MESSAGE_AGE_ALERT
{
- boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ public boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
final long maxMessageAge = queue.getMaximumMessageAge();
@@ -126,6 +126,6 @@ public enum NotificationCheck
return _messageSpecific;
}
- abstract boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener);
+ public abstract boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener);
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=1300204&r1=1300203&r2=1300204&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Tue Mar 13 15:56:45 2012
@@ -40,4 +40,13 @@ public interface QueueRegistry
Collection<AMQQueue> getQueues();
AMQQueue getQueue(String queue);
+
+ void addRegistryChangeListener(RegistryChangeListener listener);
+
+ /**
+  * Callback interface for changes to the queues held by a registry; listeners are
+  * attached via addRegistryChangeListener.
+  */
+ interface RegistryChangeListener
+ {
+ /**
+  * Called with a queue that has been registered.
+  * NOTE(review): exact timing and threading depend on the registry implementation - confirm there.
+  */
+ void queueRegistered(AMQQueue queue);
+ /**
+  * Called with a queue that has been unregistered.
+  * NOTE(review): exact timing and threading depend on the registry implementation - confirm there.
+  */
+ void queueUnregistered(AMQQueue queue);
+
+ }
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1300204&r1=1300203&r2=1300204&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Mar 13 15:56:45 2012
@@ -19,6 +19,7 @@
package org.apache.qpid.server.queue;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
@@ -52,6 +53,7 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.management.AMQQueueMBean;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -339,7 +341,7 @@ public class SimpleAMQQueue implements A
{
return _exclusive;
}
-
+
public void setExclusive(boolean exclusive) throws AMQException
{
_exclusive = exclusive;
@@ -430,8 +432,8 @@ public class SimpleAMQQueue implements A
{
throw new AMQSecurityException("Permission denied");
}
-
-
+
+
if (hasExclusiveSubscriber())
{
throw new ExistingExclusiveSubscription();
@@ -464,14 +466,14 @@ public class SimpleAMQQueue implements A
subscription.setNoLocal(_nolocal);
}
_subscriptionList.add(subscription);
-
+
//Increment consumerCountHigh if necessary. (un)registerSubscription are both
//synchronized methods so we don't need additional synchronization here
if(_counsumerCountHigh.get() < getConsumerCount())
{
_counsumerCountHigh.incrementAndGet();
}
-
+
if (isDeleted())
{
subscription.queueDeleted(this);
@@ -526,6 +528,18 @@ public class SimpleAMQQueue implements A
}
+ /**
+  * Collects the subscriptions currently held in the subscription list.
+  *
+  * @return a new list holding the subscription of every node the iterator visits
+  */
+ public Collection<Subscription> getConsumers()
+ {
+     final List<Subscription> result = new ArrayList<Subscription>();
+     // advance() both moves to the next node and reports whether one exists.
+     for (SubscriptionList.SubscriptionNodeIterator nodes = _subscriptionList.iterator(); nodes.advance(); )
+     {
+         result.add(nodes.getNode().getSubscription());
+     }
+     return result;
+ }
+
public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments)
{
QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
@@ -576,10 +590,10 @@ public class SimpleAMQQueue implements A
break;
}
}
-
+
reconfigure();
}
-
+
private void reconfigure()
{
//Reconfigure the queue for to reflect this new binding.
@@ -604,7 +618,7 @@ public class SimpleAMQQueue implements A
public void removeBinding(final Binding binding)
{
_bindings.remove(binding);
-
+
reconfigure();
}
@@ -738,8 +752,8 @@ public class SimpleAMQQueue implements A
{
try
{
- if (!sub.isSuspended()
- && subscriptionReadyAndHasInterest(sub, entry)
+ if (!sub.isSuspended()
+ && subscriptionReadyAndHasInterest(sub, entry)
&& mightAssign(sub, entry)
&& !sub.wouldSuspend(entry))
{
@@ -800,15 +814,15 @@ public class SimpleAMQQueue implements A
{
getAtomicQueueCount().incrementAndGet();
}
-
+
private void incrementTxnEnqueueStats(final ServerMessage message)
{
_msgTxnEnqueues.incrementAndGet();
_byteTxnEnqueues.addAndGet(message.getSize());
}
-
+
private void incrementTxnDequeueStats(QueueEntry entry)
- {
+ {
_msgTxnDequeues.incrementAndGet();
_byteTxnDequeues.addAndGet(entry.getSize());
}
@@ -888,7 +902,7 @@ public class SimpleAMQQueue implements A
{
_deliveredMessages.decrementAndGet();
}
-
+
if(sub != null && sub.isSessionTransactional())
{
incrementTxnDequeueStats(entry);
@@ -941,11 +955,13 @@ public class SimpleAMQQueue implements A
}
}
+
+
public int getConsumerCount()
{
return _subscriptionList.size();
}
-
+
public int getConsumerCountHigh()
{
return _counsumerCountHigh.get();
@@ -1412,7 +1428,7 @@ public class SimpleAMQQueue implements A
}
public long clearQueue() throws AMQException
- {
+ {
return clear(0l);
}
@@ -1423,7 +1439,7 @@ public class SimpleAMQQueue implements A
{
throw new AMQSecurityException("Permission denied: queue " + getName());
}
-
+
QueueEntryIterator queueListIterator = _entries.iterator();
long count = 0;
@@ -1490,7 +1506,7 @@ public class SimpleAMQQueue implements A
{
throw new AMQSecurityException("Permission denied: " + getName());
}
-
+
if (!_deleted.getAndSet(true))
{
@@ -2357,22 +2373,22 @@ public class SimpleAMQQueue implements A
{
return _dequeueSize.get();
}
-
+
public long getByteTxnEnqueues()
{
return _byteTxnEnqueues.get();
}
-
+
public long getByteTxnDequeues()
{
return _byteTxnDequeues.get();
}
-
+
public long getMsgTxnEnqueues()
{
return _msgTxnEnqueues.get();
}
-
+
public long getMsgTxnDequeues()
{
return _msgTxnDequeues.get();
@@ -2409,21 +2425,21 @@ public class SimpleAMQQueue implements A
{
return _unackedMsgCountHigh.get();
}
-
+
public long getUnackedMessageCount()
{
return _unackedMsgCount.get();
}
-
+
public void decrementUnackedMsgCount()
{
_unackedMsgCount.decrementAndGet();
}
-
+
private void incrementUnackedMsgCount()
{
long unackedMsgCount = _unackedMsgCount.incrementAndGet();
-
+
long unackedMsgCountHigh;
while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
{
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1300204&r1=1300203&r2=1300204&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Tue Mar 13 15:56:45 2012
@@ -90,7 +90,7 @@ public abstract class ApplicationRegistr
private AuthenticationManager _authenticationManager;
- private VirtualHostRegistry _virtualHostRegistry;
+ private final VirtualHostRegistry _virtualHostRegistry = new VirtualHostRegistry(this);
private SecurityManager _securityManager;
@@ -109,7 +109,7 @@ public abstract class ApplicationRegistr
private BrokerConfig _broker;
private ConfigStore _configStore;
-
+
private Timer _reportingTimer;
private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
@@ -121,9 +121,12 @@ public abstract class ApplicationRegistr
return _logger;
}
- protected Map<InetSocketAddress, QpidAcceptor> getAcceptors()
+ public Map<InetSocketAddress, QpidAcceptor> getAcceptors()
{
- return _acceptors;
+ synchronized (_acceptors)
+ {
+ return new HashMap<InetSocketAddress, QpidAcceptor>(_acceptors);
+ }
}
protected void setManagedObjectRegistry(ManagedObjectRegistry managedObjectRegistry)
@@ -136,11 +139,6 @@ public abstract class ApplicationRegistr
_authenticationManager = authenticationManager;
}
- protected void setVirtualHostRegistry(VirtualHostRegistry virtualHostRegistry)
- {
- _virtualHostRegistry = virtualHostRegistry;
- }
-
protected void setSecurityManager(SecurityManager securityManager)
{
_securityManager = securityManager;
@@ -303,7 +301,7 @@ public abstract class ApplicationRegistr
//Create the composite (log4j+SystemOut MessageLogger to be used during startup
RootMessageLogger[] messageLoggers = {new SystemOutMessageLogger(), _rootMessageLogger};
_startupMessageLogger = new CompositeStartupMessageLogger(messageLoggers);
-
+
CurrentActor.set(new BrokerActor(_startupMessageLogger));
try
@@ -316,8 +314,6 @@ public abstract class ApplicationRegistr
logStartupMessages(CurrentActor.get());
- _virtualHostRegistry = new VirtualHostRegistry(this);
-
_securityManager = new SecurityManager(_configuration, _pluginManager);
_authenticationManager = createAuthenticationManager();
@@ -347,7 +343,7 @@ public abstract class ApplicationRegistr
/**
* Iterates across all discovered authentication manager factories, offering the security configuration to each.
* Expects <b>exactly</b> one authentication manager to configure and initialise itself.
- *
+ *
* It is an error to configure more than one authentication manager, or to configure none.
*
* @return authentication manager
@@ -357,15 +353,15 @@ public abstract class ApplicationRegistr
{
final SecurityConfiguration securityConfiguration = _configuration.getConfiguration(SecurityConfiguration.class.getName());
final Collection<AuthenticationManagerPluginFactory<? extends Plugin>> factories = _pluginManager.getAuthenticationManagerPlugins().values();
-
+
if (factories.size() == 0)
{
throw new ConfigurationException("No authentication manager factory plugins found. Check the desired authentication" +
"manager plugin has been placed in the plugins directory.");
}
-
+
AuthenticationManager authMgr = null;
-
+
for (final Iterator<AuthenticationManagerPluginFactory<? extends Plugin>> iterator = factories.iterator(); iterator.hasNext();)
{
final AuthenticationManagerPluginFactory<? extends Plugin> factory = (AuthenticationManagerPluginFactory<? extends Plugin>) iterator.next();
@@ -387,7 +383,7 @@ public abstract class ApplicationRegistr
{
throw new ConfigurationException("No authentication managers configured within the configure file.");
}
-
+
return authMgr;
}
@@ -404,19 +400,19 @@ public abstract class ApplicationRegistr
{
_managedObjectRegistry = new NoopManagedObjectRegistry();
}
-
+
public void initialiseStatisticsReporting()
{
long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms
final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled();
final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled();
final boolean reset = _configuration.isStatisticsReportResetEnabled();
-
+
/* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
if (report > 0L && (broker || virtualhost))
{
_reportingTimer = new Timer("Statistics-Reporting", true);
-
+
_reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(broker, virtualhost, reset),
@@ -636,7 +632,7 @@ public abstract class ApplicationRegistr
{
return _rootMessageLogger;
}
-
+
public RootMessageLogger getCompositeStartupMessageLogger()
{
return _startupMessageLogger;
@@ -669,7 +665,7 @@ public abstract class ApplicationRegistr
getBroker().addVirtualHost(virtualHost);
return virtualHost;
}
-
+
public void registerMessageDelivered(long messageSize)
{
if (isStatisticsEnabled())
@@ -678,7 +674,7 @@ public abstract class ApplicationRegistr
_dataDelivered.registerEvent(messageSize);
}
}
-
+
public void registerMessageReceived(long messageSize, long timestamp)
{
if (isStatisticsEnabled())
@@ -687,34 +683,34 @@ public abstract class ApplicationRegistr
_dataReceived.registerEvent(messageSize, timestamp);
}
}
-
+
public StatisticsCounter getMessageReceiptStatistics()
{
return _messagesReceived;
}
-
+
public StatisticsCounter getDataReceiptStatistics()
{
return _dataReceived;
}
-
+
public StatisticsCounter getMessageDeliveryStatistics()
{
return _messagesDelivered;
}
-
+
public StatisticsCounter getDataDeliveryStatistics()
{
return _dataDelivered;
}
-
+
public void resetStatistics()
{
_messagesDelivered.reset();
_dataDelivered.reset();
_messagesReceived.reset();
_dataReceived.reset();
-
+
for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts())
{
vhost.resetStatistics();
@@ -725,7 +721,7 @@ public abstract class ApplicationRegistr
{
setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
getConfiguration().isStatisticsGenerationBrokerEnabled());
-
+
_messagesDelivered = new StatisticsCounter("messages-delivered");
_dataDelivered = new StatisticsCounter("bytes-delivered");
_messagesReceived = new StatisticsCounter("messages-received");
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=1300204&r1=1300203&r2=1300204&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Tue Mar 13 15:56:45 2012
@@ -37,6 +37,7 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import java.net.InetSocketAddress;
+import java.util.Map;
import java.util.UUID;
public interface IApplicationRegistry extends StatisticsGatherer
@@ -94,6 +95,9 @@ public interface IApplicationRegistry ex
ConfigStore getConfigStore();
void setConfigStore(ConfigStore store);
-
+
void initialiseStatisticsReporting();
+
+ Map<InetSocketAddress, QpidAcceptor> getAcceptors();
+
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1300204&r1=1300203&r2=1300204&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Tue Mar 13 15:56:45 2012
@@ -28,10 +28,11 @@ import org.apache.qpid.configuration.Pro
import org.apache.qpid.configuration.PropertyUtils;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
+import org.apache.qpid.server.management.AMQUserManagementMBean;
import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.server.security.auth.management.AMQUserManagementMBean;
+import org.apache.qpid.server.management.AMQUserManagementMBean;
import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
import org.apache.qpid.server.security.auth.sasl.JCAProvider;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java?rev=1300204&r1=1300203&r2=1300204&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java Tue Mar 13 15:56:45 2012
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.security.auth.rmi;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
@@ -83,7 +84,14 @@ public class RMIPasswordAuthenticator im
// Verify that an AuthenticationManager has been set.
if (_authenticationManager == null)
{
- throw new SecurityException(UNABLE_TO_LOOKUP);
+ if(ApplicationRegistry.getInstance().getAuthenticationManager() != null)
+ {
+ _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager();
+ }
+ else
+ {
+ throw new SecurityException(UNABLE_TO_LOOKUP);
+ }
}
final AuthenticationResult result = _authenticationManager.authenticate(username, password);
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1300204&r1=1300203&r2=1300204&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Tue Mar 13 15:56:45 2012
@@ -30,6 +30,7 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.management.ServerConnectionMBean;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1300204&r1=1300203&r2=1300204&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Tue Mar 13 15:56:45 2012
@@ -219,7 +219,7 @@ public class ServerConnectionDelegate ex
}
@Override
- protected int getChannelMax()
+ public int getChannelMax()
{
return _maxNoOfChannels;
}
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1300204&r1=1300203&r2=1300204&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Tue Mar 13 15:56:45 2012
@@ -29,7 +29,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.management.AMQBrokerManagerMBean;
import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.configuration.BrokerConfig;
import org.apache.qpid.server.configuration.ConfigStore;
@@ -51,8 +51,8 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.management.VirtualHostMBean;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -69,8 +69,6 @@ import org.apache.qpid.server.txn.DtxReg
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -151,7 +149,7 @@ public class VirtualHostImpl implements
_securityManager = new SecurityManager(_appRegistry.getSecurityManager());
_securityManager.configureHostPlugins(_configuration);
- _virtualHostMBean = new VirtualHostMBean();
+ _virtualHostMBean = new VirtualHostMBean(this);
_connectionRegistry = new ConnectionRegistry();
@@ -226,36 +224,6 @@ public class VirtualHostImpl implements
}
/**
- * Virtual host JMX MBean class.
- *
- * This has some of the methods implemented from management intrerface for exchanges. Any
- * implementaion of an Exchange MBean should extend this class.
- */
- public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
- {
- public VirtualHostMBean() throws NotCompliantMBeanException
- {
- super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE);
- }
-
- public String getObjectInstanceName()
- {
- return ObjectName.quote(_name);
- }
-
- public String getName()
- {
- return _name;
- }
-
- public VirtualHostImpl getVirtualHost()
- {
- return VirtualHostImpl.this;
- }
- }
-
-
- /**
* Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers
* and checking for idle or open transactions that have exceeded the permitted thresholds.
*
Modified: qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java?rev=1300204&r1=1300203&r2=1300204&view=diff
==============================================================================
--- qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java (original)
+++ qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java Tue Mar 13 15:56:45 2012
@@ -22,10 +22,12 @@ package org.apache.qpid.server.virtualho
import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -37,6 +39,8 @@ public class VirtualHostRegistry impleme
private String _defaultVirtualHostName;
private ApplicationRegistry _applicationRegistry;
+ private final Collection<RegistryChangeListener> _listeners =
+ Collections.synchronizedCollection(new ArrayList<RegistryChangeListener>());
public VirtualHostRegistry(ApplicationRegistry applicationRegistry)
{
@@ -50,11 +54,25 @@ public class VirtualHostRegistry impleme
throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
}
_registry.put(host.getName(),host);
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.virtualHostRegistered(host);
+ }
+ }
}
public synchronized void unregisterVirtualHost(VirtualHost host)
{
_registry.remove(host.getName());
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.virtualHostUnregistered(host);
+ }
+ }
}
public VirtualHost getVirtualHost(String name)
@@ -106,4 +124,17 @@ public class VirtualHostRegistry impleme
}
}
+
+ public static interface RegistryChangeListener
+ {
+ void virtualHostRegistered(VirtualHost virtualHost);
+ void virtualHostUnregistered(VirtualHost virtualHost);
+
+ }
+
+ public void addRegistryChangeListener(RegistryChangeListener listener)
+ {
+ _listeners.add(listener);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org