You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/03/30 15:44:29 UTC

svn commit: r1307416 [3/5] - in /qpid/trunk/qpid/java: ./ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchang...

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java Fri Mar 30 13:44:25 2012
@@ -34,7 +34,6 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.BindingMessages;
 import org.apache.qpid.server.logging.subjects.BindingLogSubject;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collections;
@@ -44,38 +43,14 @@ import java.util.concurrent.ConcurrentHa
 public class BindingFactory
 {
     private final VirtualHost _virtualHost;
-    private final DurableConfigurationStore.Source _configSource;
-    private final Exchange _defaultExchange;
 
     private final ConcurrentHashMap<BindingImpl, BindingImpl> _bindings = new ConcurrentHashMap<BindingImpl, BindingImpl>();
 
-
     public BindingFactory(final VirtualHost vhost)
     {
-        this(vhost, vhost.getExchangeRegistry().getDefaultExchange());
-    }
-
-    public BindingFactory(final DurableConfigurationStore.Source configSource, final Exchange defaultExchange)
-    {
-        _configSource = configSource;
-        _defaultExchange = defaultExchange;
-        if (configSource instanceof VirtualHost)
-        {
-            _virtualHost = (VirtualHost) configSource;
-        }
-        else
-        {
-            _virtualHost = null;
-        }
-    }
-
-    public VirtualHost getVirtualHost()
-    {
-        return _virtualHost;
+        _virtualHost = vhost;
     }
 
-
-
     private final class BindingImpl extends Binding implements AMQQueue.Task, Exchange.Task, BindingConfig
     {
         private final BindingLogSubject _logSubject;
@@ -156,30 +131,38 @@ public class BindingFactory
     private boolean makeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException
     {
         assert queue != null;
+        final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+
         if (bindingKey == null)
         {
             bindingKey = "";
         }
         if (exchange == null)
         {
-            exchange = _defaultExchange;
+            exchange = defaultExchange;
         }
         if (arguments == null)
         {
             arguments = Collections.emptyMap();
         }
 
+        if (exchange == null)
+        {
+            throw new IllegalArgumentException("exchange cannot be null");
+        }
+
         // The default exchange bindings must reflect the existence of queues, allow
         // all operations on it to succeed. It is up to the broker to prevent illegal
         // attempts at binding to this exchange, not the ACLs.
-        if(exchange != _defaultExchange)
+        if(exchange != defaultExchange)
         {
             //Perform ACLs
-            if (!getVirtualHost().getSecurityManager().authoriseBind(exchange, queue, new AMQShortString(bindingKey)))
+            if (!_virtualHost.getSecurityManager().authoriseBind(exchange, queue, new AMQShortString(bindingKey)))
             {
                 throw new AMQSecurityException("Permission denied: binding " + bindingKey);
             }
         }
+
         
         BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments);
         BindingImpl existingMapping = _bindings.putIfAbsent(b,b);
@@ -192,7 +175,7 @@ public class BindingFactory
 
             if (b.isDurable() && !restore)
             {
-                _configSource.getMessageStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments));
+                _virtualHost.getMessageStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments));
             }
 
             queue.addQueueDeleteTask(b);
@@ -212,7 +195,7 @@ public class BindingFactory
 
     private ConfigStore getConfigStore()
     {
-        return getVirtualHost().getConfigStore();
+        return _virtualHost.getConfigStore();
     }
 
     public void restoreBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException
@@ -229,13 +212,15 @@ public class BindingFactory
     public Binding removeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException
     {
         assert queue != null;
+        final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+
         if (bindingKey == null)
         {
             bindingKey = "";
         }
         if (exchange == null)
         {
-            exchange = _defaultExchange;
+            exchange = defaultExchange;
         }
         if (arguments == null)
         {
@@ -245,10 +230,10 @@ public class BindingFactory
         // The default exchange bindings must reflect the existence of queues, allow
         // all operations on it to succeed. It is up to the broker to prevent illegal
         // attempts at binding to this exchange, not the ACLs.
-        if(exchange != _defaultExchange)
+        if(exchange != defaultExchange)
         {
             // Check access
-            if (!getVirtualHost().getSecurityManager().authoriseUnbind(exchange, new AMQShortString(bindingKey), queue))
+            if (!_virtualHost.getSecurityManager().authoriseUnbind(exchange, new AMQShortString(bindingKey), queue))
             {
                 throw new AMQSecurityException("Permission denied: unbinding " + bindingKey);
             }
@@ -265,7 +250,7 @@ public class BindingFactory
 
             if (b.isDurable())
             {
-                _configSource.getMessageStore().unbindQueue(exchange,
+                _virtualHost.getMessageStore().unbindQueue(exchange,
                                          new AMQShortString(bindingKey),
                                          queue,
                                          FieldTable.convertToFieldTable(arguments));
@@ -280,13 +265,15 @@ public class BindingFactory
     public Binding getBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments)
     {
         assert queue != null;
+        final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+
         if(bindingKey == null)
         {
             bindingKey = "";
         }
         if(exchange == null)
         {
-            exchange = _defaultExchange;
+            exchange = defaultExchange;
         }
         if(arguments == null)
         {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Fri Mar 30 13:44:25 2012
@@ -32,6 +32,7 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStoreFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -102,14 +103,14 @@ public class VirtualHostConfiguration ex
         return getConfig().subset("store");
     }
 
-    public String getMessageStoreClass()
+    public String getMessageStoreFactoryClass()
     {
-        return getStringValue("store.class", MemoryMessageStore.class.getName());
+        return getStringValue("store.factoryclass", MemoryMessageStoreFactory.class.getName());
     }
 
-    public void setMessageStoreClass(String storeClass)
+    public void setMessageStoreFactoryClass(String storeFactoryClass)
     {
-        getConfig().setProperty("store.class", storeClass);
+        getConfig().setProperty("store.factoryclass", storeFactoryClass);
     }
 
     public List getExchanges()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Fri Mar 30 13:44:25 2012
@@ -46,11 +46,19 @@ public class ConnectionRegistry implemen
     /** Close all of the currently open connections. */
     public void close()
     {
-        _logger.debug("Closing connection registry :" + _registry.size() + " connections.");
+        close(IConnectionRegistry.BROKER_SHUTDOWN_REPLY_TEXT);
+    }
+
+    public void close(final String replyText)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Closing connection registry :" + _registry.size() + " connections.");
+        }
         while (!_registry.isEmpty())
         {
             AMQConnectionModel connection = _registry.get(0);
-            closeConnection(connection, AMQConstant.CONNECTION_FORCED, "Broker is shutting down");
+            closeConnection(connection, AMQConstant.CONNECTION_FORCED, replyText);
         }
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java Fri Mar 30 13:44:25 2012
@@ -28,12 +28,17 @@ import java.util.List;
 
 public interface IConnectionRegistry
 {
+    public static final String BROKER_SHUTDOWN_REPLY_TEXT = "Broker is shutting down";
+    public static final String VHOST_PASSIVATE_REPLY_TEXT = "Virtual host is being passivated";
+
     public void initialise();
 
     public void close() throws AMQException;
-    
+
+    public void close(String replyText) throws AMQException;
+
     public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message);
-    
+
     public List<AMQConnectionModel> getConnections();
 
     public void registerConnection(AMQConnectionModel connnection);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Mar 30 13:44:25 2012
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.server.exchange;
 
-import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.binding.Binding;
@@ -125,10 +123,18 @@ public abstract class AbstractExchange i
         _autoDelete = autoDelete;
         _ticket = ticket;
 
-        // TODO - fix
         _id = getConfigStore().createId();
 
         getConfigStore().addConfiguredObject(this);
+        createAndRegisterMBean();
+        _logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
+
+        // Log Exchange creation
+        CurrentActor.get().message(ExchangeMessages.CREATED(String.valueOf(getTypeShortString()), String.valueOf(name), durable));
+    }
+
+    private void createAndRegisterMBean()
+    {
         try
         {
             _exchangeMbean = createMBean();
@@ -136,12 +142,8 @@ public abstract class AbstractExchange i
         }
         catch (JMException e)
         {
-            getLogger().error(e);
+            throw new RuntimeException("Failed to register mbean",e);
         }
-        _logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
-
-        // Log Exchange creation
-        CurrentActor.get().message(ExchangeMessages.CREATED(String.valueOf(getTypeShortString()), String.valueOf(name), durable));
     }
 
     public ConfigStore getConfigStore()
@@ -149,8 +151,6 @@ public abstract class AbstractExchange i
         return getVirtualHost().getConfigStore();
     }
 
-    public abstract Logger getLogger();
-
     public boolean isDurable()
     {
         return _durable;
@@ -324,8 +324,7 @@ public abstract class AbstractExchange i
 
     public Map<String, Object> getArguments()
     {
-        // TODO - Fix
-        return Collections.EMPTY_MAP;
+        return Collections.emptyMap();
     }
 
     public UUID getId()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Fri Mar 30 13:44:25 2012
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.exchange;
 
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -36,7 +35,7 @@ import java.util.concurrent.ConcurrentMa
 
 public class DefaultExchangeRegistry implements ExchangeRegistry
 {
-    private static final Logger _log = Logger.getLogger(DefaultExchangeRegistry.class);
+    private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class);
 
     /**
      * Maps from exchange name to exchange instance
@@ -59,8 +58,6 @@ public class DefaultExchangeRegistry imp
         new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this, getDurableConfigurationStore());
     }
 
-
-
     public DurableConfigurationStore getDurableConfigurationStore()
     {
         return _host.getMessageStore();
@@ -153,4 +150,28 @@ public class DefaultExchangeRegistry imp
         }
     }
 
+    @Override
+    public void clearAndUnregisterMbeans()
+    {
+        for (final AMQShortString exchangeName : getExchangeNames())
+        {
+            final Exchange exchange = getExchange(exchangeName);
+
+            if (exchange instanceof AbstractExchange)
+            {
+                AbstractExchange abstractExchange = (AbstractExchange) exchange;
+                try
+                {
+                    abstractExchange.getManagedObject().unregister();
+                }
+                catch (AMQException e)
+                {
+                    LOGGER.warn("Failed to unregister mbean", e);
+                }
+            }
+        }
+        _exchangeMap.clear();
+        _exchangeMapStr.clear();
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Fri Mar 30 13:44:25 2012
@@ -131,12 +131,6 @@ public class DirectExchange extends Abst
         return new DirectExchangeMBean(this);
     }
 
-    public Logger getLogger()
-    {
-        return _logger;
-    }
-
-
     public List<? extends BaseQueue> doRoute(InboundMessage payload)
     {
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java Fri Mar 30 13:44:25 2012
@@ -27,7 +27,8 @@ import org.apache.qpid.server.store.Dura
 
 public class ExchangeInitialiser
 {
-    public void initialise(ExchangeFactory factory, ExchangeRegistry registry, DurableConfigurationStore store) throws AMQException{
+    public void initialise(ExchangeFactory factory, ExchangeRegistry registry, DurableConfigurationStore store) throws AMQException
+    {
         for (ExchangeType<? extends Exchange> type : factory.getRegisteredTypes())
         {
             define (registry, factory, type.getDefaultExchangeName(), type.getName(), store);
@@ -44,7 +45,6 @@ public class ExchangeInitialiser
         {
             Exchange exchange = f.createExchange(name, type, true, false, 0);
             r.registerExchange(exchange);
-
             if(exchange.isDurable())
             {
                 store.createExchange(exchange);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Fri Mar 30 13:44:25 2012
@@ -51,5 +51,7 @@ public interface ExchangeRegistry
 
     Exchange getExchange(String exchangeName);
 
-    void unregisterExchange(String exchange, boolean ifUnused)  throws ExchangeInUseException, AMQException;;
+    void unregisterExchange(String exchange, boolean ifUnused)  throws ExchangeInUseException, AMQException;
+
+    void clearAndUnregisterMbeans();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Fri Mar 30 13:44:25 2012
@@ -52,11 +52,6 @@ public class FanoutExchange extends Abst
         return new FanoutExchangeMBean(this);
     }
 
-    public Logger getLogger()
-    {
-        return _logger;
-    }
-
     public static final ExchangeType<FanoutExchange> TYPE = new ExchangeType<FanoutExchange>()
     {
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Fri Mar 30 13:44:25 2012
@@ -231,11 +231,6 @@ public class HeadersExchange extends Abs
         return new HeadersExchangeMBean(this);
     }
 
-    public Logger getLogger()
-    {
-        return _logger;
-    }
-
     protected void onBind(final Binding binding)
     {
         String bindingKey = binding.getBindingKey();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Fri Mar 30 13:44:25 2012
@@ -407,11 +407,6 @@ public class TopicExchange extends Abstr
         return new TopicExchangeMBean(this);
     }
 
-    public Logger getLogger()
-    {
-        return _logger;
-    }
-
     private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey)
     {
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Fri Mar 30 13:44:25 2012
@@ -32,6 +32,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.State;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody>
@@ -82,6 +83,10 @@ public class ConnectionOpenMethodHandler
             {
                 throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied: '" + virtualHost.getName() + "'");
             }
+            else if (virtualHost.getState() != State.ACTIVE)
+            {
+                throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active");
+            }
 
             session.setVirtualHost(virtualHost);
 
@@ -89,10 +94,10 @@ public class ConnectionOpenMethodHandler
             if (session.getContextKey() == null)
             {
                 session.setContextKey(generateClientID());
-            }            
+            }
 
             MethodRegistry methodRegistry = session.getMethodRegistry();
-            AMQMethodBody responseBody =  methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());            
+            AMQMethodBody responseBody =  methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
 
             stateManager.changeState(AMQState.CONNECTION_OPEN);
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties Fri Mar 30 13:44:25 2012
@@ -18,8 +18,7 @@
 #
 # Default File used for all non-defined locales.
 
-# 0 - name
-CREATED = CFG-1001 : Created : {0}
+CREATED = CFG-1001 : Created
 # 0 - path
 STORE_LOCATION = CFG-1002 : Store location : {0}
 CLOSE = CFG-1003 : Closed

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties Fri Mar 30 13:44:25 2012
@@ -18,11 +18,11 @@
 #
 # Default File used for all non-defined locales.
 #
-# 0 - name
-CREATED = MST-1001 : Created : {0}
+CREATED = MST-1001 : Created
 # 0 - path
 STORE_LOCATION = MST-1002 : Store location : {0}
 CLOSED = MST-1003 : Closed
 RECOVERY_START = MST-1004 : Recovery Start
 RECOVERED = MST-1005 : Recovered {0,number} messages
-RECOVERY_COMPLETE = MST-1006 : Recovery Complete
\ No newline at end of file
+RECOVERY_COMPLETE = MST-1006 : Recovery Complete
+PASSIVATE = MST-1007 : Store Passivated

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties Fri Mar 30 13:44:25 2012
@@ -19,8 +19,7 @@
 # Default File used for all non-defined locales.
 #
 #
-# 0 - name
-CREATED = TXN-1001 : Created : {0}
+CREATED = TXN-1001 : Created
 # 0 - path
 STORE_LOCATION = TXN-1002 : Store location : {0}
 CLOSED = TXN-1003 : Closed

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java Fri Mar 30 13:44:25 2012
@@ -40,7 +40,8 @@ public class BindingLogSubject extends A
     public BindingLogSubject(String routingKey, Exchange exchange,
                              AMQQueue queue)
     {
-        setLogStringWithFormat(BINDING_FORMAT, queue.getVirtualHost().getName(),
+        setLogStringWithFormat(BINDING_FORMAT,
+                               queue.getVirtualHost().getName(),
                                exchange.getTypeShortString(),
                                exchange.getNameShortString(),
                                queue.getNameShortString(),

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java Fri Mar 30 13:44:25 2012
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.logging.subjects;
 
-import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.STORE_FORMAT;
@@ -28,10 +27,9 @@ import static org.apache.qpid.server.log
 public class MessageStoreLogSubject extends AbstractLogSubject
 {
 
-    /** Create an ExchangeLogSubject that Logs in the following format. */
-    public MessageStoreLogSubject(VirtualHost vhost, MessageStore store)
+    /** Create a MessageStoreLogSubject that logs in the following format. */
+    public MessageStoreLogSubject(VirtualHost vhost, String messageStoreName)
     {
-        setLogStringWithFormat(STORE_FORMAT, vhost.getName(),
-                               store.getClass().getSimpleName());
+        setLogStringWithFormat(STORE_FORMAT, vhost.getName(), messageStoreName);
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Fri Mar 30 13:44:25 2012
@@ -20,7 +20,10 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
@@ -29,6 +32,8 @@ import java.util.concurrent.ConcurrentMa
 
 public class DefaultQueueRegistry implements QueueRegistry
 {
+    private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class);
+
     private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
 
     private final VirtualHost _virtualHost;
@@ -72,4 +77,22 @@ public class DefaultQueueRegistry implem
     {
         return getQueue(new AMQShortString(queue));
     }
+
+    @Override
+    public void stopAllAndUnregisterMBeans() // stops each queue from getQueues(), unregisters its managed object, then clears the queue map
+    {
+        for (final AMQQueue queue : getQueues())
+        {
+            queue.stop();
+            try
+            {
+                queue.getManagedObject().unregister();
+            }
+            catch (AMQException e)
+            {
+                LOGGER.warn("Failed to unregister mbean", e); // best effort: log the failure and carry on with the remaining queues
+            }
+        }
+        _queueMap.clear();
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Fri Mar 30 13:44:25 2012
@@ -40,4 +40,6 @@ public interface QueueRegistry
     Collection<AMQQueue> getQueues();
 
     AMQQueue getQueue(String queue);
+
+    void stopAllAndUnregisterMBeans();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Fri Mar 30 13:44:25 2012
@@ -28,7 +28,6 @@ import org.apache.qpid.framing.FieldTabl
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.federation.Bridge;
 import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.queue.AMQQueue;
 
 public interface DurableConfigurationStore
@@ -46,13 +45,11 @@ public interface DurableConfigurationSto
      * @param name             The name to be used by this store
      * @param recoveryHandler  Handler to be called as the store recovers on start up
      * @param config           The apache commons configuration object.
-     *
      * @throws Exception If any error occurs that means the store is unable to configure itself.
      */
     void configureConfigStore(String name,
                               ConfigurationRecoveryHandler recoveryHandler,
-                              Configuration config,
-                              LogSubject logSubject) throws Exception;
+                              Configuration config) throws Exception;
     /**
      * Makes the specified exchange persistent.
      *

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java (from r1307317, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java&r1=1307317&r2=1307416&rev=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java Fri Mar 30 13:44:25 2012
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,26 +17,14 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.Collection;
+package org.apache.qpid.server.store;
 
-public interface QueueRegistry
+public enum Event
 {
-    VirtualHost getVirtualHost();
-
-    void registerQueue(AMQQueue queue);
-
-    void unregisterQueue(AMQShortString name);
-
-    AMQQueue getQueue(AMQShortString name);
-
-    Collection<AMQShortString> getQueueNames();
-
-    Collection<AMQQueue> getQueues();
-
-    AMQQueue getQueue(String queue);
+    BEFORE_ACTIVATE,
+    AFTER_ACTIVATE,
+    BEFORE_PASSIVATE,
+    AFTER_PASSIVATE,
+    BEFORE_CLOSE,
+    AFTER_CLOSE
 }

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventListener.java (from r1307317, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventListener.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventListener.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java&r1=1307317&r2=1307416&rev=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventListener.java Fri Mar 30 13:44:25 2012
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,26 +17,9 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.Collection;
+package org.apache.qpid.server.store;
 
-public interface QueueRegistry
+public interface EventListener
 {
-    VirtualHost getVirtualHost();
-
-    void registerQueue(AMQQueue queue);
-
-    void unregisterQueue(AMQShortString name);
-
-    AMQQueue getQueue(AMQShortString name);
-
-    Collection<AMQShortString> getQueueNames();
-
-    Collection<AMQQueue> getQueues();
-
-    AMQQueue getQueue(String queue);
+    public void event(Event event);
 }

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java (from r1307317, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java&r1=1307317&r2=1307416&rev=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java Fri Mar 30 13:44:25 2012
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,24 +19,30 @@
  */
 package org.apache.qpid.server.store;
 
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
-public abstract class AbstractMessageStore implements MessageStore
+public class EventManager
 {
-    private LogSubject _logSubject;
+    private ConcurrentMap<Event, List<EventListener>> _listeners = new ConcurrentHashMap<Event, List<EventListener>>();
 
-    public void configure(VirtualHost virtualHost) throws Exception
+    public void addEventListener(EventListener listener, Event event)
     {
-        _logSubject = new MessageStoreLogSubject(virtualHost, this);
-        CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+        _listeners.putIfAbsent(event, new java.util.concurrent.CopyOnWriteArrayList<EventListener>()); // copy-on-write list: listeners may be added while notifyEvent iterates
+        final List<EventListener> list = _listeners.get(event);
+        list.add(listener);
     }
 
-    public void close() throws Exception
+    public void notifyEvent(Event event)
     {
-        CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+        if (_listeners.containsKey(event))
+        {
+            for (EventListener listener : _listeners.get(event))
+            {
+                listener.event(event);
+            }
+        }
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Fri Mar 30 13:44:25 2012
@@ -20,103 +20,58 @@
  */
 package org.apache.qpid.server.store;
 
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
 import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.queue.AMQQueue;
 
-import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-/** A simple message store that stores the messages in a threadsafe structure in memory. */
-public class MemoryMessageStore implements MessageStore
+/** A simple message store that stores the messages in a thread-safe structure in memory. */
+public class MemoryMessageStore extends NullMessageStore
 {
-    private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
-
-    private static final int DEFAULT_HASHTABLE_CAPACITY = 50000;
-
-    private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity";
-
-
     private final AtomicLong _messageId = new AtomicLong(1);
-    private AtomicBoolean _closed = new AtomicBoolean(false);
-    private LogSubject _logSubject;
+    private final AtomicBoolean _closed = new AtomicBoolean(false);
 
     private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
     {
-        public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+        @Override
+        public StoreFuture commitTranAsync() throws AMQStoreException
         {
+            return StoreFuture.IMMEDIATE_FUTURE;
         }
 
-        public void dequeueMessage(TransactionLogResource  queue, EnqueableMessage message) throws AMQStoreException
+        @Override
+        public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
         {
         }
 
-        public void commitTran() throws AMQStoreException
+        @Override
+        public void dequeueMessage(TransactionLogResource  queue, EnqueableMessage message) throws AMQStoreException
         {
         }
 
-        public StoreFuture commitTranAsync() throws AMQStoreException
+        @Override
+        public void commitTran() throws AMQStoreException
         {
-            return StoreFuture.IMMEDIATE_FUTURE;
         }
 
+        @Override
         public void abortTran() throws AMQStoreException
         {
         }
 
+        @Override
         public void removeXid(long format, byte[] globalId, byte[] branchId)
         {
         }
 
+        @Override
         public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
         {
         }
-
     };
 
-    public void configureConfigStore(String name, ConfigurationRecoveryHandler handler, Configuration configuration, LogSubject logSubject) throws Exception
-    {
-        _logSubject = logSubject;
-        CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
-
-
-    }
-
-    public void configureMessageStore(String name,
-                                      MessageStoreRecoveryHandler recoveryHandler,
-                                      TransactionLogRecoveryHandler tlogRecoveryHandler,
-                                      Configuration config, LogSubject logSubject) throws Exception
-    {
-        if(_logSubject == null)
-        {
-            _logSubject = logSubject;
-        }
-        int hashtableCapacity = config.getInt(name + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
-        _log.info("Using capacity " + hashtableCapacity + " for hash tables");
-        CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
-    }
-
-    public void close() throws Exception
-    {
-        _closed.getAndSet(true);
-        CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
-
-    }
-
+    @Override
     public StoredMessage addMessage(StorableMessageMetaData metaData)
     {
         final long id = _messageId.getAndIncrement();
@@ -125,96 +80,21 @@ public class MemoryMessageStore implemen
         return message;
     }
 
-
-    public void createExchange(Exchange exchange) throws AMQStoreException
-    {
-
-    }
-
-    public void removeExchange(Exchange exchange) throws AMQStoreException
-    {
-
-    }
-
-    public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
-    {
-
-    }
-
-    public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
-    {
-
-    }
-
-
-    public void createQueue(AMQQueue queue) throws AMQStoreException
-    {
-        // Not requred to do anything
-    }
-
-    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
-    {
-        // Not required to do anything
-    }
-
-    public void removeQueue(final AMQQueue queue) throws AMQStoreException
-    {
-        // Not required to do anything
-    }
-    
-    public void updateQueue(final AMQQueue queue) throws AMQStoreException
-    {
-        // Not required to do anything
-    }
-
-    public void createBrokerLink(final BrokerLink link) throws AMQStoreException
-    {
-
-    }
-
-    public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
-    {
-
-    }
-
-    public void createBridge(final Bridge bridge) throws AMQStoreException
-    {
-
-    }
-
-    public void deleteBridge(final Bridge bridge) throws AMQStoreException
-    {
-
-    }
-
+    @Override
     public Transaction newTransaction()
     {
         return IN_MEMORY_TRANSACTION;
     }
 
-
-    public List<AMQQueue> createQueues() throws AMQException
-    {
-        return null;
-    }
-
-    public Long getNewMessageId()
-    {
-        return _messageId.getAndIncrement();
-    }
-
+    @Override
     public boolean isPersistent()
     {
         return false;
     }
 
-    private void checkNotClosed() throws MessageStoreClosedException
-     {
-        if (_closed.get())
-        {
-            throw new MessageStoreClosedException();
-        }
+    @Override
+    public void close() throws Exception
+    {
+        _closed.getAndSet(true);
     }
-
-
 }

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java (from r1307317, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java&r1=1307317&r2=1307416&rev=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java Fri Mar 30 13:44:25 2012
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,23 +20,22 @@
 package org.apache.qpid.server.store;
 
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.decorators.EventDecorator;
+import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator;
 
-public abstract class AbstractMessageStore implements MessageStore
+public class MemoryMessageStoreFactory implements MessageStoreFactory
 {
-    private LogSubject _logSubject;
 
-    public void configure(VirtualHost virtualHost) throws Exception
+    @Override
+    public MessageStore createMessageStore(LogSubject logSubject)
     {
-        _logSubject = new MessageStoreLogSubject(virtualHost, this);
-        CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+        return new OperationalLoggingDecorator(new EventDecorator(new MemoryMessageStore()), logSubject);
     }
 
-    public void close() throws Exception
+    @Override
+    public String getStoreClassName()
     {
-        CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+        return MemoryMessageStore.class.getSimpleName();
     }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Fri Mar 30 13:44:25 2012
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.store;
 
 import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.server.logging.LogSubject;
 
 /**
  * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
@@ -42,9 +41,9 @@ public interface MessageStore extends Du
     void configureMessageStore(String name,
                                MessageStoreRecoveryHandler messageRecoveryHandler,
                                TransactionLogRecoveryHandler tlogRecoveryHandler,
-                               Configuration config, LogSubject logSubject) throws Exception;
-
+                               Configuration config) throws Exception;
 
+    void activate() throws Exception;
 
     public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
 
@@ -65,4 +64,8 @@ public interface MessageStore extends Du
      */
     void close() throws Exception;
 
+    void addEventListener(EventListener eventListener, Event event);
+
+    MessageStore getUnderlyingStore();
+
 }

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java (from r1307317, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java&r1=1307317&r2=1307416&rev=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java Fri Mar 30 13:44:25 2012
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,26 +17,11 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+package org.apache.qpid.server.store;
 
-import java.util.Collection;
-
-public interface QueueRegistry
+public class MessageStoreConstants
 {
-    VirtualHost getVirtualHost();
-
-    void registerQueue(AMQQueue queue);
-
-    void unregisterQueue(AMQShortString name);
-
-    AMQQueue getQueue(AMQShortString name);
-
-    Collection<AMQShortString> getQueueNames();
 
-    Collection<AMQQueue> getQueues();
+    public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
 
-    AMQQueue getQueue(String queue);
 }

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java (from r1307317, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java&r1=1307317&r2=1307416&rev=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java Fri Mar 30 13:44:25 2012
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,26 +17,13 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+package org.apache.qpid.server.store;
 
-import java.util.Collection;
+import org.apache.qpid.server.logging.LogSubject;
 
-public interface QueueRegistry
+public interface MessageStoreFactory
 {
-    VirtualHost getVirtualHost();
-
-    void registerQueue(AMQQueue queue);
-
-    void unregisterQueue(AMQShortString name);
-
-    AMQQueue getQueue(AMQShortString name);
-
-    Collection<AMQShortString> getQueueNames();
-
-    Collection<AMQQueue> getQueues();
+    MessageStore createMessageStore(LogSubject logSubject);
 
-    AMQQueue getQueue(String queue);
+    String getStoreClassName();
 }

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java?rev=1307416&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java Fri Mar 30 13:44:25 2012
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class NullMessageStore implements MessageStore // MessageStore implementation in which every operation is a no-op
+{
+    @Override
+    public void configureConfigStore(String name,
+                                     ConfigurationRecoveryHandler recoveryHandler,
+                                     Configuration config) throws Exception
+    {
+    }
+
+    @Override
+    public void createExchange(Exchange exchange) throws AMQStoreException
+    {
+    }
+
+    @Override
+    public void removeExchange(Exchange exchange) throws AMQStoreException
+    {
+    }
+
+    @Override
+    public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
+    {
+    }
+
+    @Override
+    public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
+    {
+    }
+
+    @Override
+    public void createQueue(AMQQueue queue) throws AMQStoreException
+    {
+    }
+
+    @Override
+    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
+    {
+    }
+
+    @Override
+    public void removeQueue(AMQQueue queue) throws AMQStoreException
+    {
+    }
+
+    @Override
+    public void updateQueue(AMQQueue queue) throws AMQStoreException
+    {
+    }
+
+    @Override
+    public void createBrokerLink(final BrokerLink link) throws AMQStoreException
+    {
+    }
+
+    @Override
+    public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
+    {
+    }
+
+    @Override
+    public void createBridge(final Bridge bridge) throws AMQStoreException
+    {
+    }
+
+    @Override
+    public void deleteBridge(final Bridge bridge) throws AMQStoreException
+    {
+    }
+
+    @Override
+    public void configureMessageStore(String name,
+            MessageStoreRecoveryHandler recoveryHandler,
+            TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception
+    {
+    }
+
+    @Override
+    public void close() throws Exception
+    {
+    }
+
+    @Override
+    public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
+    {
+        return null; // nothing is stored here, so callers of this base implementation receive null
+    }
+
+    @Override
+    public boolean isPersistent()
+    {
+        return false;
+    }
+
+    @Override
+    public Transaction newTransaction()
+    {
+        return null; // no transaction object is provided by this base implementation
+    }
+
+    @Override
+    public void activate() throws Exception
+    {
+    }
+
+    @Override
+    public void addEventListener(EventListener eventListener, Event event)
+    {
+    } // the listener is not retained, so it will never be notified by this class
+
+    @Override
+    public MessageStore getUnderlyingStore()
+    {
+        return this; // this instance is its own underlying store
+    }
+}
\ No newline at end of file

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java (from r1307317, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java&r1=1307317&r2=1307416&rev=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java Fri Mar 30 13:44:25 2012
@@ -18,26 +18,14 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.queue;
+package org.apache.qpid.server.store;
 
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.Collection;
-
-public interface QueueRegistry
+public enum State
 {
-    VirtualHost getVirtualHost();
-
-    void registerQueue(AMQQueue queue);
-
-    void unregisterQueue(AMQShortString name);
-
-    AMQQueue getQueue(AMQShortString name);
-
-    Collection<AMQShortString> getQueueNames();
-
-    Collection<AMQQueue> getQueues();
-
-    AMQQueue getQueue(String queue);
-}
+    INITIAL,
+    CONFIGURING,
+    RECOVERING,
+    ACTIVE,
+    CLOSING,
+    CLOSED
+}
\ No newline at end of file

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java?rev=1307416&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java Fri Mar 30 13:44:25 2012
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+
+public class StateManager
+{
+    private State _state = State.INITIAL;
+
+    public synchronized State getState()
+    {
+        return _state;
+    }
+
+    public synchronized void stateTransition(final State current, final State desired)
+    {
+        if (_state != current)
+        {
+            throw new IllegalStateException("Cannot transition to the state: " + desired + "; need to be in state: " + current
+                                   + "; currently in state: " + _state);
+        }
+        _state = desired;
+    }
+
+    public synchronized boolean isInState(State testedState)
+    {
+        return _state.equals(testedState);
+    }
+
+    public synchronized boolean isNotInState(State testedState)
+    {
+        return !isInState(testedState);
+    }
+
+    public synchronized void checkInState(State checkedState)
+    {
+        if (isNotInState(checkedState))
+        {
+            throw new IllegalStateException("Unexpected state. Was : " + _state + " but expected : " + checkedState);
+        }
+    }
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java Fri Mar 30 13:44:25 2012
@@ -31,11 +31,10 @@ public interface StoreFuture
 
         public void waitForCompletion()
         {
-
         }
     };
 
     boolean isComplete();
 
     void waitForCompletion();
-}
\ No newline at end of file
+}

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java?rev=1307416&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java Fri Mar 30 13:44:25 2012
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.decorators;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+
+/**
+ * AbstractDecorator.  All methods <b>MUST</b> perform simple
+ * delegation to their equivalent decorated counterpart without
+ * change.
+ */
+public class AbstractDecorator implements MessageStore
+{
+    protected final MessageStore _decoratedStore;
+
+    public AbstractDecorator(MessageStore store)
+    {
+        _decoratedStore = store;
+    }
+
+    @Override
+    public void configureMessageStore(String name,
+            MessageStoreRecoveryHandler messageRecoveryHandler,
+            TransactionLogRecoveryHandler tlogRecoveryHandler,
+            Configuration config) throws Exception
+    {
+        _decoratedStore.configureMessageStore(name, messageRecoveryHandler,
+                tlogRecoveryHandler, config);
+    }
+
+    @Override
+    public void configureConfigStore(String name,
+            ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception
+    {
+        _decoratedStore.configureConfigStore(name, recoveryHandler, config);
+    }
+
+    @Override
+    public void activate() throws Exception
+    {
+        _decoratedStore.activate();
+    }
+
+    @Override
+    public void close() throws Exception
+    {
+        _decoratedStore.close();
+    }
+
+    @Override
+    public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(
+            T metaData)
+    {
+        return _decoratedStore.addMessage(metaData);
+    }
+
+    @Override
+    public void createExchange(Exchange exchange) throws AMQStoreException
+    {
+        _decoratedStore.createExchange(exchange);
+    }
+
+    @Override
+    public boolean isPersistent()
+    {
+        return _decoratedStore.isPersistent();
+    }
+
+    @Override
+    public Transaction newTransaction()
+    {
+        return _decoratedStore.newTransaction();
+    }
+
+    @Override
+    public void removeExchange(Exchange exchange) throws AMQStoreException
+    {
+        _decoratedStore.removeExchange(exchange);
+    }
+
+    @Override
+    public void addEventListener(EventListener eventListener, Event event)
+    {
+        _decoratedStore.addEventListener(eventListener, event);
+    }
+
+    @Override
+    public void bindQueue(Exchange exchange, AMQShortString routingKey,
+            AMQQueue queue, FieldTable args) throws AMQStoreException
+    {
+        _decoratedStore.bindQueue(exchange, routingKey, queue, args);
+    }
+
+    @Override
+    public void unbindQueue(Exchange exchange, AMQShortString routingKey,
+            AMQQueue queue, FieldTable args) throws AMQStoreException
+    {
+        _decoratedStore.unbindQueue(exchange, routingKey, queue, args);
+    }
+
+    @Override
+    public void createQueue(AMQQueue queue) throws AMQStoreException
+    {
+        _decoratedStore.createQueue(queue);
+    }
+
+    @Override
+    public void createQueue(AMQQueue queue, FieldTable arguments)
+            throws AMQStoreException
+    {
+        _decoratedStore.createQueue(queue, arguments);
+    }
+
+    @Override
+    public void removeQueue(AMQQueue queue) throws AMQStoreException
+    {
+        _decoratedStore.removeQueue(queue);
+    }
+
+    @Override
+    public void updateQueue(AMQQueue queue) throws AMQStoreException
+    {
+        _decoratedStore.updateQueue(queue);
+    }
+
+    @Override
+    public void createBrokerLink(BrokerLink link) throws AMQStoreException
+    {
+        _decoratedStore.createBrokerLink(link);
+    }
+
+    @Override
+    public void deleteBrokerLink(BrokerLink link) throws AMQStoreException
+    {
+        _decoratedStore.deleteBrokerLink(link);
+    }
+
+    @Override
+    public void createBridge(Bridge bridge) throws AMQStoreException
+    {
+        _decoratedStore.createBridge(bridge);
+    }
+
+    @Override
+    public void deleteBridge(Bridge bridge) throws AMQStoreException
+    {
+        _decoratedStore.deleteBridge(bridge);
+    }
+
+    @Override
+    public MessageStore getUnderlyingStore()
+    {
+        return _decoratedStore.getUnderlyingStore();
+    }
+}

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java?rev=1307416&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java Fri Mar 30 13:44:25 2012
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.decorators;
+
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.EventManager;
+import org.apache.qpid.server.store.MessageStore;
+
+public class EventDecorator extends AbstractDecorator
+{
+    protected final EventManager _eventManager;
+
+    public EventDecorator(MessageStore store)
+    {
+        super(store);
+        _eventManager = new EventManager();
+    }
+
+    @Override
+    public void activate() throws Exception
+    {
+        _eventManager.notifyEvent(Event.BEFORE_ACTIVATE);
+        _decoratedStore.activate();
+        _eventManager.notifyEvent(Event.AFTER_ACTIVATE);
+    }
+
+    @Override
+    public void close() throws Exception
+    {
+        _eventManager.notifyEvent(Event.BEFORE_CLOSE);
+        _decoratedStore.close();
+        _eventManager.notifyEvent(Event.AFTER_CLOSE);
+    }
+
+    @Override
+    public void addEventListener(EventListener eventListener, Event event)
+    {
+        _eventManager.addEventListener(eventListener, event);
+    }
+}

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java?rev=1307416&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java Fri Mar 30 13:44:25 2012
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.decorators;
+
+import static org.apache.qpid.server.store.MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+
+public class OperationalLoggingDecorator extends AbstractDecorator
+{
+    protected final LogSubject _logSubject;
+
+    public OperationalLoggingDecorator(final MessageStore decoratedStore, LogSubject logSubject)
+    {
+        super(decoratedStore);
+        _logSubject = logSubject;
+    }
+
+    @Override
+    public void configureMessageStore(String name,
+            MessageStoreRecoveryHandler messageRecoveryHandler,
+            TransactionLogRecoveryHandler tlogRecoveryHandler,
+            Configuration config) throws Exception
+    {
+        CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED());
+        CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED());
+
+        if (config != null && config.getString(ENVIRONMENT_PATH_PROPERTY) != null)
+        {
+            CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(config.getString(ENVIRONMENT_PATH_PROPERTY)));
+        }
+
+        _decoratedStore.configureMessageStore(name, messageRecoveryHandler,
+                tlogRecoveryHandler, config);
+    }
+
+    @Override
+    public void configureConfigStore(String name,
+            ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception
+    {
+        CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED());
+
+        _decoratedStore.configureConfigStore(name, recoveryHandler, config);
+    }
+
+    @Override
+    public void activate() throws Exception
+    {
+        CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_START());
+        _decoratedStore.activate();
+        CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
+    }
+
+    @Override
+    public void close() throws Exception
+    {
+        CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+        _decoratedStore.close();
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org