You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/24 16:41:57 UTC
svn commit: r499446 [2/4] - in /incubator/qpid/trunk/qpid/java: broker/etc/
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/configuration/
broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/ja...
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Wed Jan 24 07:41:48 2007
@@ -34,6 +34,7 @@
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
@@ -50,15 +51,19 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueBindBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+
final QueueBindBody body = evt.getMethod();
final AMQQueue queue;
if (body.queue == null)
{
- queue = protocolSession.getChannel(evt.getChannelId()).getDefaultQueue();
+ queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
if (queue == null)
{
throw new AMQException("No default queue defined on channel and queue was null");
@@ -94,7 +99,7 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Wed Jan 24 07:41:48 2007
@@ -22,7 +22,6 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQChannelException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.configuration.Configured;
@@ -37,7 +36,7 @@
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.text.MessageFormat;
import java.util.concurrent.atomic.AtomicInteger;
@@ -58,18 +57,21 @@
private final AtomicInteger _counter = new AtomicInteger();
- private final MessageStore _store;
+
protected QueueDeclareHandler()
{
Configurator.configure(this);
- _store = ApplicationRegistry.getInstance().getMessageStore();
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ MessageStore store = virtualHost.getMessageStore();
+
QueueDeclareBody body = evt.getMethod();
// if we aren't given a queue name, we create one which we return to the client
@@ -94,10 +96,10 @@
}
else
{
- queue = createQueue(body, queueRegistry, protocolSession);
+ queue = createQueue(body, virtualHost, session);
if (queue.isDurable() && !queue.isAutoDelete())
{
- _store.createQueue(queue);
+ store.createQueue(queue);
}
queueRegistry.registerQueue(queue);
if (autoRegister)
@@ -109,14 +111,14 @@
}
}
}
- else if(queue.getOwner() != null && !protocolSession.getContextKey().equals(queue.getOwner()))
+ else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
{
// todo - constant
throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection");
}
//set this as the default queue on the channel:
- protocolSession.getChannel(evt.getChannelId()).setDefaultQueue(queue);
+ session.getChannel(evt.getChannelId()).setDefaultQueue(queue);
}
if (!body.nowait)
@@ -130,7 +132,7 @@
queue.getMessageCount(), // messageCount
body.queue); // queue
_log.info("Queue " + body.queue + " declared successfully");
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
@@ -144,10 +146,43 @@
return MessageFormat.format("{0,number,0000000000000}", value);
}
- protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session)
+ protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, final AMQProtocolSession session)
throws AMQException
{
+ final QueueRegistry registry = virtualHost.getQueueRegistry();
AMQShortString owner = body.exclusive ? session.getContextKey() : null;
- return new AMQQueue(body.queue, body.durable, owner, body.autoDelete || (!body.durable && body.exclusive), registry);
+ final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
+ final AMQShortString queueName = queue.getName();
+
+ if(body.exclusive && !body.durable)
+ {
+ final AMQProtocolSession.Task deleteQueueTask =
+ new AMQProtocolSession.Task()
+ {
+
+ public void doTask(AMQProtocolSession session) throws AMQException
+ {
+ if(registry.getQueue(queueName) == queue)
+ {
+ queue.delete();
+ }
+
+ }
+ };
+
+ session.addSessionCloseTask(deleteQueueTask);
+
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+ public void doTask(AMQQueue queue)
+ {
+ session.removeSessionCloseTask(deleteQueueTask);
+ }
+ });
+
+
+ }
+
+ return queue;
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Wed Jan 24 07:41:48 2007
@@ -24,18 +24,14 @@
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQChannelException;
-import org.apache.qpid.protocol.AMQConstant;
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
{
@@ -47,7 +43,6 @@
}
private final boolean _failIfNotFound;
- private final MessageStore _store;
public QueueDeleteHandler()
{
@@ -57,12 +52,16 @@
public QueueDeleteHandler(boolean failIfNotFound)
{
_failIfNotFound = failIfNotFound;
- _store = ApplicationRegistry.getInstance().getMessageStore();
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ MessageStore store = virtualHost.getMessageStore();
+
QueueDeleteBody body = evt.getMethod();
AMQQueue queue;
if(body.queue == null)
@@ -71,7 +70,7 @@
}
else
{
- queue = queues.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.queue);
}
if(queue == null)
@@ -96,7 +95,7 @@
else
{
int purged = queue.delete(body.ifUnused, body.ifEmpty);
- _store.removeQueue(queue.getName());
+ store.removeQueue(queue.getName());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Wed Jan 24 07:41:48 2007
@@ -9,6 +9,7 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -34,8 +35,12 @@
_failIfNotFound = failIfNotFound;
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
if(body.queue == null)
@@ -52,7 +57,7 @@
}
else
{
- queue = queues.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.queue);
}
if(queue == null)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Wed Jan 24 07:41:48 2007
@@ -24,6 +24,7 @@
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxCommitOkBody;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -47,10 +48,9 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<TxCommitBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxCommitBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
try
{
@@ -58,13 +58,13 @@
{
_log.debug("Commit received on channel " + evt.getChannelId());
}
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(evt.getChannelId());
channel.commit();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
- channel.processReturns(protocolSession);
+ session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ channel.processReturns(session);
}
catch(AMQException e)
{
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Wed Jan 24 07:41:48 2007
@@ -30,6 +30,7 @@
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBody>
{
@@ -44,20 +45,20 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<TxRollbackBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxRollbackBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
try{
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(evt.getChannelId());
channel.rollback();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
- channel.resend(protocolSession, false);
+ channel.resend(session, false);
}catch(AMQException e){
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java Wed Jan 24 07:41:48 2007
@@ -29,6 +29,7 @@
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
{
@@ -43,14 +44,14 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<TxSelectBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxSelectBody> evt) throws AMQException
{
- protocolSession.getChannel(evt.getChannelId()).setLocalTransactional();
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ session.getChannel(evt.getChannelId()).setLocalTransactional();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java Wed Jan 24 07:41:48 2007
@@ -67,7 +67,7 @@
{
try
{
- ApplicationRegistry.getInstance().getManagedObjectRegistry().registerObject(this);
+ getManagedObjectRegistry().registerObject(this);
}
catch (JMException e)
{
@@ -75,11 +75,16 @@
}
}
+ protected ManagedObjectRegistry getManagedObjectRegistry()
+ {
+ return ApplicationRegistry.getInstance().getManagedObjectRegistry();
+ }
+
public void unregister() throws AMQException
{
try
{
- ApplicationRegistry.getInstance().getManagedObjectRegistry().unregisterObject(this);
+ getManagedObjectRegistry().unregisterObject(this);
}
catch (JMException e)
{
@@ -91,6 +96,7 @@
{
return getObjectInstanceName() + "[" + getType() + "]";
}
+
/**
* Created the ObjectName as per the JMX Specs
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Wed Jan 24 07:41:48 2007
@@ -35,10 +35,10 @@
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
@@ -51,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CopyOnWriteArrayList;
public class AMQMinaProtocolSession implements AMQProtocolSession,
ProtocolVersionList,
@@ -65,16 +66,14 @@
private AMQShortString _contextKey;
+ private VirtualHost _virtualHost;
+
private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
private final AMQStateManager _stateManager;
- private final QueueRegistry _queueRegistry;
-
- private final ExchangeRegistry _exchangeRegistry;
-
private AMQCodecFactory _codecFactory;
private AMQProtocolSessionMBean _managedObject;
@@ -93,6 +92,8 @@
private byte _major;
private byte _minor;
private FieldTable _clientProperties;
+ private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+
public ManagedObject getManagedObject()
{
@@ -100,23 +101,23 @@
}
- public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
AMQCodecFactory codecFactory)
throws AMQException
{
- _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this);
+ _stateManager = new AMQStateManager(virtualHostRegistry, this);
_minaProtocolSession = session;
session.setAttachment(this);
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
+
+
_codecFactory = codecFactory;
_managedObject = createMBean();
_managedObject.register();
// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
}
- public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
AMQCodecFactory codecFactory, AMQStateManager stateManager)
throws AMQException
{
@@ -124,8 +125,7 @@
_minaProtocolSession = session;
session.setAttachment(this);
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
+
_codecFactory = codecFactory;
_managedObject = createMBean();
_managedObject.register();
@@ -461,6 +461,10 @@
{
_managedObject.unregister();
}
+ for(Task task : _taskList)
+ {
+ task.doTask(this);
+ }
}
}
@@ -556,4 +560,27 @@
{
return _minaProtocolSession.getRemoteAddress();
}
+
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public void setVirtualHost(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+
+ public void addSessionCloseTask(Task task)
+ {
+ _taskList.add(task);
+ }
+
+ public void removeSessionCloseTask(Task task)
+ {
+ _taskList.remove(task);
+ }
+
+
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Wed Jan 24 07:41:48 2007
@@ -53,41 +53,26 @@
{
private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class);
- /**
- * The registry of all queues. This is passed to frame listeners when frame
- * events occur.
- */
- private final QueueRegistry _queueRegistry;
+ private final IApplicationRegistry _applicationRegistry;
- /**
- * The registry of all exchanges. This is passed to frame listeners when frame
- * events occur.
- */
- private final ExchangeRegistry _exchangeRegistry;
private boolean _useSSL;
public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
{
- IApplicationRegistry registry = ApplicationRegistry.getInstance(applicationRegistryInstance);
-
- _queueRegistry = registry.getQueueRegistry();
- _exchangeRegistry = registry.getExchangeRegistry();
- _logger.debug("AMQPFastProtocolHandler created");
+ this(ApplicationRegistry.getInstance(applicationRegistryInstance));
}
- public AMQPFastProtocolHandler(QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry)
+ public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry)
{
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
+ _applicationRegistry = applicationRegistry;
_logger.debug("AMQPFastProtocolHandler created");
}
protected AMQPFastProtocolHandler(AMQPFastProtocolHandler handler)
{
- this(handler._queueRegistry, handler._exchangeRegistry);
+ this(handler._applicationRegistry);
}
public void sessionCreated(IoSession protocolSession) throws Exception
@@ -95,7 +80,7 @@
SessionUtil.initialize(protocolSession);
final AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- createSession(protocolSession, _queueRegistry, _exchangeRegistry, codecFactory);
+ createSession(protocolSession, _applicationRegistry, codecFactory);
_logger.info("Protocol session created");
final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
@@ -120,9 +105,9 @@
/**
* Separated into its own, protected, method to allow easier reuse
*/
- protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
+ protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException
{
- new AMQMinaProtocolSession(session, queues, exchanges, codec);
+ new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
}
public void sessionOpened(IoSession protocolSession) throws Exception
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java Wed Jan 24 07:41:48 2007
@@ -42,8 +42,7 @@
public AMQPProtocolProvider()
{
IApplicationRegistry registry = ApplicationRegistry.getInstance();
- _handler = new AMQPFastProtocolHandler(registry.getQueueRegistry(),
- registry.getExchangeRegistry());
+ _handler = new AMQPFastProtocolHandler(registry);
}
public AMQPFastProtocolHandler getHandler()
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Wed Jan 24 07:41:48 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQProtocolWriter;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
import javax.security.sasl.SaslServer;
@@ -32,6 +33,13 @@
public interface AMQProtocolSession extends AMQProtocolWriter
{
+
+
+ public static interface Task
+ {
+ public void doTask(AMQProtocolSession session) throws AMQException;
+ }
+
/**
* Called when a protocol data block is received
* @param message the data block that has been received
@@ -126,4 +134,13 @@
void setClientProperties(FieldTable clientProperties);
Object getClientIdentifier();
+
+ VirtualHost getVirtualHost();
+
+ void setVirtualHost(VirtualHost virtualHost);
+
+ void addSessionCloseTask(Task task);
+
+ void removeSessionCloseTask(Task task);
+
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Wed Jan 24 07:41:48 2007
@@ -26,6 +26,7 @@
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.ManagedObject;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -91,6 +92,11 @@
public String getRemoteAddress()
{
return _session.getIOSession().getRemoteAddress().toString();
+ }
+
+ public ManagedObject getParentObject()
+ {
+ return _session.getVirtualHost().getManagedObject();
}
public Long getWrittenBytes()
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Wed Jan 24 07:41:48 2007
@@ -30,11 +30,14 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
import java.text.MessageFormat;
import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +48,7 @@
public class AMQQueue implements Managable, Comparable
{
+
public static final class ExistingExclusiveSubscription extends AMQException
{
@@ -95,6 +99,12 @@
private final AtomicBoolean _isExclusive = new AtomicBoolean();
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+
+
+ private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+
/**
* Manages message delivery.
@@ -102,11 +112,6 @@
private final DeliveryManager _deliveryMgr;
/**
- * The queue registry with which this queue is registered.
- */
- private final QueueRegistry _queueRegistry;
-
- /**
* Used to track bindings to exchanges so that on deletion they can easily
* be cancelled.
*/
@@ -119,6 +124,9 @@
private final AMQQueueMBean _managedObject;
+ private final VirtualHost _virtualHost;
+
+
/**
* max allowed size(KB) of a single message
*/
@@ -145,59 +153,26 @@
}
public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry)
- throws AMQException
- {
- this(name, durable, owner, autoDelete, queueRegistry,
- AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionImpl.Factory());
- }
-
- public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry, SubscriptionFactory subscriptionFactory)
+ boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
- this(name, durable, owner, autoDelete, queueRegistry,
- AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscriptionFactory);
+ this(name, durable, owner, autoDelete, virtualHost,
+ AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionSet(), new SubscriptionImpl.Factory());
}
- public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery,
- SubscriptionFactory subscriptionFactory)
- throws AMQException
- {
-
- this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(), subscriptionFactory);
- }
- public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
- throws AMQException
- {
-
- this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(),
- new SubscriptionImpl.Factory());
- }
-
- protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry,
- SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory)
- throws AMQException
- {
- this(name, durable, owner, autoDelete, queueRegistry,
- AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, subscriptionFactory);
- }
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry,
+ boolean autoDelete, VirtualHost virtualHost,
SubscriptionSet subscribers)
throws AMQException
{
- this(name, durable, owner, autoDelete, queueRegistry,
+ this(name, durable, owner, autoDelete, virtualHost,
AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, new SubscriptionImpl.Factory());
}
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry,
+ boolean autoDelete, VirtualHost virtualHost,
Executor asyncDelivery, SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory)
throws AMQException
{
@@ -205,18 +180,20 @@
{
throw new IllegalArgumentException("Queue name must not be null");
}
- if (queueRegistry == null)
+ if (virtualHost == null)
{
- throw new IllegalArgumentException("Queue registry must not be null");
+ throw new IllegalArgumentException("Virtual Host must not be null");
}
_name = name;
_durable = durable;
_owner = owner;
_autoDelete = autoDelete;
- _queueRegistry = queueRegistry;
+ _virtualHost = virtualHost;
_asyncDelivery = asyncDelivery;
+
_managedObject = createMBean();
_managedObject.register();
+
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
@@ -492,10 +469,18 @@
public void delete() throws AMQException
{
- _subscribers.queueDeleted(this);
- _bindings.deregister();
- _queueRegistry.unregisterQueue(_name);
- _managedObject.unregister();
+ if(!_deleted.getAndSet(true))
+ {
+ _subscribers.queueDeleted(this);
+ _bindings.deregister();
+ _virtualHost.getQueueRegistry().unregisterQueue(_name);
+ _managedObject.unregister();
+ for(Task task : _deleteTaskList)
+ {
+ task.doTask(this);
+ }
+ _deleteTaskList.clear();
+ }
}
protected void autodelete() throws AMQException
@@ -620,6 +605,24 @@
return _deliveryMgr.performGet(session, channel, acks);
}
-
+ public QueueRegistry getQueueRegistry()
+ {
+ return _virtualHost.getQueueRegistry();
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public static interface Task
+ {
+ public void doTask(AMQQueue queue) throws AMQException;
+ }
+
+ public void addQueueDeleteTask(Task task)
+ {
+ _deleteTaskList.add(task);
+ }
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Wed Jan 24 07:41:48 2007
@@ -20,7 +20,9 @@
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.Main;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -28,11 +30,7 @@
import org.apache.mina.common.ByteBuffer;
import javax.management.openmbean.*;
-import javax.management.JMException;
-import javax.management.Notification;
-import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.OperationsException;
+import javax.management.*;
import javax.management.monitor.MonitorNotification;
import java.util.List;
import java.util.ArrayList;
@@ -73,6 +71,12 @@
_queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString();
}
+
+ public ManagedObject getParentObject()
+ {
+ return _queue.getVirtualHost().getManagedObject();
+ }
+
static
{
try
@@ -373,6 +377,14 @@
return _messageList;
}
+//
+// public ObjectName getObjectName() throws MalformedObjectNameException
+// {
+// String objNameString = super.getObjectName().toString();
+//
+// return new ObjectName(objNameString);
+// }
+
/**
* returns Notifications sent by this MBean.
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Wed Jan 24 07:41:48 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.framing.AMQShortString;
import java.util.concurrent.ConcurrentMap;
@@ -30,8 +31,16 @@
{
private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
- public DefaultQueueRegistry()
+ private final VirtualHost _virtualHost;
+
+ public DefaultQueueRegistry(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+
+ public VirtualHost getVirtualHost()
{
+ return _virtualHost;
}
public void registerQueue(AMQQueue queue) throws AMQException
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Wed Jan 24 07:41:48 2007
@@ -21,11 +21,14 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.framing.AMQShortString;
public interface QueueRegistry
{
+ VirtualHost getVirtualHost();
+
void registerQueue(AMQQueue queue) throws AMQException;
void unregisterQueue(AMQShortString name) throws AMQException;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Wed Jan 24 07:41:48 2007
@@ -23,6 +23,7 @@
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.HashMap;
import java.util.Iterator;
@@ -38,7 +39,7 @@
{
private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
- private static Map _instanceMap = new HashMap();
+ private static Map<Integer, IApplicationRegistry> _instanceMap = new HashMap<Integer, IApplicationRegistry>();
private final Map<Class<?>, Object> _configuredObjects = new HashMap<Class<?>, Object>();
@@ -62,20 +63,13 @@
{
synchronized (ApplicationRegistry.class)
{
- Iterator keyIterator = _instanceMap.keySet().iterator();
+ Iterator<IApplicationRegistry> keyIterator = _instanceMap.values().iterator();
while (keyIterator.hasNext())
{
- int key = (Integer) keyIterator.next();
- IApplicationRegistry instance = (IApplicationRegistry) _instanceMap.get(key);
+ IApplicationRegistry instance = keyIterator.next();
- if ((instance != null))
- {
- if (instance.getMessageStore() != null)
- {
- instance.getMessageStore().close();
- }
- }
+ instance.close();
}
}
}
@@ -118,7 +112,7 @@
{
try
{
- ((IApplicationRegistry) _instanceMap.get(instanceID)).getMessageStore().close();
+ _instanceMap.get(instanceID).close();
}
catch (Exception e)
{
@@ -143,7 +137,7 @@
public static IApplicationRegistry getInstance(int instanceID)
{
- IApplicationRegistry instance = (IApplicationRegistry) _instanceMap.get(instanceID);
+ IApplicationRegistry instance = _instanceMap.get(instanceID);
if (instance == null)
{
@@ -168,6 +162,14 @@
}
}
+ public void close() throws Exception
+ {
+ for(VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
+ {
+ virtualHost.close();
+ }
+ }
+
public Configuration getConfiguration()
{
return _configuration;
@@ -192,6 +194,8 @@
}
return instance;
}
+
+
public static void setDefaultApplicationRegistry(String clazz)
{
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Wed Jan 24 07:41:48 2007
@@ -38,22 +38,26 @@
import org.apache.qpid.server.security.auth.AuthenticationManager;
import org.apache.qpid.server.security.auth.SASLAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.mina.common.ByteBuffer;
import java.io.File;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
private ManagedObjectRegistry _managedObjectRegistry;
private AuthenticationManager _authenticationManager;
- private MessageStore _messageStore;
+ private VirtualHostRegistry _virtualHostRegistry;
+
+
+ private final Map<String, VirtualHost> _virtualHosts = new ConcurrentHashMap<String, VirtualHost>();
+
public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
{
@@ -91,11 +95,19 @@
public void initialise() throws Exception
{
initialiseManagedObjectRegistry();
- _queueRegistry = new DefaultQueueRegistry();
- _exchangeFactory = new DefaultExchangeFactory();
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ _virtualHostRegistry = new VirtualHostRegistry();
_authenticationManager = new SASLAuthenticationManager();
- initialiseMessageStore();
+
+ initialiseVirtualHosts();
+ }
+
+ private void initialiseVirtualHosts() throws Exception
+ {
+ for(String name : getVirtualHostNames())
+ {
+
+ _virtualHostRegistry.registerVirtualHost(new VirtualHost(name,getConfiguration().subset("virtualhosts.virtualhost."+name)));
+ }
}
private void initialiseManagedObjectRegistry()
@@ -111,34 +123,10 @@
}
}
- private void initialiseMessageStore() throws Exception
- {
- String messageStoreClass = _configuration.getString("store.class");
- Class clazz = Class.forName(messageStoreClass);
- Object o = clazz.newInstance();
- if (!(o instanceof MessageStore))
- {
- throw new Exception("Message store class must implement " + MessageStore.class + ". Class " + clazz +
- " does not.");
- }
- _messageStore = (MessageStore) o;
- _messageStore.configure(getQueueRegistry(), "store", _configuration);
- }
-
- public QueueRegistry getQueueRegistry()
+ public VirtualHostRegistry getVirtualHostRegistry()
{
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
+ return _virtualHostRegistry;
}
public ManagedObjectRegistry getManagedObjectRegistry()
@@ -151,8 +139,8 @@
return _authenticationManager;
}
- public MessageStore getMessageStore()
+ public Collection<String> getVirtualHostNames()
{
- return _messageStore;
- }
+ return getConfiguration().getList("virtualhosts.virtualhost.name");
+ }
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Wed Jan 24 07:41:48 2007
@@ -26,8 +26,12 @@
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.security.auth.AuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.commons.configuration.Configuration;
+import java.util.Collection;
+
public interface IApplicationRegistry
{
/**
@@ -35,7 +39,9 @@
* that need access to the application registry itself for initialisation are able to use it. Attempting to
* initialise in the constructor will lead to failures since the registry reference will not have been set.
*/
- void initialise() throws Exception;
+ void initialise() throws Exception;
+
+ void close() throws Exception;
/**
* This gets access to a "configured object". A configured object has fields populated from a the configuration
@@ -54,15 +60,11 @@
*/
Configuration getConfiguration();
- QueueRegistry getQueueRegistry();
-
- ExchangeRegistry getExchangeRegistry();
-
- ExchangeFactory getExchangeFactory();
-
ManagedObjectRegistry getManagedObjectRegistry();
AuthenticationManager getAuthenticationManager();
- MessageStore getMessageStore();
+ Collection<String> getVirtualHostNames();
+
+ VirtualHostRegistry getVirtualHostRegistry();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Wed Jan 24 07:41:48 2007
@@ -31,6 +31,7 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.log4j.Logger;
import java.util.HashMap;
@@ -46,8 +47,8 @@
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
- private final QueueRegistry _queueRegistry;
- private final ExchangeRegistry _exchangeRegistry;
+
+ private final VirtualHostRegistry _virtualHostRegistry;
private final AMQProtocolSession _protocolSession;
/**
* The current state
@@ -63,15 +64,15 @@
private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
- public AMQStateManager(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+
+ public AMQStateManager(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- this(AMQState.CONNECTION_NOT_STARTED, true, queueRegistry, exchangeRegistry, protocolSession);
+ this(AMQState.CONNECTION_NOT_STARTED, true, virtualHostRegistry, protocolSession);
}
- protected AMQStateManager(AMQState initial, boolean register, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+ protected AMQStateManager(AMQState initial, boolean register, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
+ _virtualHostRegistry = virtualHostRegistry;
_protocolSession = protocolSession;
_currentState = initial;
if (register)
@@ -176,7 +177,7 @@
checkChannel(evt, _protocolSession);
- handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt);
+ handler.methodReceived(this, evt);
return true;
}
return false;
@@ -240,5 +241,15 @@
public void removeStateListener(StateListener listener)
{
_stateListeners.remove(listener);
+ }
+
+ public VirtualHostRegistry getVirtualHostRegistry()
+ {
+ return _virtualHostRegistry;
+ }
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java Wed Jan 24 07:41:48 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.framing.AMQMethodBody;
/**
@@ -34,7 +35,5 @@
*/
public interface StateAwareMethodListener <B extends AMQMethodBody>
{
- void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<B> evt) throws AMQException;
+ void methodReceived(AMQStateManager stateManager, AMQMethodEvent<B> evt) throws AMQException;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Wed Jan 24 07:41:48 2007
@@ -28,6 +28,7 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.List;
@@ -67,7 +68,7 @@
_contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(hashtableCapacity);
}
- public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception
+ public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
{
configure(base, config);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Wed Jan 24 07:41:48 2007
@@ -27,6 +27,7 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.List;
@@ -35,13 +36,13 @@
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
- * @param queueRegistry the registry of queues to be used by this store
+ * @param virtualHost the virtual host using this store
* @param base the base element identifier from which all configuration items are relative. For example, if the base
* element is "store", the all elements used by concrete classes will be "store.foo" etc.
* @param config the apache commons configuration object
* @throws Exception if an error occurs that means the store is unable to configure itself
*/
- void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception;
+ void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception;
/**
* Called to close and cleanup any resources used by the message store.
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java Wed Jan 24 07:41:48 2007
@@ -33,24 +33,23 @@
import org.apache.qpid.server.security.auth.NullAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.MapConfiguration;
import java.util.HashMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Arrays;
public class NullApplicationRegistry extends ApplicationRegistry
{
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
-
private ManagedObjectRegistry _managedObjectRegistry;
private AuthenticationManager _authenticationManager;
- private MessageStore _messageStore;
+ private VirtualHostRegistry _virtualHostRegistry;
public NullApplicationRegistry()
@@ -60,15 +59,16 @@
public void initialise() throws Exception
{
+ _configuration.addProperty("store.class","org.apache.qpid.server.store.MemoryMessageStore");
+
_managedObjectRegistry = new NoopManagedObjectRegistry();
- _queueRegistry = new DefaultQueueRegistry();
- _exchangeFactory = new DefaultExchangeFactory();
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ _virtualHostRegistry = new VirtualHostRegistry();
+ VirtualHost dummyHost = new VirtualHost("test",getConfiguration());
+ _virtualHostRegistry.registerVirtualHost(dummyHost);
_authenticationManager = new NullAuthenticationManager();
- _messageStore = new MemoryMessageStore();
- ((MemoryMessageStore)_messageStore).configure();
_configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
+
}
public Configuration getConfiguration()
@@ -76,20 +76,6 @@
return _configuration;
}
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
- }
public ManagedObjectRegistry getManagedObjectRegistry()
{
@@ -101,9 +87,15 @@
return _authenticationManager;
}
- public MessageStore getMessageStore()
+ public Collection<String> getVirtualHostNames()
+ {
+ String[] hosts = {"test"};
+ return Arrays.asList( hosts );
+ }
+
+ public VirtualHostRegistry getVirtualHostRegistry()
{
- return _messageStore;
+ return _virtualHostRegistry;
}
}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java?view=auto&rev=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java Wed Jan 24 07:41:48 2007
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.management.MBeanAttribute;
+import org.apache.qpid.server.management.MBeanOperation;
+import org.apache.qpid.server.management.MBeanOperationParameter;
+import org.apache.qpid.server.queue.ManagedQueue;
+import org.apache.qpid.server.exchange.ManagedExchange;
+
+import javax.management.openmbean.TabularData;
+import javax.management.JMException;
+import javax.management.MBeanOperationInfo;
+import java.io.IOException;
+
+/**
+ * The management interface exposed to allow management of a virtual host.
+ * @version 0.1
+ */
+public interface ManagedVirtualHost
+{
+ static final String TYPE = "VirtualHost";
+
+ /**
+ * Returns the name of the managed virtualHost.
+ * @return the name of the virtual host.
+ * @throws java.io.IOException
+ */
+ @MBeanAttribute(name="Name", description= TYPE + " Name")
+ String getName() throws IOException;
+
+
+}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?view=auto&rev=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Wed Jan 24 07:41:48 2007
@@ -0,0 +1,193 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.exchange.*;
+import org.apache.qpid.server.management.*;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+
+import javax.management.NotCompliantMBeanException;
+
+public class VirtualHost
+{
+ private static final Logger _logger = Logger.getLogger(VirtualHost.class);
+
+
+ private final String _name;
+
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private MessageStore _messageStore;
+
+ protected VirtualHostMBean _virtualHostMBean;
+
+ private AMQBrokerManagerMBean _brokerMBean;
+
+
+ /**
+ * Abstract MBean class. This has some of the methods implemented from
+ * management intrerface for exchanges. Any implementaion of an
+ * Exchange MBean should extend this class.
+ */
+ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+ {
+ public VirtualHostMBean() throws NotCompliantMBeanException
+ {
+ super(ManagedVirtualHost.class, "VirtualHost");
+ }
+
+
+ public String getObjectInstanceName()
+ {
+ return _name.toString();
+ }
+
+ public String getName()
+ {
+ return _name.toString();
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return VirtualHost.this;
+ }
+
+
+ } // End of MBean class
+
+
+
+ public VirtualHost(String name, Configuration hostConfig) throws Exception
+ {
+ _name = name;
+
+ _virtualHostMBean = new VirtualHostMBean();
+ _virtualHostMBean.register();
+
+ _queueRegistry = new DefaultQueueRegistry(this);
+ _exchangeFactory = new DefaultExchangeFactory(this);
+ _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+
+ initialiseMessageStore(hostConfig);
+
+ _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
+ _brokerMBean.register();
+
+ }
+
+ private void initialiseMessageStore(Configuration config) throws Exception
+ {
+ String messageStoreClass = config.getString("store.class");
+
+ Class clazz = Class.forName(messageStoreClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof MessageStore))
+ {
+ throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ " does not.");
+ }
+ _messageStore = (MessageStore) o;
+ _messageStore.configure(this, "store", config);
+ }
+
+
+
+
+ public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
+ {
+ T instance;
+ try
+ {
+ instance = instanceType.newInstance();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ }
+ Configurator.configure(instance);
+
+ return instance;
+ }
+
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ public void close() throws Exception
+ {
+ if(_messageStore != null)
+ {
+ _messageStore.close();
+ }
+ }
+
+ public ManagedObject getBrokerMBean()
+ {
+ return _brokerMBean;
+ }
+
+
+
+ public ManagedObject getManagedObject()
+ {
+ return _virtualHostMBean;
+ }
+}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java?view=auto&rev=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java Wed Jan 24 07:41:48 2007
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Map;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Registry of virtual hosts, keyed by host name.
+ */
+public class VirtualHostRegistry
+{
+    private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String,VirtualHost>();
+
+    /**
+     * Adds a virtual host to the registry under its own name.
+     *
+     * @param host the virtual host to add
+     * @throws Exception if a host with the same name is already present
+     */
+    public synchronized void registerVirtualHost(VirtualHost host) throws Exception
+    {
+        final String hostName = host.getName();
+        if(!_registry.containsKey(hostName))
+        {
+            _registry.put(hostName, host);
+            return;
+        }
+        throw new Exception("Virtual Host with name " + hostName + " already registered.");
+    }
+
+    /**
+     * Looks up a virtual host by name.
+     *
+     * @param name the name the host was registered under
+     * @return the matching host, or null if the registry holds no host with that name
+     */
+    public VirtualHost getVirtualHost(String name)
+    {
+        final VirtualHost found = _registry.get(name);
+        return found;
+    }
+
+    /**
+     * @return a new list holding the hosts in the registry at the time of the call
+     */
+    public Collection<VirtualHost> getVirtualHosts()
+    {
+        final Collection<VirtualHost> snapshot = new ArrayList<VirtualHost>(_registry.values());
+        return snapshot;
+    }
+}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Jan 24 07:41:48 2007
@@ -156,7 +156,7 @@
{
this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
username + ":" + password + "@" +
- (clientName == null ? "" : clientName) +
+ (clientName == null ? "" : clientName) + "/" +
virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"));
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java Wed Jan 24 07:41:48 2007
@@ -114,6 +114,10 @@
if (virtualHost != null && (!virtualHost.equals("")))
{
+ if(virtualHost.startsWith("/"))
+ {
+ virtualHost = virtualHost.substring(1);
+ }
setVirtualHost(virtualHost);
}
else
Modified: incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java Wed Jan 24 07:41:48 2007
@@ -46,7 +46,7 @@
long startTime = System.currentTimeMillis();
for (int i = 0; i < count; i++)
{
- createConnection(i, "vm://:1", "myClient" + i, "guest", "guest", "/test");
+ createConnection(i, "vm://:1", "myClient" + i, "guest", "guest", "test");
}
long endTime = System.currentTimeMillis();
_log.info("Time to create " + count + " connections: " + (endTime - startTime) +
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java Wed Jan 24 07:41:48 2007
@@ -76,7 +76,7 @@
Hashtable<String, String> env = new Hashtable<String, String>();
- env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'");
+ env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'");
env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
_context = factory.getInitialContext(env);
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java Wed Jan 24 07:41:48 2007
@@ -70,7 +70,7 @@
Hashtable<String, String> env = new Hashtable<String, String>();
- env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'");
+ env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'");
env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
_context = factory.getInitialContext(env);
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java Wed Jan 24 07:41:48 2007
@@ -49,7 +49,7 @@
public void testRecoverResendsMsgs() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
@@ -57,7 +57,7 @@
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
@@ -106,7 +106,7 @@
public void testRecoverResendsMsgsAckOnEarlier() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
@@ -114,7 +114,7 @@
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
@@ -170,7 +170,7 @@
public void testAcknowledgePerConsumer() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
@@ -178,7 +178,7 @@
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
MessageProducer producer2 = producerSession.createProducer(queue2);
@@ -209,7 +209,7 @@
public void testRecoverInAutoAckListener() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java?view=diff&rev=499446&r1=499445&r2=499446
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java Wed Jan 24 07:41:48 2007
@@ -48,7 +48,7 @@
protected void setUp() throws Exception
{
super.setUp();
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
protected void tearDown() throws Exception