You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/04/17 03:07:36 UTC
svn commit: r1588126 [1/3] - in /qpid/trunk/qpid/java:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/
broker-core/src/main/java/org/apache/qpid/server/ broker-core/src/mai...
Author: rgodfrey
Date: Thu Apr 17 01:07:34 2014
New Revision: 1588126
URL: http://svn.apache.org/r1588126
Log:
QPID-5709 : [Java Broker] Replace exchange registry / factory with use of common configured object mechanism for registration of children
Added:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
- copied, changed from r1588125, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
Removed:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManagerFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java
qpid/trunk/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerFactoryTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerTest.java
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
qpid/trunk/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/BrokerConfigurationStoreCreatorTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStoreTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStoreTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java Thu Apr 17 01:07:34 2014
@@ -127,10 +127,16 @@ public class BDBHAVirtualHost extends Ab
{
_messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
- ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
- _messageStore.visitConfiguredObjectRecords(upgraderRecoverer);
-
- initialiseModel();
+ if(isStoreEmpty())
+ {
+ createDefaultExchanges();
+ }
+ else
+ {
+ ConfiguredObjectRecordHandler upgraderRecoverer =
+ new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
+ _messageStore.visitConfiguredObjectRecords(upgraderRecoverer);
+ }
new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
@@ -157,8 +163,7 @@ public class BDBHAVirtualHost extends Ab
getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
removeHouseKeepingTasks();
- getQueueRegistry().stopAllAndUnregisterMBeans();
- getExchangeRegistry().clearAndUnregisterMbeans();
+ getQueueRegistry().close();
getDtxRegistry().close();
finalState = VirtualHostState.PASSIVE;
Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java Thu Apr 17 01:07:34 2014
@@ -38,6 +38,7 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
@@ -145,7 +146,7 @@ public class VirtualHostTest extends Qpi
private VirtualHost<?,?,?> createHost(Map<String, Object> attributes)
{
- ConfiguredObjectFactory factory = new ConfiguredObjectFactory(Model.getInstance());
+ ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(Model.getInstance());
ConfiguredObjectTypeFactory vhostFactory =
factory.getConfiguredObjectTypeFactory(VirtualHost.class, attributes);
attributes = new HashMap<String, Object>(attributes);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java Thu Apr 17 01:07:34 2014
@@ -44,6 +44,7 @@ import org.apache.qpid.server.logging.Sy
import org.apache.qpid.server.logging.log4j.LoggingManagementFacade;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.SystemContextImpl;
@@ -60,6 +61,7 @@ public class Broker
private volatile IApplicationRegistry _applicationRegistry;
private EventLogger _eventLogger;
private boolean _configuringOwnLogging = false;
+ private final TaskExecutor _taskExecutor = new TaskExecutor();
protected static class InitException extends RuntimeException
{
@@ -85,6 +87,8 @@ public class Broker
{
_applicationRegistry.close();
}
+ _taskExecutor.stop();
+
}
finally
{
@@ -134,10 +138,10 @@ public class Broker
}
LogRecorder logRecorder = new LogRecorder();
- TaskExecutor taskExecutor = new TaskExecutor();
- taskExecutor.start();
- ConfiguredObjectFactory configuredObjectFactory = new ConfiguredObjectFactory(Model.getInstance());
- SystemContext systemContext = new SystemContextImpl(taskExecutor, configuredObjectFactory, _eventLogger, logRecorder, options);
+
+ _taskExecutor.start();
+ ConfiguredObjectFactory configuredObjectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance());
+ SystemContext systemContext = new SystemContextImpl(_taskExecutor, configuredObjectFactory, _eventLogger, logRecorder, options);
BrokerConfigurationStoreCreator storeCreator = new BrokerConfigurationStoreCreator();
DurableConfigurationStore store = storeCreator.createStore(systemContext, storeType, options.getInitialConfigurationLocation(),
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java Thu Apr 17 01:07:34 2014
@@ -38,8 +38,10 @@ import org.apache.qpid.server.model.Abst
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class BindingImpl
extends AbstractConfiguredObject<BindingImpl>
@@ -48,8 +50,7 @@ public class BindingImpl
private final String _bindingKey;
private final AMQQueue _queue;
private final ExchangeImpl _exchange;
- private final Map<String, Object> _arguments;
- private final UUID _id;
+ private Map<String, Object> _arguments;
private final AtomicLong _matches = new AtomicLong();
private final BindingLogSubject _logSubject;
@@ -81,7 +82,6 @@ public class BindingImpl
public BindingImpl(UUID id, Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange)
{
super(parentsMap(queue,exchange),enhanceWithDurable(combineIdWithAttributes(id, attributes), queue, exchange),queue.getVirtualHost().getTaskExecutor());
- _id = id;
_bindingKey = (String)attributes.get(org.apache.qpid.server.model.Binding.NAME);
_queue = queue;
_exchange = exchange;
@@ -198,7 +198,7 @@ public class BindingImpl
public String toString()
{
- return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+", id= " + _id + " }";
+ return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+", id= " + getId() + " }";
}
public void delete()
@@ -229,36 +229,6 @@ public class BindingImpl
}
@Override
- public Object getAttribute(final String name)
- {
- if(ID.equals(name))
- {
- return getId();
- }
- else if(NAME.equals(name))
- {
- return _bindingKey;
- }
- else if(DURABLE.equals(name))
- {
- return isDurable();
- }
- else if(LIFETIME_POLICY.equals(name))
- {
- return getLifetimePolicy();
- }
- else if(QUEUE.equals(name))
- {
- return _queue;
- }
- else if(EXCHANGE.equals(name))
- {
- return _exchange;
- }
- return super.getAttribute(name);
- }
-
- @Override
public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException,
AccessControlException, IllegalArgumentException
{
@@ -276,4 +246,29 @@ public class BindingImpl
{
return _exchange.getEventLogger();
}
+
+ public void setArguments(final Map<String, Object> arguments)
+ {
+ if(getTaskExecutor().isTaskExecutorThread())
+ {
+ _arguments = arguments;
+ super.setAttribute(ARGUMENTS, getActualAttributes().get(ARGUMENTS), arguments);
+ if (isDurable())
+ {
+ VirtualHostImpl<?, ?, ?> vhost = (VirtualHostImpl<?, ?, ?>) _exchange.getParent(VirtualHost.class);
+ vhost.getDurableConfigurationStore().update(true, asObjectRecord());
+ }
+ }
+ else
+ {
+ getTaskExecutor().submitAndWait(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ setArguments(arguments);
+ }
+ });
+ }
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java Thu Apr 17 01:07:34 2014
@@ -142,6 +142,19 @@ public class TaskExecutor
return future;
}
+ public void submitAndWait(final Runnable task) throws CancellationException
+ {
+ submitAndWait(new Task<Void>()
+ {
+ @Override
+ public Void call()
+ {
+ task.run();
+ return null;
+ }
+ });
+ }
+
public <T> T submitAndWait(Task<T> task) throws CancellationException
{
try
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Thu Apr 17 01:07:34 2014
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.log4j.Logger;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
@@ -54,7 +55,6 @@ import org.apache.qpid.server.model.Mana
import org.apache.qpid.server.model.Publisher;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -65,6 +65,7 @@ import org.apache.qpid.server.util.Actio
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -111,7 +112,15 @@ public abstract class AbstractExchange<T
super(parentsMap(vhost), attributes, vhost.getTaskExecutor());
_virtualHost = vhost;
// check ACL
- _virtualHost.getSecurityManager().authoriseCreateExchange(this);
+ try
+ {
+ _virtualHost.getSecurityManager().authoriseCreateExchange(this);
+ }
+ catch (AccessControlException e)
+ {
+ deleted();
+ throw e;
+ }
_logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
@@ -129,15 +138,52 @@ public abstract class AbstractExchange<T
}
@Override
+ public void validate()
+ {
+ super.validate();
+
+ if(!_virtualHost.getSecurityManager().isSystemProcess())
+ {
+ if (isReservedExchangeName(getName()))
+ {
+ deleted();
+ throw new ReservedExchangeNameException(getName());
+ }
+ }
+ }
+
+ private boolean isReservedExchangeName(String name)
+ {
+ if (name == null || ExchangeDefaults.DEFAULT_EXCHANGE_NAME.equals(name)
+ || name.startsWith("amq.") || name.startsWith("qpid."))
+ {
+ return true;
+ }
+ return false;
+ }
+
+
+ @Override
protected void onOpen()
{
super.onOpen();
- postSetAlternateExchange();
+
// Log Exchange creation
getEventLogger().message(ExchangeMessages.CREATED(getExchangeType().getType(), getName(), isDurable()));
}
@Override
+ protected void onCreate()
+ {
+ super.onCreate();
+ if(isDurable())
+ {
+ DurableConfigurationStoreHelper.createExchange(getVirtualHost().getDurableConfigurationStore(), this);
+ }
+
+ }
+
+ @Override
public EventLogger getEventLogger()
{
return _virtualHost.getEventLogger();
@@ -156,8 +202,25 @@ public abstract class AbstractExchange<T
return getLifetimePolicy() != LifetimePolicy.PERMANENT;
}
- public void close()
+ public void delete()
{
+ _virtualHost.getSecurityManager().authoriseDelete(this);
+
+ if(hasReferrers())
+ {
+ throw new ExchangeIsAlternateException(getName());
+ }
+
+ if(getExchangeType().getDefaultExchangeName().equals( getName() ))
+ {
+ throw new RequiredExchangeException(getName());
+ }
+
+ if (isDurable() && !isAutoDelete())
+ {
+ DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), this);
+
+ }
if(_closed.compareAndSet(false,true))
{
@@ -180,7 +243,15 @@ public abstract class AbstractExchange<T
task.performAction(this);
}
_closeTaskList.clear();
+
+ if (isDurable() && !isAutoDelete())
+ {
+ DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), this);
+
+ }
}
+ deleted();
+
}
public String toString()
@@ -623,10 +694,7 @@ public abstract class AbstractExchange<T
if (id == null)
{
- id = UUIDGenerator.generateBindingUUID(getName(),
- queue.getName(),
- bindingKey,
- _virtualHost.getName());
+ id = UUID.randomUUID();
}
Map<String,Object> attributes = new HashMap<String, Object>();
@@ -636,36 +704,47 @@ public abstract class AbstractExchange<T
attributes.put(org.apache.qpid.server.model.Binding.ARGUMENTS, arguments);
}
- BindingImpl b = new BindingImpl(id, attributes, queue, this);
-
- BindingImpl existingMapping = _bindingsMap.putIfAbsent(new BindingIdentifier(bindingKey,queue), b);
- if (existingMapping == null || force)
+ BindingImpl existingMapping;
+ synchronized(this)
{
- b.addStateChangeListener(_bindingListener);
- b.open();
- if (existingMapping != null)
- {
- existingMapping.delete();
- }
+ BindingIdentifier bindingIdentifier = new BindingIdentifier(bindingKey, queue);
+ existingMapping = _bindingsMap.get(bindingIdentifier);
- if (b.isDurable() && !restore)
+ if (existingMapping == null)
{
- DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b);
- }
+ BindingImpl b = new BindingImpl(id, attributes, queue, this);
+ b.addStateChangeListener(_bindingListener);
+ b.open();
- queue.addBinding(b);
- childAdded(b);
+ if (b.isDurable() && !restore)
+ {
+ DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b);
+ }
+ _bindingsMap.put(bindingIdentifier, b);
+ queue.addBinding(b);
+ childAdded(b);
- doAddBinding(b);
+ doAddBinding(b);
- return true;
- }
- else
- {
- return false;
+ return true;
+ }
+ else if(force)
+ {
+ Map<String,Object> oldArguments = existingMapping.getArguments();
+ existingMapping.setArguments(arguments);
+ onBindingUpdated(existingMapping, oldArguments);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
}
+ protected abstract void onBindingUpdated(final BindingImpl binding,
+ final Map<String, Object> oldArguments);
+
@Override
protected boolean setState(final State currentState, final State desiredState)
{
@@ -803,37 +882,12 @@ public abstract class AbstractExchange<T
}
@Override
- public void delete()
- {
- try
- {
- _virtualHost.removeExchange(this,true);
- }
- catch (ExchangeIsAlternateException e)
- {
- throw new UnsupportedOperationException(e.getMessage(),e);
- }
- catch (RequiredExchangeException e)
- {
- throw new UnsupportedOperationException("'"+e.getMessage()+"' is a reserved exchange and can't be deleted",e);
- }
- }
-
- @Override
public Object getAttribute(final String name)
{
if(ConfiguredObject.STATE.equals(name))
{
return getState();
}
- else if(LIFETIME_POLICY.equals(name))
- {
- return getLifetimePolicy();
- }
- else if(DURABLE.equals(name))
- {
- return isDurable();
- }
return super.getAttribute(name);
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Thu Apr 17 01:07:34 2014
@@ -70,6 +70,11 @@ public class DirectExchange extends Abst
recalculateQueues();
}
+ public synchronized void updateBinding(BindingImpl binding)
+ {
+ recalculateQueues();
+ }
+
private void recalculateQueues()
{
List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size());
@@ -188,6 +193,19 @@ public class DirectExchange extends Abst
}
+ @Override
+ protected void onBindingUpdated(final BindingImpl binding, final Map<String, Object> oldArguments)
+ {
+ String bindingKey = binding.getBindingKey();
+ AMQQueue queue = binding.getAMQQueue();
+
+ assert queue != null;
+ assert bindingKey != null;
+
+ BindingSet bindings = _bindingsByKey.get(bindingKey);
+ bindings.updateBinding(binding);
+ }
+
protected void onBind(final BindingImpl binding)
{
String bindingKey = binding.getBindingKey();
Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeFactory.java?rev=1588126&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeFactory.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeFactory.java Thu Apr 17 01:07:34 2014
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public class DirectExchangeFactory extends AbstractConfiguredObjectTypeFactory<DirectExchange>
+{
+ public DirectExchangeFactory()
+ {
+ super(DirectExchange.class);
+ }
+
+ @Override
+ public DirectExchange createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents)
+ {
+ VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents);
+ if (!(virtualHost instanceof VirtualHostImpl))
+ {
+ throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName());
+ }
+ return new DirectExchange((VirtualHostImpl<?, ?, ?>)virtualHost , attributes);
+ }
+
+}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java Thu Apr 17 01:07:34 2014
@@ -62,7 +62,7 @@ public interface ExchangeImpl<T extends
void restoreBinding(UUID id, String bindingKey, AMQQueue queue,
Map<String, Object> argumentMap);
- void close();
+ void delete();
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Thu Apr 17 01:07:34 2014
@@ -125,6 +125,80 @@ public class FanoutExchange extends Abst
}
+ @Override
+ protected synchronized void onBindingUpdated(final BindingImpl binding, final Map<String, Object> oldArguments)
+ {
+ AMQQueue queue = binding.getAMQQueue();
+
+ if (binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(
+ binding.getArguments()))
+ {
+ if(oldArguments != null && !oldArguments.isEmpty() && FilterSupport.argumentsContainFilter(oldArguments))
+ {
+ _unfilteredQueues.add(queue);
+ if(_queues.containsKey(queue))
+ {
+ _queues.put(queue,_queues.get(queue)+1);
+ }
+ else
+ {
+ _queues.put(queue, ONE);
+ }
+
+ // No longer any reason to check filters for this queue
+ _filteredQueues.remove(queue);
+ }
+ // else - nothing has changed, remains unfiltered
+ }
+ else
+ {
+ HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings =
+ new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get());
+
+ Map<BindingImpl,MessageFilter> bindingsForQueue;
+
+ final MessageFilter messageFilter;
+
+ try
+ {
+ messageFilter = FilterSupport.createMessageFilter(binding.getArguments(), binding.getAMQQueue());
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ _logger.warn("Cannot bind queue " + queue + " to exchange this " + this + " because selector cannot be parsed.", e);
+ return;
+ }
+
+
+ if (oldArguments != null && !oldArguments.isEmpty() && FilterSupport.argumentsContainFilter(oldArguments))
+ {
+ bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(filteredBindings.remove(binding.getAMQQueue()));
+ }
+ else // previously unfiltered
+ {
+ bindingsForQueue = new HashMap<BindingImpl,MessageFilter>();
+
+ Integer oldValue = _queues.remove(queue);
+ if (ONE.equals(oldValue))
+ {
+ // should start checking filters for this queue
+ _filteredQueues.add(queue);
+ _unfilteredQueues.remove(queue);
+ }
+ else
+ {
+ _queues.put(queue, oldValue - 1);
+ }
+
+ }
+ bindingsForQueue.put(binding, messageFilter);
+ filteredBindings.put(binding.getAMQQueue(),bindingsForQueue);
+
+ _filteredBindings.set(filteredBindings);
+
+ }
+
+ }
protected synchronized void onBind(final BindingImpl binding)
{
@@ -156,8 +230,7 @@ public class FanoutExchange extends Abst
new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get());
Map<BindingImpl, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
- final
- MessageFilter messageFilter =
+ final MessageFilter messageFilter =
FilterSupport.createMessageFilter(binding.getArguments(), binding.getAMQQueue());
if(bindingsForQueue != null)
Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeFactory.java?rev=1588126&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeFactory.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeFactory.java Thu Apr 17 01:07:34 2014
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public class FanoutExchangeFactory extends AbstractConfiguredObjectTypeFactory<FanoutExchange>
+{
+ public FanoutExchangeFactory()
+ {
+ super(FanoutExchange.class);
+ }
+
+ @Override
+ public FanoutExchange createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents)
+ {
+ VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents);
+ if (!(virtualHost instanceof VirtualHostImpl))
+ {
+ throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName());
+ }
+ return new FanoutExchange((VirtualHostImpl<?, ?, ?>)virtualHost , attributes);
+ }
+
+}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Thu Apr 17 01:07:34 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.exchange;
import java.util.ArrayList;
import java.util.LinkedHashSet;
+import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -158,6 +159,21 @@ public class HeadersExchange extends Abs
}
+ @Override
+ protected void onBindingUpdated(final BindingImpl binding, final Map<String, Object> oldArguments)
+ {
+ HeadersBinding headersBinding = new HeadersBinding(binding);
+ ListIterator<HeadersBinding> iter = _bindingHeaderMatchers.listIterator();
+ while(iter.hasNext())
+ {
+ if(iter.next().equals(headersBinding))
+ {
+ iter.set(headersBinding);
+ }
+ }
+
+ }
+
protected void onUnbind(final BindingImpl binding)
{
assert binding != null;
Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeFactory.java?rev=1588126&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeFactory.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeFactory.java Thu Apr 17 01:07:34 2014
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public class HeadersExchangeFactory extends AbstractConfiguredObjectTypeFactory<HeadersExchange>
+{
+ public HeadersExchangeFactory()
+ {
+ super(HeadersExchange.class);
+ }
+
+ @Override
+ public HeadersExchange createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents)
+ {
+ VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents);
+ if (!(virtualHost instanceof VirtualHostImpl))
+ {
+ throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName());
+ }
+ return new HeadersExchange((VirtualHostImpl<?, ?, ?>)virtualHost , attributes);
+ }
+
+}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Thu Apr 17 01:07:34 2014
@@ -72,6 +72,67 @@ public class TopicExchange extends Abstr
return TYPE;
}
+ @Override
+ protected void onBindingUpdated(final BindingImpl binding, final Map<String, Object> oldArguments)
+ {
+ final String bindingKey = binding.getBindingKey();
+ AMQQueue queue = binding.getAMQQueue();
+ Map<String,Object> args = binding.getArguments();
+
+ assert queue != null;
+ assert bindingKey != null;
+
+ _logger.debug("Registering queue " + queue.getName() + " with routing key " + bindingKey);
+
+
+ String routingKey = TopicNormalizer.normalize(bindingKey);
+
+ try
+ {
+
+ if (_bindings.containsKey(binding))
+ {
+ Map<String, Object> oldArgs = _bindings.get(binding);
+ TopicExchangeResult result = _topicExchangeResults.get(routingKey);
+
+ if (FilterSupport.argumentsContainFilter(args))
+ {
+ if (FilterSupport.argumentsContainFilter(oldArgs))
+ {
+ result.replaceQueueFilter(queue,
+ FilterSupport.createMessageFilter(oldArgs, queue),
+ FilterSupport.createMessageFilter(args, queue));
+ }
+ else
+ {
+ result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
+ result.removeUnfilteredQueue(queue);
+ }
+ }
+ else
+ {
+ if (FilterSupport.argumentsContainFilter(oldArgs))
+ {
+ result.addUnfilteredQueue(queue);
+ result.removeFilteredQueue(queue, FilterSupport.createMessageFilter(oldArgs, queue));
+ }
+ else
+ {
+ // TODO - fix control flow
+ return;
+ }
+ }
+
+ }
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ throw new ConnectionScopedRuntimeException(e);
+ }
+
+
+ }
+
protected synchronized void registerQueue(final BindingImpl binding) throws AMQInvalidArgumentException
{
final String bindingKey = binding.getBindingKey();
Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeFactory.java?rev=1588126&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeFactory.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeFactory.java Thu Apr 17 01:07:34 2014
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public class TopicExchangeFactory extends AbstractConfiguredObjectTypeFactory<TopicExchange>
+{
+ public TopicExchangeFactory()
+ {
+ super(TopicExchange.class);
+ }
+
+ @Override
+ public TopicExchange createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents)
+ {
+ VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents);
+ if (!(virtualHost instanceof VirtualHostImpl))
+ {
+ throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName());
+ }
+ return new TopicExchange((VirtualHostImpl<?, ?, ?>)virtualHost , attributes);
+ }
+
+}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Thu Apr 17 01:07:34 2014
@@ -41,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -104,7 +105,12 @@ public abstract class AbstractConfigured
new ArrayList<ConfigurationChangeListener>();
private final Map<Class<? extends ConfiguredObject>, Collection<ConfiguredObject<?>>> _children =
- new HashMap<Class<? extends ConfiguredObject>, Collection<ConfiguredObject<?>>>();
+ new ConcurrentHashMap<Class<? extends ConfiguredObject>, Collection<ConfiguredObject<?>>>();
+ private final Map<Class<? extends ConfiguredObject>, Map<UUID,ConfiguredObject<?>>> _childrenById =
+ new ConcurrentHashMap<Class<? extends ConfiguredObject>, Map<UUID,ConfiguredObject<?>>>();
+ private final Map<Class<? extends ConfiguredObject>, Map<String,ConfiguredObject<?>>> _childrenByName =
+ new ConcurrentHashMap<Class<? extends ConfiguredObject>, Map<String,ConfiguredObject<?>>>();
+
@ManagedAttributeField
private final UUID _id;
@@ -178,6 +184,8 @@ public abstract class AbstractConfigured
if(idObj == null)
{
uuid = UUID.randomUUID();
+ attributes = new HashMap<String, Object>(attributes);
+ attributes.put(ID, uuid);
}
else
{
@@ -185,6 +193,7 @@ public abstract class AbstractConfigured
}
_id = uuid;
+ _name = AttributeValueConverter.STRING_CONVERTER.convert(attributes.get(NAME),this);
_attributeTypes = getAttributeTypes(getClass());
_automatedFields = getAutomatedFields(getClass());
@@ -205,6 +214,8 @@ public abstract class AbstractConfigured
for (Class<? extends ConfiguredObject> childClass : Model.getInstance().getChildTypes(getCategoryClass()))
{
_children.put(childClass, new CopyOnWriteArrayList<ConfiguredObject<?>>());
+ _childrenById.put(childClass, new ConcurrentHashMap<UUID, ConfiguredObject<?>>());
+ _childrenByName.put(childClass, new ConcurrentHashMap<String, ConfiguredObject<?>>());
}
for(ConfiguredObject<?> parent : parents.values())
@@ -220,7 +231,6 @@ public abstract class AbstractConfigured
addParent((Class<ConfiguredObject<?>>) entry.getKey(), entry.getValue());
}
- _name = AttributeValueConverter.STRING_CONVERTER.convert(attributes.get(NAME),this);
Object durableObj = attributes.get(DURABLE);
_durable = AttributeValueConverter.BOOLEAN_CONVERTER.convert(durableObj == null ? _attributeTypes.get(DURABLE).getAnnotation().defaultValue() : durableObj, this);
@@ -353,7 +363,7 @@ public abstract class AbstractConfigured
}
}
- public void open()
+ public final void open()
{
if(_open.compareAndSet(false,true))
{
@@ -364,7 +374,7 @@ public abstract class AbstractConfigured
}
- public void create()
+ public final void create()
{
if(_open.compareAndSet(false,true))
{
@@ -391,7 +401,7 @@ public abstract class AbstractConfigured
});
}
- protected void doValidation()
+ protected final void doValidation()
{
applyToChildren(new Action<ConfiguredObject<?>>()
{
@@ -407,7 +417,7 @@ public abstract class AbstractConfigured
validate();
}
- protected void doResolution()
+ protected final void doResolution()
{
resolve();
applyToChildren(new Action<ConfiguredObject<?>>()
@@ -423,7 +433,7 @@ public abstract class AbstractConfigured
});
}
- protected void doCreation()
+ protected final void doCreation()
{
onCreate();
applyToChildren(new Action<ConfiguredObject<?>>()
@@ -454,7 +464,8 @@ public abstract class AbstractConfigured
}
}
- public void validate()
+ public void
+ validate()
{
}
@@ -868,29 +879,78 @@ public abstract class AbstractConfigured
private <C extends ConfiguredObject> void registerChild(final C child)
{
+
Class categoryClass = child.getCategoryClass();
+ UUID childId = child.getId();
+ String name = child.getName();
+ if(_childrenById.get(categoryClass).containsKey(childId))
+ {
+ throw new DuplicateIdException(child);
+ }
+ if(_childrenByName.get(categoryClass).containsKey(name))
+ {
+ Collection<Class<? extends ConfiguredObject>> parentTypes =
+ new ArrayList<Class<? extends ConfiguredObject>>(Model.getInstance().getParentTypes(categoryClass));
+ parentTypes.remove(getCategoryClass());
+ boolean duplicate = true;
+
+ C existing = (C) _childrenByName.get(categoryClass).get(name);
+ for(Class<? extends ConfiguredObject> parentType : parentTypes)
+ {
+ ConfiguredObject existingParent = existing.getParent(parentType);
+ ConfiguredObject childParent = child.getParent(parentType);
+ duplicate = existingParent == childParent;
+ if(!duplicate)
+ {
+ break;
+ }
+ }
+
+ if(duplicate)
+ {
+ throw new DuplicateNameException(child);
+ }
+ }
_children.get(categoryClass).add(child);
+ _childrenById.get(categoryClass).put(childId,child);
+ _childrenByName.get(categoryClass).put(name, child);
+
}
protected void deleted()
{
- for(ConfiguredObject<?> parent : _parents.values())
+ for (ConfiguredObject<?> parent : _parents.values())
{
- if(parent instanceof AbstractConfiguredObject<?>)
+ if (parent instanceof AbstractConfiguredObject<?>)
{
- ((AbstractConfiguredObject<?>)parent).unregisterChild(this);
+ AbstractConfiguredObject<?> parentObj = (AbstractConfiguredObject<?>) parent;
+ parentObj.unregisterChild(this);
+ parentObj.childRemoved(this);
}
}
}
- protected <C extends ConfiguredObject> void unregisterChild(final C child)
+ private <C extends ConfiguredObject> void unregisterChild(final C child)
{
- _children.get(child.getCategoryClass()).remove(child);
+ Class categoryClass = child.getCategoryClass();
+ _children.get(categoryClass).remove(child);
+ _childrenById.get(categoryClass).remove(child.getId());
+ _childrenByName.get(categoryClass).remove(child.getName());
}
+ @Override
+ public final <C extends ConfiguredObject> C getChildById(final Class<C> clazz, final UUID id)
+ {
+ return (C) _childrenById.get(Model.getCategory(clazz)).get(id);
+ }
+ @Override
+ public final <C extends ConfiguredObject> C getChildByName(final Class<C> clazz, final String name)
+ {
+ return (C) _childrenByName.get(Model.getCategory(clazz)).get(name);
+ }
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
@@ -1581,7 +1641,8 @@ public abstract class AbstractConfigured
int oldSize = 0;
Model model = Model.getInstance();
- Set<Class<? extends ConfiguredObject>> allDescendants = new HashSet<Class<? extends ConfiguredObject>>(model.getChildTypes(candidate));
+ Set<Class<? extends ConfiguredObject>> allDescendants = new HashSet<Class<? extends ConfiguredObject>>(model.getChildTypes(
+ candidate));
while(allDescendants.size() > oldSize)
{
oldSize = allDescendants.size();
@@ -1661,4 +1722,27 @@ public abstract class AbstractConfigured
throw new ServerScopedRuntimeException("Unable to find attribute definition for method " + method.getName());
}
}
+
+ protected final static class DuplicateIdException extends RuntimeException
+ {
+ public DuplicateIdException(final ConfiguredObject<?> child)
+ {
+ super("Child of type " + child.getClass().getSimpleName() + " already exists with id of " + child.getId());
+ }
+ }
+
+ protected final static class DuplicateNameException extends RuntimeException
+ {
+ private final String _name;
+ public DuplicateNameException(final ConfiguredObject<?> child)
+ {
+ super("Child of type " + child.getClass().getSimpleName() + " already exists with name of " + child.getName());
+ _name = child.getName();
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java Thu Apr 17 01:07:34 2014
@@ -194,4 +194,6 @@ public interface Broker<X extends Broker
void setEventLogger(EventLogger eventLogger);
AuthenticationProvider<?> getManagementModeAuthenticationProvider();
+
+ ConfiguredObjectFactory getObjectFactory();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java Thu Apr 17 01:07:34 2014
@@ -233,6 +233,10 @@ public interface ConfiguredObject<X exte
*/
<C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz);
+ <C extends ConfiguredObject> C getChildById(Class<C> clazz, UUID id);
+
+ <C extends ConfiguredObject> C getChildByName(Class<C> clazz, String name);
+
<C extends ConfiguredObject> C createChild(Class<C> childClass,
Map<String, Object> attributes,
Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java?rev=1588126&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java Thu Apr 17 01:07:34 2014
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.UnresolvedConfiguredObject;
+
+public interface ConfiguredObjectFactory
+{
+ <X extends ConfiguredObject<X>> UnresolvedConfiguredObject<X> recover(ConfiguredObjectRecord record,
+ ConfiguredObject<?>... parents);
+
+ <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(Class<X> categoryClass,
+ Map<String, Object> attributes);
+
+ <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(String category,
+ String type);
+
+ Collection<String> getSupportedTypes(Class<? extends ConfiguredObject> category);
+
+ Model getModel();
+}
Copied: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java (from r1588125, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java?p2=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java&p1=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java&r1=1588125&r2=1588126&rev=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java Thu Apr 17 01:07:34 2014
@@ -32,7 +32,7 @@ import org.apache.qpid.server.store.Conf
import org.apache.qpid.server.store.UnresolvedConfiguredObject;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-public class ConfiguredObjectFactory
+public class ConfiguredObjectFactoryImpl implements ConfiguredObjectFactory
{
private final Map<String, String> _defaultTypes = new HashMap<String, String>();
private final Map<String, Map<String, ConfiguredObjectTypeFactory>> _allFactories =
@@ -41,7 +41,7 @@ public class ConfiguredObjectFactory
private final Model _model;
- public ConfiguredObjectFactory(Model model)
+ public ConfiguredObjectFactoryImpl(Model model)
{
_model = model;
@@ -77,6 +77,7 @@ public class ConfiguredObjectFactory
}
}
+ @Override
public <X extends ConfiguredObject<X>> UnresolvedConfiguredObject<X> recover(ConfiguredObjectRecord record,
ConfiguredObject<?>... parents)
{
@@ -95,7 +96,9 @@ public class ConfiguredObjectFactory
return factory.recover(record, parents);
}
- public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final Class<X> categoryClass, Map<String,Object> attributes)
+ @Override
+ public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final Class<X> categoryClass,
+ Map<String, Object> attributes)
{
final String category = categoryClass.getSimpleName();
Map<String, ConfiguredObjectTypeFactory> categoryFactories = _allFactories.get(category);
@@ -123,7 +126,9 @@ public class ConfiguredObjectFactory
return factory;
}
- public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final String category, final String type)
+ @Override
+ public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final String category,
+ final String type)
{
Map<String, ConfiguredObjectTypeFactory> categoryFactories = _allFactories.get(category);
if(categoryFactories == null)
@@ -138,12 +143,14 @@ public class ConfiguredObjectFactory
return factory;
}
+ @Override
public Collection<String> getSupportedTypes(Class<? extends ConfiguredObject> category)
{
return Collections.unmodifiableCollection(_supportedTypes.get(category.getSimpleName()));
}
+ @Override
public Model getModel()
{
return _model;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java Thu Apr 17 01:07:34 2014
@@ -56,4 +56,6 @@ public interface Session<X extends Sessi
@ManagedStatistic
long getUnacknowledgedMessages();
+
+ void delete();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java Thu Apr 17 01:07:34 2014
@@ -166,11 +166,13 @@ public class BrokerAdapter extends Abstr
String modelVersion = (String) getActualAttributes().get(Broker.MODEL_VERSION);
if (modelVersion == null)
{
+ deleted();
throw new IllegalConfigurationException("Broker " + Broker.MODEL_VERSION + " must be specified");
}
if (!MODEL_VERSION_PATTERN.matcher(modelVersion).matches())
{
+ deleted();
throw new IllegalConfigurationException("Broker " + Broker.MODEL_VERSION + " is specified in incorrect format: "
+ modelVersion);
}
@@ -182,12 +184,14 @@ public class BrokerAdapter extends Abstr
if (majorModelVersion != Model.MODEL_MAJOR_VERSION || minorModelVersion > Model.MODEL_MINOR_VERSION)
{
+ deleted();
throw new IllegalConfigurationException("The model version '" + modelVersion
+ "' in configuration is incompatible with the broker model version '" + Model.MODEL_VERSION + "'");
}
if(!isDurable())
{
+ deleted();
throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable");
}
}
@@ -1203,6 +1207,12 @@ public class BrokerAdapter extends Abstr
}
@Override
+ public ConfiguredObjectFactory getObjectFactory()
+ {
+ return _objectFactory;
+ }
+
+ @Override
public void setEventLogger(final EventLogger eventLogger)
{
_eventLogger = eventLogger;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java Thu Apr 17 01:07:34 2014
@@ -27,6 +27,8 @@ import org.apache.qpid.server.model.Mana
@ManagedObject( category = false, type = "GroupFile" )
public interface FileBasedGroupProvider<X extends FileBasedGroupProvider<X>> extends GroupProvider<X>
{
+ String PATH="path";
+
@ManagedAttribute( automate = true, mandatory = true)
String getPath();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java?rev=1588126&r1=1588125&r2=1588126&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java Thu Apr 17 01:07:34 2014
@@ -23,28 +23,49 @@ import java.io.File;
import java.io.IOException;
import java.security.AccessControlException;
import java.security.Principal;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.model.*;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.security.group.FileGroupManager;
-import org.apache.qpid.server.security.group.GroupManager;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Group;
+import org.apache.qpid.server.model.GroupMember;
+import org.apache.qpid.server.model.GroupProvider;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.access.Operation;
+import org.apache.qpid.server.security.auth.UsernamePrincipal;
+import org.apache.qpid.server.security.group.FileGroupDatabase;
+import org.apache.qpid.server.security.group.GroupPrincipal;
import org.apache.qpid.server.util.MapValueConverter;
public class FileBasedGroupProviderImpl
extends AbstractConfiguredObject<FileBasedGroupProviderImpl> implements FileBasedGroupProvider<FileBasedGroupProviderImpl>
{
+ public static final String RESOURCE_BUNDLE = "org.apache.qpid.server.security.group.FileGroupProviderAttributeDescriptions";
+ public static final String GROUP_FILE_PROVIDER_TYPE = "GroupFile";
private static Logger LOGGER = Logger.getLogger(FileBasedGroupProviderImpl.class);
- private GroupManager _groupManager;
private final Broker<?> _broker;
private AtomicReference<State> _state;
+ private FileGroupDatabase _groupDatabase;
+
@ManagedAttributeField
private String _path;
@@ -100,18 +121,74 @@ public class FileBasedGroupProviderImpl
protected void onOpen()
{
super.onOpen();
- if(_groupManager == null)
+ if(_groupDatabase == null)
+ {
+ _groupDatabase = new FileGroupDatabase();
+ try
+ {
+ _groupDatabase.setGroupFile(getPath());
+ }
+ catch (IOException e)
+ {
+ setState(getState(), State.ERRORED);
+ LOGGER.warn(("Unable to open preferences file at " + _path));
+ }
+ }
+ Set<Principal> groups = getGroupPrincipals();
+ Collection<Group> principals = new ArrayList<Group>(groups.size());
+ for (Principal group : groups)
{
- _groupManager = new FileGroupManager(getPath());
+ Map<String,Object> attrMap = new HashMap<String, Object>();
+ UUID id = UUIDGenerator.generateGroupUUID(getName(),group.getName());
+ attrMap.put(Group.ID, id);
+ attrMap.put(Group.NAME, group.getName());
+ GroupAdapter groupAdapter = new GroupAdapter(attrMap, getTaskExecutor());
+ principals.add(groupAdapter);
}
+
}
@Override
protected void onCreate()
{
super.onCreate();
- _groupManager = new FileGroupManager(getPath());
- _groupManager.onCreate();
+ _groupDatabase = new FileGroupDatabase();
+
+ File file = new File(_path);
+ if (!file.exists())
+ {
+ File parent = file.getParentFile();
+ if (!parent.exists())
+ {
+ parent.mkdirs();
+ }
+ if (parent.exists())
+ {
+ try
+ {
+ file.createNewFile();
+ }
+ catch (IOException e)
+ {
+ throw new IllegalConfigurationException("Cannot create group file");
+ }
+ }
+ else
+ {
+ throw new IllegalConfigurationException("Cannot create group file");
+ }
+ }
+ try
+ {
+ _groupDatabase.setGroupFile(getPath());
+ }
+ catch (IOException e)
+ {
+ setState(getState(), State.ERRORED);
+ LOGGER.warn(("Unable to open preferences file at " + _path));
+ }
+
+
}
@Override
@@ -147,12 +224,16 @@ public class FileBasedGroupProviderImpl
String groupName = (String) attributes.get(Group.NAME);
getSecurityManager().authoriseGroupOperation(Operation.CREATE, groupName);
- _groupManager.createGroup(groupName);
+
+ _groupDatabase.createGroup(groupName);
+
Map<String,Object> attrMap = new HashMap<String, Object>();
UUID id = UUIDGenerator.generateGroupUUID(getName(),groupName);
attrMap.put(Group.ID, id);
attrMap.put(Group.NAME, groupName);
- return (C) new GroupAdapter(attrMap, getTaskExecutor());
+ GroupAdapter groupAdapter = new GroupAdapter(attrMap, getTaskExecutor());
+ groupAdapter.create();
+ return (C) groupAdapter;
}
@@ -161,35 +242,25 @@ public class FileBasedGroupProviderImpl
+ childClass);
}
- @SuppressWarnings("unchecked")
- @Override
- public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
+ private Set<Principal> getGroupPrincipals()
{
- if (clazz == Group.class)
+
+ Set<String> groups = _groupDatabase == null ? Collections.<String>emptySet() : _groupDatabase.getAllGroups();
+ if (groups.isEmpty())
{
- Set<Principal> groups = _groupManager == null ? Collections.<Principal>emptySet() : _groupManager.getGroupPrincipals();
- Collection<Group> principals = new ArrayList<Group>(groups.size());
- for (Principal group : groups)
- {
- Map<String,Object> attrMap = new HashMap<String, Object>();
- UUID id = UUIDGenerator.generateGroupUUID(getName(),group.getName());
- attrMap.put(Group.ID, id);
- attrMap.put(Group.NAME, group.getName());
- principals.add(new GroupAdapter(attrMap, getTaskExecutor()));
- }
- return (Collection<C>) Collections
- .unmodifiableCollection(principals);
+ return Collections.emptySet();
}
else
{
- return null;
+ Set<Principal> principals = new HashSet<Principal>();
+ for (String groupName : groups)
+ {
+ principals.add(new GroupPrincipal(groupName));
+ }
+ return principals;
}
}
- public GroupManager getGroupManager()
- {
- return _groupManager;
- }
private SecurityManager getSecurityManager()
{
@@ -207,7 +278,15 @@ public class FileBasedGroupProviderImpl
{
try
{
- _groupManager.open();
+ try
+ {
+ _groupDatabase.setGroupFile(getPath());
+ }
+ catch (IOException e)
+ {
+ throw new IllegalConfigurationException("Unable to set group file " + getPath(), e);
+ }
+
return true;
}
catch(RuntimeException e)
@@ -232,7 +311,6 @@ public class FileBasedGroupProviderImpl
{
if (_state.compareAndSet(state, State.STOPPED))
{
- _groupManager.close();
return true;
}
else
@@ -245,8 +323,15 @@ public class FileBasedGroupProviderImpl
if ((state == State.INITIALISING || state == State.ACTIVE || state == State.STOPPED || state == State.QUIESCED || state == State.ERRORED)
&& _state.compareAndSet(state, State.DELETED))
{
- _groupManager.close();
- _groupManager.onDelete();
+ File file = new File(getPath());
+ if (file.exists())
+ {
+ if (!file.delete())
+ {
+ throw new IllegalConfigurationException("Cannot delete group file");
+ }
+ }
+
deleted();
return true;
}
@@ -267,7 +352,20 @@ public class FileBasedGroupProviderImpl
public Set<Principal> getGroupPrincipalsForUser(String username)
{
- return _groupManager.getGroupPrincipalsForUser(username);
+ Set<String> groups = _groupDatabase.getGroupsForUser(username);
+ if (groups.isEmpty())
+ {
+ return Collections.emptySet();
+ }
+ else
+ {
+ Set<Principal> principals = new HashSet<Principal>();
+ for (String groupName : groups)
+ {
+ principals.add(new GroupPrincipal(groupName));
+ }
+ return principals;
+ }
}
@Override
@@ -337,6 +435,24 @@ public class FileBasedGroupProviderImpl
}
@Override
+ protected void onOpen()
+ {
+ super.onOpen();
+ Set<Principal> usersInGroup = getUserPrincipalsForGroup(getName());
+ Collection<GroupMember> members = new ArrayList<GroupMember>();
+ for (Principal principal : usersInGroup)
+ {
+ UUID id = UUIDGenerator.generateGroupMemberUUID(FileBasedGroupProviderImpl.this.getName(), getName(), principal.getName());
+ Map<String,Object> attrMap = new HashMap<String, Object>();
+ attrMap.put(GroupMember.ID,id);
+ attrMap.put(GroupMember.NAME, principal.getName());
+ GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap, getTaskExecutor());
+ groupMemberAdapter.open();
+ members.add(groupMemberAdapter);
+ }
+ }
+
+ @Override
protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
{
super.validateChange(proxyForValidation, changedAttributes);
@@ -346,33 +462,25 @@ public class FileBasedGroupProviderImpl
}
}
- @Override
- public <C extends ConfiguredObject> Collection<C> getChildren(
- Class<C> clazz)
+ private Set<Principal> getUserPrincipalsForGroup(String group)
{
- if (clazz == GroupMember.class)
+ Set<String> users = _groupDatabase.getUsersInGroup(group);
+ if (users.isEmpty())
{
- Set<Principal> usersInGroup = _groupManager
- .getUserPrincipalsForGroup(getName());
- Collection<GroupMember> members = new ArrayList<GroupMember>();
- for (Principal principal : usersInGroup)
- {
- UUID id = UUIDGenerator.generateGroupMemberUUID(FileBasedGroupProviderImpl.this.getName(), getName(), principal.getName());
- Map<String,Object> attrMap = new HashMap<String, Object>();
- attrMap.put(GroupMember.ID,id);
- attrMap.put(GroupMember.NAME, principal.getName());
- members.add(new GroupMemberAdapter(attrMap, getTaskExecutor()));
- }
- return (Collection<C>) Collections
- .unmodifiableCollection(members);
+ return Collections.emptySet();
}
else
{
- return null;
+ Set<Principal> principals = new HashSet<Principal>();
+ for (String user : users)
+ {
+ principals.add(new UsernamePrincipal(user));
+ }
+ return principals;
}
-
}
+
@Override
public <C extends ConfiguredObject> C addChild(Class<C> childClass,
Map<String, Object> attributes,
@@ -384,12 +492,14 @@ public class FileBasedGroupProviderImpl
getSecurityManager().authoriseGroupOperation(Operation.UPDATE, getName());
- _groupManager.addUserToGroup(memberName, getName());
+ _groupDatabase.addUserToGroup(memberName, getName());
UUID id = UUIDGenerator.generateGroupMemberUUID(FileBasedGroupProviderImpl.this.getName(), getName(), memberName);
Map<String,Object> attrMap = new HashMap<String, Object>();
attrMap.put(GroupMember.ID,id);
attrMap.put(GroupMember.NAME, memberName);
- return (C) new GroupMemberAdapter(attrMap, getTaskExecutor());
+ GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap, getTaskExecutor());
+ groupMemberAdapter.create();
+ return (C) groupMemberAdapter;
}
@@ -405,7 +515,8 @@ public class FileBasedGroupProviderImpl
if (desiredState == State.DELETED)
{
getSecurityManager().authoriseGroupOperation(Operation.DELETE, getName());
- _groupManager.removeGroup(getName());
+ _groupDatabase.removeGroup(getName());
+ deleted();
return true;
}
@@ -479,7 +590,8 @@ public class FileBasedGroupProviderImpl
{
getSecurityManager().authoriseGroupOperation(Operation.UPDATE, GroupAdapter.this.getName());
- _groupManager.removeUserFromGroup(getName(), GroupAdapter.this.getName());
+ _groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName());
+ deleted();
return true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org