You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/03/30 15:44:29 UTC
svn commit: r1307416 [3/5] - in /qpid/trunk/qpid/java: ./
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/
broker-plugins/extras/src/main/java/org/apache/qpid/extras/exchang...
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java Fri Mar 30 13:44:25 2012
@@ -34,7 +34,6 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.BindingMessages;
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collections;
@@ -44,38 +43,14 @@ import java.util.concurrent.ConcurrentHa
public class BindingFactory
{
private final VirtualHost _virtualHost;
- private final DurableConfigurationStore.Source _configSource;
- private final Exchange _defaultExchange;
private final ConcurrentHashMap<BindingImpl, BindingImpl> _bindings = new ConcurrentHashMap<BindingImpl, BindingImpl>();
-
public BindingFactory(final VirtualHost vhost)
{
- this(vhost, vhost.getExchangeRegistry().getDefaultExchange());
- }
-
- public BindingFactory(final DurableConfigurationStore.Source configSource, final Exchange defaultExchange)
- {
- _configSource = configSource;
- _defaultExchange = defaultExchange;
- if (configSource instanceof VirtualHost)
- {
- _virtualHost = (VirtualHost) configSource;
- }
- else
- {
- _virtualHost = null;
- }
- }
-
- public VirtualHost getVirtualHost()
- {
- return _virtualHost;
+ _virtualHost = vhost;
}
-
-
private final class BindingImpl extends Binding implements AMQQueue.Task, Exchange.Task, BindingConfig
{
private final BindingLogSubject _logSubject;
@@ -156,30 +131,38 @@ public class BindingFactory
private boolean makeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException
{
assert queue != null;
+ final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+
if (bindingKey == null)
{
bindingKey = "";
}
if (exchange == null)
{
- exchange = _defaultExchange;
+ exchange = defaultExchange;
}
if (arguments == null)
{
arguments = Collections.emptyMap();
}
+ if (exchange == null)
+ {
+ throw new IllegalArgumentException("exchange cannot be null");
+ }
+
// The default exchange bindings must reflect the existence of queues, allow
// all operations on it to succeed. It is up to the broker to prevent illegal
// attempts at binding to this exchange, not the ACLs.
- if(exchange != _defaultExchange)
+ if(exchange != defaultExchange)
{
//Perform ACLs
- if (!getVirtualHost().getSecurityManager().authoriseBind(exchange, queue, new AMQShortString(bindingKey)))
+ if (!_virtualHost.getSecurityManager().authoriseBind(exchange, queue, new AMQShortString(bindingKey)))
{
throw new AMQSecurityException("Permission denied: binding " + bindingKey);
}
}
+
BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments);
BindingImpl existingMapping = _bindings.putIfAbsent(b,b);
@@ -192,7 +175,7 @@ public class BindingFactory
if (b.isDurable() && !restore)
{
- _configSource.getMessageStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments));
+ _virtualHost.getMessageStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments));
}
queue.addQueueDeleteTask(b);
@@ -212,7 +195,7 @@ public class BindingFactory
private ConfigStore getConfigStore()
{
- return getVirtualHost().getConfigStore();
+ return _virtualHost.getConfigStore();
}
public void restoreBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException
@@ -229,13 +212,15 @@ public class BindingFactory
public Binding removeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException
{
assert queue != null;
+ final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+
if (bindingKey == null)
{
bindingKey = "";
}
if (exchange == null)
{
- exchange = _defaultExchange;
+ exchange = defaultExchange;
}
if (arguments == null)
{
@@ -245,10 +230,10 @@ public class BindingFactory
// The default exchange bindings must reflect the existence of queues, allow
// all operations on it to succeed. It is up to the broker to prevent illegal
// attempts at binding to this exchange, not the ACLs.
- if(exchange != _defaultExchange)
+ if(exchange != defaultExchange)
{
// Check access
- if (!getVirtualHost().getSecurityManager().authoriseUnbind(exchange, new AMQShortString(bindingKey), queue))
+ if (!_virtualHost.getSecurityManager().authoriseUnbind(exchange, new AMQShortString(bindingKey), queue))
{
throw new AMQSecurityException("Permission denied: unbinding " + bindingKey);
}
@@ -265,7 +250,7 @@ public class BindingFactory
if (b.isDurable())
{
- _configSource.getMessageStore().unbindQueue(exchange,
+ _virtualHost.getMessageStore().unbindQueue(exchange,
new AMQShortString(bindingKey),
queue,
FieldTable.convertToFieldTable(arguments));
@@ -280,13 +265,15 @@ public class BindingFactory
public Binding getBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments)
{
assert queue != null;
+ final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+
if(bindingKey == null)
{
bindingKey = "";
}
if(exchange == null)
{
- exchange = _defaultExchange;
+ exchange = defaultExchange;
}
if(arguments == null)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Fri Mar 30 13:44:25 2012
@@ -32,6 +32,7 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStoreFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -102,14 +103,14 @@ public class VirtualHostConfiguration ex
return getConfig().subset("store");
}
- public String getMessageStoreClass()
+ public String getMessageStoreFactoryClass()
{
- return getStringValue("store.class", MemoryMessageStore.class.getName());
+ return getStringValue("store.factoryclass", MemoryMessageStoreFactory.class.getName());
}
- public void setMessageStoreClass(String storeClass)
+ public void setMessageStoreFactoryClass(String storeFactoryClass)
{
- getConfig().setProperty("store.class", storeClass);
+ getConfig().setProperty("store.factoryclass", storeFactoryClass);
}
public List getExchanges()
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Fri Mar 30 13:44:25 2012
@@ -46,11 +46,19 @@ public class ConnectionRegistry implemen
/** Close all of the currently open connections. */
public void close()
{
- _logger.debug("Closing connection registry :" + _registry.size() + " connections.");
+ close(IConnectionRegistry.BROKER_SHUTDOWN_REPLY_TEXT);
+ }
+
+ public void close(final String replyText)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Closing connection registry :" + _registry.size() + " connections.");
+ }
while (!_registry.isEmpty())
{
AMQConnectionModel connection = _registry.get(0);
- closeConnection(connection, AMQConstant.CONNECTION_FORCED, "Broker is shutting down");
+ closeConnection(connection, AMQConstant.CONNECTION_FORCED, replyText);
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java Fri Mar 30 13:44:25 2012
@@ -28,12 +28,17 @@ import java.util.List;
public interface IConnectionRegistry
{
+ public static final String BROKER_SHUTDOWN_REPLY_TEXT = "Broker is shutting down";
+ public static final String VHOST_PASSIVATE_REPLY_TEXT = "Virtual host is being passivated";
+
public void initialise();
public void close() throws AMQException;
-
+
+ public void close(String replyText) throws AMQException;
+
public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message);
-
+
public List<AMQConnectionModel> getConnections();
public void registerConnection(AMQConnectionModel connnection);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Mar 30 13:44:25 2012
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.exchange;
-import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.binding.Binding;
@@ -125,10 +123,18 @@ public abstract class AbstractExchange i
_autoDelete = autoDelete;
_ticket = ticket;
- // TODO - fix
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
+ createAndRegisterMBean();
+ _logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
+
+ // Log Exchange creation
+ CurrentActor.get().message(ExchangeMessages.CREATED(String.valueOf(getTypeShortString()), String.valueOf(name), durable));
+ }
+
+ private void createAndRegisterMBean()
+ {
try
{
_exchangeMbean = createMBean();
@@ -136,12 +142,8 @@ public abstract class AbstractExchange i
}
catch (JMException e)
{
- getLogger().error(e);
+ throw new RuntimeException("Failed to register mbean",e);
}
- _logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
-
- // Log Exchange creation
- CurrentActor.get().message(ExchangeMessages.CREATED(String.valueOf(getTypeShortString()), String.valueOf(name), durable));
}
public ConfigStore getConfigStore()
@@ -149,8 +151,6 @@ public abstract class AbstractExchange i
return getVirtualHost().getConfigStore();
}
- public abstract Logger getLogger();
-
public boolean isDurable()
{
return _durable;
@@ -324,8 +324,7 @@ public abstract class AbstractExchange i
public Map<String, Object> getArguments()
{
- // TODO - Fix
- return Collections.EMPTY_MAP;
+ return Collections.emptyMap();
}
public UUID getId()
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Fri Mar 30 13:44:25 2012
@@ -21,7 +21,6 @@
package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -36,7 +35,7 @@ import java.util.concurrent.ConcurrentMa
public class DefaultExchangeRegistry implements ExchangeRegistry
{
- private static final Logger _log = Logger.getLogger(DefaultExchangeRegistry.class);
+ private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class);
/**
* Maps from exchange name to exchange instance
@@ -59,8 +58,6 @@ public class DefaultExchangeRegistry imp
new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this, getDurableConfigurationStore());
}
-
-
public DurableConfigurationStore getDurableConfigurationStore()
{
return _host.getMessageStore();
@@ -153,4 +150,28 @@ public class DefaultExchangeRegistry imp
}
}
+ @Override
+ public void clearAndUnregisterMbeans()
+ {
+ for (final AMQShortString exchangeName : getExchangeNames())
+ {
+ final Exchange exchange = getExchange(exchangeName);
+
+ if (exchange instanceof AbstractExchange)
+ {
+ AbstractExchange abstractExchange = (AbstractExchange) exchange;
+ try
+ {
+ abstractExchange.getManagedObject().unregister();
+ }
+ catch (AMQException e)
+ {
+ LOGGER.warn("Failed to unregister mbean", e);
+ }
+ }
+ }
+ _exchangeMap.clear();
+ _exchangeMapStr.clear();
+ }
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Fri Mar 30 13:44:25 2012
@@ -131,12 +131,6 @@ public class DirectExchange extends Abst
return new DirectExchangeMBean(this);
}
- public Logger getLogger()
- {
- return _logger;
- }
-
-
public List<? extends BaseQueue> doRoute(InboundMessage payload)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java Fri Mar 30 13:44:25 2012
@@ -27,7 +27,8 @@ import org.apache.qpid.server.store.Dura
public class ExchangeInitialiser
{
- public void initialise(ExchangeFactory factory, ExchangeRegistry registry, DurableConfigurationStore store) throws AMQException{
+ public void initialise(ExchangeFactory factory, ExchangeRegistry registry, DurableConfigurationStore store) throws AMQException
+ {
for (ExchangeType<? extends Exchange> type : factory.getRegisteredTypes())
{
define (registry, factory, type.getDefaultExchangeName(), type.getName(), store);
@@ -44,7 +45,6 @@ public class ExchangeInitialiser
{
Exchange exchange = f.createExchange(name, type, true, false, 0);
r.registerExchange(exchange);
-
if(exchange.isDurable())
{
store.createExchange(exchange);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Fri Mar 30 13:44:25 2012
@@ -51,5 +51,7 @@ public interface ExchangeRegistry
Exchange getExchange(String exchangeName);
- void unregisterExchange(String exchange, boolean ifUnused) throws ExchangeInUseException, AMQException;;
+ void unregisterExchange(String exchange, boolean ifUnused) throws ExchangeInUseException, AMQException;
+
+ void clearAndUnregisterMbeans();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Fri Mar 30 13:44:25 2012
@@ -52,11 +52,6 @@ public class FanoutExchange extends Abst
return new FanoutExchangeMBean(this);
}
- public Logger getLogger()
- {
- return _logger;
- }
-
public static final ExchangeType<FanoutExchange> TYPE = new ExchangeType<FanoutExchange>()
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Fri Mar 30 13:44:25 2012
@@ -231,11 +231,6 @@ public class HeadersExchange extends Abs
return new HeadersExchangeMBean(this);
}
- public Logger getLogger()
- {
- return _logger;
- }
-
protected void onBind(final Binding binding)
{
String bindingKey = binding.getBindingKey();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Fri Mar 30 13:44:25 2012
@@ -407,11 +407,6 @@ public class TopicExchange extends Abstr
return new TopicExchangeMBean(this);
}
- public Logger getLogger()
- {
- return _logger;
- }
-
private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Fri Mar 30 13:44:25 2012
@@ -32,6 +32,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.State;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody>
@@ -82,6 +83,10 @@ public class ConnectionOpenMethodHandler
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied: '" + virtualHost.getName() + "'");
}
+ else if (virtualHost.getState() != State.ACTIVE)
+ {
+ throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active");
+ }
session.setVirtualHost(virtualHost);
@@ -89,10 +94,10 @@ public class ConnectionOpenMethodHandler
if (session.getContextKey() == null)
{
session.setContextKey(generateClientID());
- }
+ }
MethodRegistry methodRegistry = session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
+ AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
stateManager.changeState(AMQState.CONNECTION_OPEN);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties Fri Mar 30 13:44:25 2012
@@ -18,8 +18,7 @@
#
# Default File used for all non-defined locales.
-# 0 - name
-CREATED = CFG-1001 : Created : {0}
+CREATED = CFG-1001 : Created
# 0 - path
STORE_LOCATION = CFG-1002 : Store location : {0}
CLOSE = CFG-1003 : Closed
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties Fri Mar 30 13:44:25 2012
@@ -18,11 +18,11 @@
#
# Default File used for all non-defined locales.
#
-# 0 - name
-CREATED = MST-1001 : Created : {0}
+CREATED = MST-1001 : Created
# 0 - path
STORE_LOCATION = MST-1002 : Store location : {0}
CLOSED = MST-1003 : Closed
RECOVERY_START = MST-1004 : Recovery Start
RECOVERED = MST-1005 : Recovered {0,number} messages
-RECOVERY_COMPLETE = MST-1006 : Recovery Complete
\ No newline at end of file
+RECOVERY_COMPLETE = MST-1006 : Recovery Complete
+PASSIVATE = MST-1007 : Store Passivated
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties Fri Mar 30 13:44:25 2012
@@ -19,8 +19,7 @@
# Default File used for all non-defined locales.
#
#
-# 0 - name
-CREATED = TXN-1001 : Created : {0}
+CREATED = TXN-1001 : Created
# 0 - path
STORE_LOCATION = TXN-1002 : Store location : {0}
CLOSED = TXN-1003 : Closed
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java Fri Mar 30 13:44:25 2012
@@ -40,7 +40,8 @@ public class BindingLogSubject extends A
public BindingLogSubject(String routingKey, Exchange exchange,
AMQQueue queue)
{
- setLogStringWithFormat(BINDING_FORMAT, queue.getVirtualHost().getName(),
+ setLogStringWithFormat(BINDING_FORMAT,
+ queue.getVirtualHost().getName(),
exchange.getTypeShortString(),
exchange.getNameShortString(),
queue.getNameShortString(),
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java Fri Mar 30 13:44:25 2012
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.logging.subjects;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.STORE_FORMAT;
@@ -28,10 +27,9 @@ import static org.apache.qpid.server.log
public class MessageStoreLogSubject extends AbstractLogSubject
{
- /** Create an ExchangeLogSubject that Logs in the following format. */
- public MessageStoreLogSubject(VirtualHost vhost, MessageStore store)
+ /** Create an MessageStoreLogSubject that Logs in the following format. */
+ public MessageStoreLogSubject(VirtualHost vhost, String messageStoreName)
{
- setLogStringWithFormat(STORE_FORMAT, vhost.getName(),
- store.getClass().getSimpleName());
+ setLogStringWithFormat(STORE_FORMAT, vhost.getName(), messageStoreName);
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Fri Mar 30 13:44:25 2012
@@ -20,7 +20,10 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -29,6 +32,8 @@ import java.util.concurrent.ConcurrentMa
public class DefaultQueueRegistry implements QueueRegistry
{
+ private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class);
+
private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private final VirtualHost _virtualHost;
@@ -72,4 +77,22 @@ public class DefaultQueueRegistry implem
{
return getQueue(new AMQShortString(queue));
}
+
+ @Override
+ public void stopAllAndUnregisterMBeans()
+ {
+ for (final AMQQueue queue : getQueues())
+ {
+ queue.stop();
+ try
+ {
+ queue.getManagedObject().unregister();
+ }
+ catch (AMQException e)
+ {
+ LOGGER.warn("Failed to unregister mbean", e);
+ }
+ }
+ _queueMap.clear();
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Fri Mar 30 13:44:25 2012
@@ -40,4 +40,6 @@ public interface QueueRegistry
Collection<AMQQueue> getQueues();
AMQQueue getQueue(String queue);
+
+ void stopAllAndUnregisterMBeans();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Fri Mar 30 13:44:25 2012
@@ -28,7 +28,6 @@ import org.apache.qpid.framing.FieldTabl
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.federation.Bridge;
import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.queue.AMQQueue;
public interface DurableConfigurationStore
@@ -46,13 +45,11 @@ public interface DurableConfigurationSto
* @param name The name to be used by this store
* @param recoveryHandler Handler to be called as the store recovers on start up
* @param config The apache commons configuration object.
- *
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
- Configuration config,
- LogSubject logSubject) throws Exception;
+ Configuration config) throws Exception;
/**
* Makes the specified exchange persistent.
*
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java (from r1307317, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java&r1=1307317&r2=1307416&rev=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java Fri Mar 30 13:44:25 2012
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,26 +17,14 @@
* under the License.
*
*/
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.Collection;
+package org.apache.qpid.server.store;
-public interface QueueRegistry
+public enum Event
{
- VirtualHost getVirtualHost();
-
- void registerQueue(AMQQueue queue);
-
- void unregisterQueue(AMQShortString name);
-
- AMQQueue getQueue(AMQShortString name);
-
- Collection<AMQShortString> getQueueNames();
-
- Collection<AMQQueue> getQueues();
-
- AMQQueue getQueue(String queue);
+ BEFORE_ACTIVATE,
+ AFTER_ACTIVATE,
+ BEFORE_PASSIVATE,
+ AFTER_PASSIVATE,
+ BEFORE_CLOSE,
+ AFTER_CLOSE
}
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventListener.java (from r1307317, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventListener.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventListener.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java&r1=1307317&r2=1307416&rev=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventListener.java Fri Mar 30 13:44:25 2012
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,26 +17,9 @@
* under the License.
*
*/
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.Collection;
+package org.apache.qpid.server.store;
-public interface QueueRegistry
+public interface EventListener
{
- VirtualHost getVirtualHost();
-
- void registerQueue(AMQQueue queue);
-
- void unregisterQueue(AMQShortString name);
-
- AMQQueue getQueue(AMQShortString name);
-
- Collection<AMQShortString> getQueueNames();
-
- Collection<AMQQueue> getQueues();
-
- AMQQueue getQueue(String queue);
+ public void event(Event event);
}
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java (from r1307317, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java&r1=1307317&r2=1307416&rev=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java Fri Mar 30 13:44:25 2012
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,24 +19,30 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
-public abstract class AbstractMessageStore implements MessageStore
+public class EventManager
{
- private LogSubject _logSubject;
+ private ConcurrentMap<Event, List<EventListener>> _listeners = new ConcurrentHashMap<Event, List<EventListener>>();
- public void configure(VirtualHost virtualHost) throws Exception
+ public void addEventListener(EventListener listener, Event event)
{
- _logSubject = new MessageStoreLogSubject(virtualHost, this);
- CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+ _listeners.putIfAbsent(event, new ArrayList<EventListener>());
+ final List<EventListener> list = _listeners.get(event);
+ list.add(listener);
}
- public void close() throws Exception
+ public void notifyEvent(Event event)
{
- CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+ if (_listeners.containsKey(event))
+ {
+ for (EventListener listener : _listeners.get(event))
+ {
+ listener.event(event);
+ }
+ }
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Fri Mar 30 13:44:25 2012
@@ -20,103 +20,58 @@
*/
package org.apache.qpid.server.store;
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-/** A simple message store that stores the messages in a threadsafe structure in memory. */
-public class MemoryMessageStore implements MessageStore
+/** A simple message store that stores the messages in a thread-safe structure in memory. */
+public class MemoryMessageStore extends NullMessageStore
{
- private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
-
- private static final int DEFAULT_HASHTABLE_CAPACITY = 50000;
-
- private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity";
-
-
private final AtomicLong _messageId = new AtomicLong(1);
- private AtomicBoolean _closed = new AtomicBoolean(false);
- private LogSubject _logSubject;
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
{
- public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ @Override
+ public StoreFuture commitTranAsync() throws AMQStoreException
{
+ return StoreFuture.IMMEDIATE_FUTURE;
}
- public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ @Override
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
}
- public void commitTran() throws AMQStoreException
+ @Override
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
}
- public StoreFuture commitTranAsync() throws AMQStoreException
+ @Override
+ public void commitTran() throws AMQStoreException
{
- return StoreFuture.IMMEDIATE_FUTURE;
}
+ @Override
public void abortTran() throws AMQStoreException
{
}
+ @Override
public void removeXid(long format, byte[] globalId, byte[] branchId)
{
}
+ @Override
public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
{
}
-
};
- public void configureConfigStore(String name, ConfigurationRecoveryHandler handler, Configuration configuration, LogSubject logSubject) throws Exception
- {
- _logSubject = logSubject;
- CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
-
-
- }
-
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler,
- Configuration config, LogSubject logSubject) throws Exception
- {
- if(_logSubject == null)
- {
- _logSubject = logSubject;
- }
- int hashtableCapacity = config.getInt(name + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
- _log.info("Using capacity " + hashtableCapacity + " for hash tables");
- CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
- }
-
- public void close() throws Exception
- {
- _closed.getAndSet(true);
- CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
-
- }
-
+ @Override
public StoredMessage addMessage(StorableMessageMetaData metaData)
{
final long id = _messageId.getAndIncrement();
@@ -125,96 +80,21 @@ public class MemoryMessageStore implemen
return message;
}
-
- public void createExchange(Exchange exchange) throws AMQStoreException
- {
-
- }
-
- public void removeExchange(Exchange exchange) throws AMQStoreException
- {
-
- }
-
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
- {
-
- }
-
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
- {
-
- }
-
-
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- // Not requred to do anything
- }
-
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
- {
- // Not required to do anything
- }
-
- public void removeQueue(final AMQQueue queue) throws AMQStoreException
- {
- // Not required to do anything
- }
-
- public void updateQueue(final AMQQueue queue) throws AMQStoreException
- {
- // Not required to do anything
- }
-
- public void createBrokerLink(final BrokerLink link) throws AMQStoreException
- {
-
- }
-
- public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
- {
-
- }
-
- public void createBridge(final Bridge bridge) throws AMQStoreException
- {
-
- }
-
- public void deleteBridge(final Bridge bridge) throws AMQStoreException
- {
-
- }
-
+ @Override
public Transaction newTransaction()
{
return IN_MEMORY_TRANSACTION;
}
-
- public List<AMQQueue> createQueues() throws AMQException
- {
- return null;
- }
-
- public Long getNewMessageId()
- {
- return _messageId.getAndIncrement();
- }
-
+ @Override
public boolean isPersistent()
{
return false;
}
- private void checkNotClosed() throws MessageStoreClosedException
- {
- if (_closed.get())
- {
- throw new MessageStoreClosedException();
- }
+ @Override
+ public void close() throws Exception
+ {
+ _closed.getAndSet(true);
}
-
-
}
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java (from r1307317, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java&r1=1307317&r2=1307416&rev=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java Fri Mar 30 13:44:25 2012
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,23 +20,22 @@
package org.apache.qpid.server.store;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.decorators.EventDecorator;
+import org.apache.qpid.server.store.decorators.OperationalLoggingDecorator;
-public abstract class AbstractMessageStore implements MessageStore
+public class MemoryMessageStoreFactory implements MessageStoreFactory
{
- private LogSubject _logSubject;
- public void configure(VirtualHost virtualHost) throws Exception
+ @Override
+ public MessageStore createMessageStore(LogSubject logSubject)
{
- _logSubject = new MessageStoreLogSubject(virtualHost, this);
- CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+ return new OperationalLoggingDecorator(new EventDecorator(new MemoryMessageStore()), logSubject);
}
- public void close() throws Exception
+ @Override
+ public String getStoreClassName()
{
- CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+ return MemoryMessageStore.class.getSimpleName();
}
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Fri Mar 30 13:44:25 2012
@@ -21,7 +21,6 @@
package org.apache.qpid.server.store;
import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.server.logging.LogSubject;
/**
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
@@ -42,9 +41,9 @@ public interface MessageStore extends Du
void configureMessageStore(String name,
MessageStoreRecoveryHandler messageRecoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler,
- Configuration config, LogSubject logSubject) throws Exception;
-
+ Configuration config) throws Exception;
+ void activate() throws Exception;
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
@@ -65,4 +64,8 @@ public interface MessageStore extends Du
*/
void close() throws Exception;
+ void addEventListener(EventListener eventListener, Event event);
+
+ MessageStore getUnderlyingStore();
+
}
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java (from r1307317, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java&r1=1307317&r2=1307416&rev=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java Fri Mar 30 13:44:25 2012
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,26 +17,11 @@
* under the License.
*
*/
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+package org.apache.qpid.server.store;
-import java.util.Collection;
-
-public interface QueueRegistry
+public class MessageStoreConstants
{
- VirtualHost getVirtualHost();
-
- void registerQueue(AMQQueue queue);
-
- void unregisterQueue(AMQShortString name);
-
- AMQQueue getQueue(AMQShortString name);
-
- Collection<AMQShortString> getQueueNames();
- Collection<AMQQueue> getQueues();
+ public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
- AMQQueue getQueue(String queue);
}
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java (from r1307317, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java&r1=1307317&r2=1307416&rev=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java Fri Mar 30 13:44:25 2012
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,26 +17,13 @@
* under the License.
*
*/
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+package org.apache.qpid.server.store;
-import java.util.Collection;
+import org.apache.qpid.server.logging.LogSubject;
-public interface QueueRegistry
+public interface MessageStoreFactory
{
- VirtualHost getVirtualHost();
-
- void registerQueue(AMQQueue queue);
-
- void unregisterQueue(AMQShortString name);
-
- AMQQueue getQueue(AMQShortString name);
-
- Collection<AMQShortString> getQueueNames();
-
- Collection<AMQQueue> getQueues();
+ MessageStore createMessageStore(LogSubject logSubject);
- AMQQueue getQueue(String queue);
+ String getStoreClassName();
}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java?rev=1307416&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java Fri Mar 30 13:44:25 2012
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class NullMessageStore implements MessageStore
+{
+ @Override
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration config) throws Exception
+ {
+ }
+
+ @Override
+ public void createExchange(Exchange exchange) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void removeExchange(Exchange exchange) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void createQueue(AMQQueue queue) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void removeQueue(AMQQueue queue) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void updateQueue(AMQQueue queue) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void createBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void createBridge(final Bridge bridge) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void deleteBridge(final Bridge bridge) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception
+ {
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ }
+
+ @Override
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return false;
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return null;
+ }
+
+ @Override
+ public void activate() throws Exception
+ {
+ }
+
+ @Override
+ public void addEventListener(EventListener eventListener, Event event)
+ {
+ }
+
+ @Override
+ public MessageStore getUnderlyingStore()
+ {
+ return this;
+ }
+}
\ No newline at end of file
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java (from r1307317, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java&r1=1307317&r2=1307416&rev=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java Fri Mar 30 13:44:25 2012
@@ -18,26 +18,14 @@
* under the License.
*
*/
-package org.apache.qpid.server.queue;
+package org.apache.qpid.server.store;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.Collection;
-
-public interface QueueRegistry
+public enum State
{
- VirtualHost getVirtualHost();
-
- void registerQueue(AMQQueue queue);
-
- void unregisterQueue(AMQShortString name);
-
- AMQQueue getQueue(AMQShortString name);
-
- Collection<AMQShortString> getQueueNames();
-
- Collection<AMQQueue> getQueues();
-
- AMQQueue getQueue(String queue);
-}
+ INITIAL,
+ CONFIGURING,
+ RECOVERING,
+ ACTIVE,
+ CLOSING,
+ CLOSED
+}
\ No newline at end of file
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java?rev=1307416&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java Fri Mar 30 13:44:25 2012
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+
+public class StateManager
+{
+ private State _state = State.INITIAL;
+
+ public synchronized State getState()
+ {
+ return _state;
+ }
+
+ public synchronized void stateTransition(final State current, final State desired)
+ {
+ if (_state != current)
+ {
+ throw new IllegalStateException("Cannot transition to the state: " + desired + "; need to be in state: " + current
+ + "; currently in state: " + _state);
+ }
+ _state = desired;
+ }
+
+ public synchronized boolean isInState(State testedState)
+ {
+ return _state.equals(testedState);
+ }
+
+ public synchronized boolean isNotInState(State testedState)
+ {
+ return !isInState(testedState);
+ }
+
+ public synchronized void checkInState(State checkedState)
+ {
+ if (isNotInState(checkedState))
+ {
+ throw new IllegalStateException("Unexpected state. Was : " + _state + " but expected : " + checkedState);
+ }
+ }
+}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java?rev=1307416&r1=1307415&r2=1307416&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java Fri Mar 30 13:44:25 2012
@@ -31,11 +31,10 @@ public interface StoreFuture
public void waitForCompletion()
{
-
}
};
boolean isComplete();
void waitForCompletion();
-}
\ No newline at end of file
+}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java?rev=1307416&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/AbstractDecorator.java Fri Mar 30 13:44:25 2012
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.decorators;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+
+/**
+ * AbstractDecorator. All methods <b>MUST</b> perform simple
+ * delegation to their equivalent decorated counterpart without
+ * change.
+ */
+public class AbstractDecorator implements MessageStore
+{
+ protected final MessageStore _decoratedStore;
+
+ public AbstractDecorator(MessageStore store)
+ {
+ _decoratedStore = store;
+ }
+
+ @Override
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler messageRecoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler,
+ Configuration config) throws Exception
+ {
+ _decoratedStore.configureMessageStore(name, messageRecoveryHandler,
+ tlogRecoveryHandler, config);
+ }
+
+ @Override
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception
+ {
+ _decoratedStore.configureConfigStore(name, recoveryHandler, config);
+ }
+
+ @Override
+ public void activate() throws Exception
+ {
+ _decoratedStore.activate();
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ _decoratedStore.close();
+ }
+
+ @Override
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(
+ T metaData)
+ {
+ return _decoratedStore.addMessage(metaData);
+ }
+
+ @Override
+ public void createExchange(Exchange exchange) throws AMQStoreException
+ {
+ _decoratedStore.createExchange(exchange);
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return _decoratedStore.isPersistent();
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return _decoratedStore.newTransaction();
+ }
+
+ @Override
+ public void removeExchange(Exchange exchange) throws AMQStoreException
+ {
+ _decoratedStore.removeExchange(exchange);
+ }
+
+ @Override
+ public void addEventListener(EventListener eventListener, Event event)
+ {
+ _decoratedStore.addEventListener(eventListener, event);
+ }
+
+ @Override
+ public void bindQueue(Exchange exchange, AMQShortString routingKey,
+ AMQQueue queue, FieldTable args) throws AMQStoreException
+ {
+ _decoratedStore.bindQueue(exchange, routingKey, queue, args);
+ }
+
+ @Override
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey,
+ AMQQueue queue, FieldTable args) throws AMQStoreException
+ {
+ _decoratedStore.unbindQueue(exchange, routingKey, queue, args);
+ }
+
+ @Override
+ public void createQueue(AMQQueue queue) throws AMQStoreException
+ {
+ _decoratedStore.createQueue(queue);
+ }
+
+ @Override
+ public void createQueue(AMQQueue queue, FieldTable arguments)
+ throws AMQStoreException
+ {
+ _decoratedStore.createQueue(queue, arguments);
+ }
+
+ @Override
+ public void removeQueue(AMQQueue queue) throws AMQStoreException
+ {
+ _decoratedStore.removeQueue(queue);
+ }
+
+ @Override
+ public void updateQueue(AMQQueue queue) throws AMQStoreException
+ {
+ _decoratedStore.updateQueue(queue);
+ }
+
+ @Override
+ public void createBrokerLink(BrokerLink link) throws AMQStoreException
+ {
+ _decoratedStore.createBrokerLink(link);
+ }
+
+ @Override
+ public void deleteBrokerLink(BrokerLink link) throws AMQStoreException
+ {
+ _decoratedStore.deleteBrokerLink(link);
+ }
+
+ @Override
+ public void createBridge(Bridge bridge) throws AMQStoreException
+ {
+ _decoratedStore.createBridge(bridge);
+ }
+
+ @Override
+ public void deleteBridge(Bridge bridge) throws AMQStoreException
+ {
+ _decoratedStore.deleteBridge(bridge);
+ }
+
+ @Override
+ public MessageStore getUnderlyingStore()
+ {
+ return _decoratedStore.getUnderlyingStore();
+ }
+}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java?rev=1307416&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/EventDecorator.java Fri Mar 30 13:44:25 2012
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.decorators;
+
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.EventManager;
+import org.apache.qpid.server.store.MessageStore;
+
+public class EventDecorator extends AbstractDecorator // store decorator that publishes lifecycle events around activate() and close()
+{
+ protected final EventManager _eventManager; // holds the registered listeners; one instance per decorator, created in the constructor
+
+ public EventDecorator(MessageStore store)
+ {
+ super(store); // hands the store to be wrapped to AbstractDecorator
+ _eventManager = new EventManager();
+ }
+
+ @Override
+ public void activate() throws Exception
+ {
+ _eventManager.notifyEvent(Event.BEFORE_ACTIVATE); // sent before the wrapped store is touched
+ _decoratedStore.activate(); // no try/finally: if this throws, AFTER_ACTIVATE below is never sent
+ _eventManager.notifyEvent(Event.AFTER_ACTIVATE);
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ _eventManager.notifyEvent(Event.BEFORE_CLOSE); // sent before the wrapped store is touched
+ _decoratedStore.close(); // no try/finally: if this throws, AFTER_CLOSE below is never sent
+ _eventManager.notifyEvent(Event.AFTER_CLOSE);
+ }
+
+ @Override
+ public void addEventListener(EventListener eventListener, Event event)
+ {
+ _eventManager.addEventListener(eventListener, event); // registered on this decorator's own EventManager; not forwarded to _decoratedStore
+ }
+}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java?rev=1307416&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/decorators/OperationalLoggingDecorator.java Fri Mar 30 13:44:25 2012
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.decorators;
+
+import static org.apache.qpid.server.store.MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+
+public class OperationalLoggingDecorator extends AbstractDecorator // store decorator that emits operational log messages around configure, activate and close
+{
+ protected final LogSubject _logSubject; // subject passed with every message this decorator logs
+
+ public OperationalLoggingDecorator(final MessageStore decoratedStore, LogSubject logSubject)
+ {
+ super(decoratedStore); // hands the store to be wrapped to AbstractDecorator
+ _logSubject = logSubject;
+ }
+
+ @Override
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler messageRecoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler,
+ Configuration config) throws Exception
+ {
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED()); // both CREATED messages are logged before the wrapped store is configured
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED());
+
+ if (config != null && config.getString(ENVIRONMENT_PATH_PROPERTY) != null) // location is logged only when a config is supplied and it carries a path
+ {
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(config.getString(ENVIRONMENT_PATH_PROPERTY)));
+ }
+
+ _decoratedStore.configureMessageStore(name, messageRecoveryHandler, // all four arguments are forwarded unchanged
+ tlogRecoveryHandler, config);
+ }
+
+ @Override
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception
+ {
+ CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED()); // logged before the wrapped store is configured
+
+ _decoratedStore.configureConfigStore(name, recoveryHandler, config); // arguments are forwarded unchanged
+ }
+
+ @Override
+ public void activate() throws Exception
+ {
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_START());
+ _decoratedStore.activate(); // no try/finally: if this throws, RECOVERY_COMPLETE below is never logged
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED()); // CLOSED is logged before the wrapped store's close() runs, so it appears even if close() then throws
+ _decoratedStore.close();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org